您可能已经听说过最近发布的 Flama 1.7,它带来了一些令人兴奋的新功能,可以帮助您开发和生产 ML API。这篇文章专门讨论该版本的主要亮点之一:对领域驱动设计的支持。但是,在我们通过实际示例深入了解细节之前,我们建议您记住以下资源(如果您还没有熟悉它们,请先熟悉一下):
现在,让我们开始使用新功能,看看如何利用它来构建强大且可维护的 ML API。
这篇文章的结构如下:
在现代软件开发中,将业务逻辑与应用程序的技术设计保持一致至关重要。这就是领域驱动设计 (DDD) 的闪光点。 DDD 强调构建反映业务核心领域的软件,通过围绕业务概念组织代码来分解复杂的问题。通过这样做,DDD 帮助开发人员创建可维护、可扩展且健壮的应用程序。下面我们将介绍您应该了解的 DDD 中最重要的概念。在我们深入探讨它们之前,我们需要指出的是,这篇文章并不是要成为 DDD 的综合指南,也不是该主题的主要参考文献的替代品。事实上,我们推荐以下资源来更深入地了解 DDD:
在深入研究 DDD 的任何关键概念之前,我们建议您看一下 Cosmic Python 的一个非常有用的图,其中这些图显示在应用程序的上下文中,从而显示它们是如何互连的:图.
域模型的概念可以通过其术语的简单定义来解释:
因此,域模型是一种奇特(但标准且有用)的方式来引用企业主心中关于业务如何运作的一组概念和规则。这也是我们通常所说的应用程序的业务逻辑,包括控制系统行为的规则、约束和关系。
从现在开始,我们将把域模型称为模型。
存储库模式是一种设计模式,允许将模型与数据访问解耦。存储库模式背后的主要思想是在应用程序的数据访问逻辑和业务逻辑之间创建一个抽象层。这个抽象层允许关注点分离,使代码更易于维护和测试。
在实现存储库模式时,我们通常定义一个接口来指定任何其他存储库必须实现的标准方法(AbstractRepository)。然后,使用这些方法的具体实现来定义一个特定的存储库,其中实现了数据访问逻辑(例如,SQLAlchemyRepository)。这种设计模式旨在隔离数据操作方法,以便它们可以在应用程序的其他地方无缝使用,例如在我们的域模型中。
工作单元模式是最终将模型与数据访问分离的缺失部分。工作单元封装了数据访问逻辑,并提供了一种将必须在单个事务中对数据源执行的所有操作进行分组的方法。此模式确保所有操作都以原子方式执行。
在实现工作单元模式时,我们通常定义一个接口来指定任何其他工作单元必须实现的标准方法(AbstractUnitOfWork)。然后,使用这些方法的具体实现来定义特定的工作单元,其中实现了数据访问逻辑(例如,SQLAlchemyUnitOfWork)。这种设计允许系统地处理与数据源的连接,而不需要更改应用程序业务逻辑的实现。
在快速介绍了 DDD 的主要概念之后,我们准备好深入研究使用 Flama 实现 DDD。在本节中,我们将指导您完成设置开发环境、构建基础应用程序以及使用 Flama 实现 DDD 概念的过程。
在继续示例之前,请先看一下 Flama 关于我们刚刚回顾的主要 DDD 概念的命名约定:
如上图所示,命名约定非常直观:Repository指的是存储库模式;并且,Worker是指工作单元。现在,我们可以继续使用 DDD 实现 Flama API。但是,在我们开始之前,如果您需要回顾如何使用 flama 创建简单 API 的基础知识,或者在代码准备就绪后如何运行 API,那么您可能需要检查出快速入门指南。在那里,您将找到完成本文所需的基本概念和步骤。现在,事不宜迟,让我们开始实施吧。
我们的第一步是创建开发环境,并安装该项目所需的所有依赖项。好处是,对于这个示例,我们只需要安装 flama 即可拥有实现 JWT 身份验证所需的所有工具。我们将使用诗歌来管理我们的依赖项,但如果您愿意,您也可以使用 pip:
poetry add "flama[full]" "aiosqlite"
aiosqlite 包需要将 SQLite 与 SQLAlchemy 一起使用,这是我们将在本示例中使用的数据库。
如果你想知道我们通常如何组织我们的项目,请查看我们之前的文章,其中我们详细解释了如何使用诗歌建立 python 项目,以及我们通常遵循的项目文件夹结构。
让我们从一个具有单个公共端点的简单应用程序开始。该端点将返回 API 的简要描述。
# src/app.py from flama import Flama app = Flama( title="Domain-driven API", version="1.0.0", description="Domain-driven design with Flama ?", docs="/docs/", ) @app.get("/", name="info") def info(): """ tags: - Info summary: Ping description: Returns a brief description of the API responses: 200: description: Successful ping. """ return {"title": app.schema.title, "description": app.schema.description, "public": True}
如果你想运行这个应用程序,你可以将上面的代码保存在 src 文件夹下一个名为 app.py 的文件中,然后运行以下命令(记住要激活诗歌环境,否则你需要在命令前面加上诗歌运行):
flama run --server-reload src.app:app INFO: Started server process [3267] INFO: Waiting for application startup. INFO: Application startup complete. INFO: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
其中 --server-reload 标志是可选的,用于在代码更改时自动重新加载服务器。这在开发过程中非常有用,但如果不需要,可以将其删除。有关可用选项的完整列表,您可以运行 flama run --help,或查看文档。
或者,您也可以通过运行以下脚本来运行应用程序,您可以将其保存为 src 文件夹下的 __main__.py:
# src/__main__.py import flama def main(): flama.run( flama_app="src.app:app", server_host="0.0.0.0", server_port=8000, server_reload=True ) if __name__ == "__main__": main()
然后,您可以通过执行以下命令来运行应用程序:
poetry add "flama[full]" "aiosqlite"
现在,为我们的应用程序设置了一个最小的框架,我们可以开始实现我们刚刚在
中回顾的 DDD 概念
一个试图模仿现实世界场景的简单示例的上下文。假设我们需要开发一个 API 来管理用户,并且我们有以下要求:
这组需求构成了我们之前所说的应用程序的域模型,它本质上只不过是以下用户工作流程的具体化:
现在,让我们使用存储库和工作模式来实现域模型。我们将从定义数据模型开始,然后实现存储库和工作模式。
我们的用户数据将存储在 SQLite 数据库中(您可以使用 SQLAlchemy 支持的任何其他数据库)。我们将使用以下数据模型来表示用户(您可以将此代码保存在 src 文件夹下名为 models.py 的文件中):
# src/app.py from flama import Flama app = Flama( title="Domain-driven API", version="1.0.0", description="Domain-driven design with Flama ?", docs="/docs/", ) @app.get("/", name="info") def info(): """ tags: - Info summary: Ping description: Returns a brief description of the API responses: 200: description: Successful ping. """ return {"title": app.schema.title, "description": app.schema.description, "public": True}
除了数据模型之外,我们还需要一个迁移脚本来创建数据库和表。为此,我们可以将以下代码保存在项目根目录下名为migrations.py的文件中:
poetry add "flama[full]" "aiosqlite"
然后,我们可以通过执行以下命令来运行迁移脚本:
# src/app.py from flama import Flama app = Flama( title="Domain-driven API", version="1.0.0", description="Domain-driven design with Flama ?", docs="/docs/", ) @app.get("/", name="info") def info(): """ tags: - Info summary: Ping description: Returns a brief description of the API responses: 200: description: Successful ping. """ return {"title": app.schema.title, "description": app.schema.description, "public": True}
在此示例中,我们只需要一个存储库,即处理用户表上的原子操作的存储库,其名称为 UserRepository。值得庆幸的是,flama 为与 SQLAlchemy 表相关的存储库提供了一个基类,称为 SQLAlchemyTableRepository。
类 SQLAlchemyTableRepository 提供了一组对表执行 CRUD 操作的方法,具体为:
就我们的示例而言,我们不需要对表进行任何进一步的操作,因此 SQLAlchemyTableRepository 提供的方法就足够了。我们可以将以下代码保存在 src 文件夹下名为 repositories.py 的文件中:
flama run --server-reload src.app:app INFO: Started server process [3267] INFO: Waiting for application startup. INFO: Application startup complete. INFO: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
如您所见,UserRepository 类是 SQLAlchemyTableRepository 的子类,它只需要在 _table 属性中设置表即可。这是我们为用户表建立一个功能齐全的存储库所需要做的唯一事情。
如果我们想添加标准 CRUD 操作之外的自定义方法,我们可以通过在 UserRepository 类中定义它们来实现。例如,如果我们想添加一个方法来统计活跃用户数,我们可以这样做:
# src/__main__.py import flama def main(): flama.run( flama_app="src.app:app", server_host="0.0.0.0", server_port=8000, server_reload=True ) if __name__ == "__main__": main()
虽然我们不会在示例中使用此方法,但很高兴知道我们可以根据需要向存储库添加自定义方法以及它们的实现方式
在存储库模式的上下文中。正如我们已经看到的,这是一个强大的设计模式,因为我们可以在这里实现所有数据访问逻辑,而无需更改应用程序的业务逻辑(在相应的资源方法中实现)。
工作单元模式用于封装数据访问逻辑,并提供一种将必须在单个事务中对数据源执行的所有操作进行分组的方法。在 flama 中,UoW 模式是使用 Worker 的名称实现的。与存储库模式相同,flama 为与 SQLAlchemy 表相关的工作人员提供了一个基类,称为 SQLAlchemyWorker。本质上,SQLAlchemyWorker 提供了到数据库的连接和事务,并使用工作连接实例化其所有存储库。在此示例中,我们的工作人员将仅使用单个存储库(即 UserRepository),但如果需要,我们可以添加更多存储库。
我们的worker将被称为RegisterWorker,我们可以将以下代码保存在src文件夹下名为workers.py的文件中:
poetry add "flama[full]" "aiosqlite"
因此,如果我们有更多存储库可供使用,例如 ProductRepository 和 OrderRepository,我们可以将它们添加到工作线程中,如下所示:
# src/app.py from flama import Flama app = Flama( title="Domain-driven API", version="1.0.0", description="Domain-driven design with Flama ?", docs="/docs/", ) @app.get("/", name="info") def info(): """ tags: - Info summary: Ping description: Returns a brief description of the API responses: 200: description: Successful ping. """ return {"title": app.schema.title, "description": app.schema.description, "public": True}
就这么简单,我们在应用程序中实现了存储库和工作模式。现在,我们可以继续实现资源方法,这些方法将提供与用户数据交互所需的 API 端点。
资源是 flama 应用程序的主要构建块之一。它们用于表示应用程序资源(在 RESTful 资源的意义上)并定义与它们交互的 API 端点。
在我们的示例中,我们将为用户定义一个名为 UserResource 的资源,其中包含创建、激活、登录和停用用户的方法。资源至少需要从 flama 内置 Resource 类派生,尽管 flama 提供了更复杂的类来使用,例如 RESTResource 和 CRUDResource。
我们可以将以下代码保存在 src 文件夹下名为 resources.py 的文件中:
flama run --server-reload src.app:app INFO: Started server process [3267] INFO: Waiting for application startup. INFO: Application startup complete. INFO: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
现在我们已经实现了数据模型、存储库和工作模式以及资源方法,我们需要修改之前介绍的基础应用程序,以便一切按预期运行。我们需要:
这将使 app.py 文件如下:
poetry add "flama[full]" "aiosqlite"
您应该已经很清楚 DDD 模式如何让我们能够将应用程序的业务逻辑(在资源方法中很容易阅读)与数据访问逻辑分开。 🎜>(在存储库和工作模式中实现)。还值得注意的是,这种关注点分离如何使代码更易于维护和测试,以及代码现在如何更符合我们在本示例开始时给出的业务需求。
在运行任何命令之前,请检查您的开发环境是否设置正确,并且文件夹结构如下:
# src/app.py from flama import Flama app = Flama( title="Domain-driven API", version="1.0.0", description="Domain-driven design with Flama ?", docs="/docs/", ) @app.get("/", name="info") def info(): """ tags: - Info summary: Ping description: Returns a brief description of the API responses: 200: description: Successful ping. """ return {"title": app.schema.title, "description": app.schema.description, "public": True}
如果一切设置正确,您可以通过执行以下命令来运行应用程序(请记住在运行应用程序之前运行迁移脚本):
flama run --server-reload src.app:app INFO: Started server process [3267] INFO: Waiting for application startup. INFO: Application startup complete. INFO: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
现在我们可以尝试刚刚实现的业务逻辑。请记住,您可以使用curl或Postman等工具来尝试此操作,也可以通过在浏览器中导航到http://localhost:8000/docs/来使用flama提供的自动生成的文档UI并从那里尝试端点。
要创建用户,您可以使用以下有效负载向 /user/ 发送 POST 请求:
# src/__main__.py import flama def main(): flama.run( flama_app="src.app:app", server_host="0.0.0.0", server_port=8000, server_reload=True ) if __name__ == "__main__": main()
因此,我们可以使用curl来发送请求,如下所示:
poetry run python src/__main__.py INFO: Started server process [3267] INFO: Waiting for application startup. INFO: Application startup complete. INFO: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
如果请求成功,您应该收到正文为空的 200 响应,并且将在数据库中创建用户。
要登录,您可以使用以下有效负载向 /user/signin/ 发送 POST 请求:
# src/models.py import uuid import sqlalchemy from flama.sqlalchemy import metadata from sqlalchemy.dialects.postgresql import UUID __all__ = ["user_table", "metadata"] user_table = sqlalchemy.Table( "user", metadata, sqlalchemy.Column("id", UUID(as_uuid=True), primary_key=True, nullable=False, default=uuid.uuid4), sqlalchemy.Column("name", sqlalchemy.String, nullable=False), sqlalchemy.Column("surname", sqlalchemy.String, nullable=False), sqlalchemy.Column("email", sqlalchemy.String, nullable=False, unique=True), sqlalchemy.Column("password", sqlalchemy.String, nullable=False), sqlalchemy.Column("active", sqlalchemy.Boolean, nullable=False), )
因此,我们可以使用curl来发送请求,如下所示:
# migrations.py from sqlalchemy import create_engine from src.models import metadata if __name__ == "__main__": # Set up the SQLite database engine = create_engine("sqlite:///models.db", echo=False) # Create the database tables metadata.create_all(engine) # Print a success message print("Database and User table created successfully.")
鉴于用户不活跃,您应该收到类似以下响应:
> poetry run python migrations.py Database and User table created successfully.
我们还可以测试如果有人尝试使用错误密码登录会发生什么:
# src/repositories.py from flama.ddd import SQLAlchemyTableRepository from src import models __all__ = ["UserRepository"] class UserRepository(SQLAlchemyTableRepository): _table = models.user_table
在这种情况下,您应该收到包含以下正文的 401 响应:
# src/repositories.py from flama.ddd import SQLAlchemyTableRepository from src import models __all__ = ["UserRepository"] class UserRepository(SQLAlchemyTableRepository): _table = models.user_table async def count_active_users(self): return len((await self._connection.execute(self._table.select().where(self._table.c.active == True))).all())
最后,我们还应该尝试使用不存在的用户登录:
# src/workers.py from flama.ddd import SQLAlchemyWorker from src import repositories __all__ = ["RegisterWorker"] class RegisterWorker(SQLAlchemyWorker): user: repositories.UserRepository
在这种情况下,您应该收到包含以下正文的 404 响应:
# src/workers.py from flama.ddd import SQLAlchemyWorker from src import repositories __all__ = ["RegisterWorker"] class RegisterWorker(SQLAlchemyWorker): user: repositories.UserRepository product: repositories.ProductRepository order: repositories.OrderRepository
探索了登录过程后,我们现在可以通过使用用户的凭据向 /user/activate/ 发送 POST 请求来激活用户:
# src/resources.py import hashlib import http import uuid from flama import types from flama.ddd.exceptions import NotFoundError from flama.exceptions import HTTPException from flama.http import APIResponse from flama.resources import Resource, resource_method from src import models, schemas, worker __all__ = ["AdminResource", "UserResource"] ENCRYPTION_SALT = uuid.uuid4().hex ENCRYPTION_PEPER = uuid.uuid4().hex class Password: def __init__(self, password: str): self._password = password def encrypt(self): return hashlib.sha512( (hashlib.sha512((self._password + ENCRYPTION_SALT).encode()).hexdigest() + ENCRYPTION_PEPER).encode() ).hexdigest() class UserResource(Resource): name = "user" verbose_name = "User" @resource_method("/", methods=["POST"], name="create") async def create(self, worker: worker.RegisterWorker, data: types.Schema[schemas.UserDetails]): """ tags: - User summary: User create description: Create a user responses: 200: description: User created in successfully. """ async with worker: try: await worker.user.retrieve(email=data["email"]) except NotFoundError: await worker.user.create({**data, "password": Password(data["password"]).encrypt(), "active": False}) return APIResponse(status_code=http.HTTPStatus.OK) @resource_method("/signin/", methods=["POST"], name="signin") async def signin(self, worker: worker.RegisterWorker, data: types.Schema[schemas.UserCredentials]): """ tags: - User summary: User sign in description: Create a user responses: 200: description: User signed in successfully. 401: description: User not active. 404: description: User not found. """ async with worker: password = Password(data["password"]) try: user = await worker.user.retrieve(email=data["email"]) except NotFoundError: raise HTTPException(status_code=http.HTTPStatus.NOT_FOUND) if user["password"] != password.encrypt(): raise HTTPException(status_code=http.HTTPStatus.UNAUTHORIZED) if not user["active"]: raise HTTPException( status_code=http.HTTPStatus.BAD_REQUEST, detail=f"User must be activated via /user/activate/" ) return APIResponse(status_code=http.HTTPStatus.OK, schema=types.Schema[schemas.User], content=user) @resource_method("/activate/", methods=["POST"], name="activate") async def activate(self, worker: worker.RegisterWorker, data: types.Schema[schemas.UserCredentials]): """ tags: - User summary: User activate description: Activate an existing user responses: 200: description: User activated successfully. 401: description: User activation failed due to invalid credentials. 404: description: User not found. """ async with worker: try: user = await worker.user.retrieve(email=data["email"]) except NotFoundError: raise HTTPException(status_code=http.HTTPStatus.NOT_FOUND) if user["password"] != Password(data["password"]).encrypt(): raise HTTPException(status_code=http.HTTPStatus.UNAUTHORIZED) if not user["active"]: await worker.user.update({**user, "active": True}, id=user["id"]) return APIResponse(status_code=http.HTTPStatus.OK) @resource_method("/deactivate/", methods=["POST"], name="deactivate") async def deactivate(self, worker: worker.RegisterWorker, data: types.Schema[schemas.UserCredentials]): """ tags: - User summary: User deactivate description: Deactivate an existing user responses: 200: description: User deactivated successfully. 401: description: User deactivation failed due to invalid credentials. 404: description: User not found. """ async with worker: try: user = await worker.user.retrieve(email=data["email"]) except NotFoundError: raise HTTPException(status_code=http.HTTPStatus.NOT_FOUND) if user["password"] != Password(data["password"]).encrypt(): raise HTTPException(status_code=http.HTTPStatus.UNAUTHORIZED) if user["active"]: await worker.user.update({**user, "active": False}, id=user["id"]) return APIResponse(status_code=http.HTTPStatus.OK)
通过此请求,用户应该被激活,并且您应该收到一个空正文的 200 响应。
与前面的情况一样,我们还可以测试如果有人尝试使用错误的密码激活用户会发生什么:
poetry add "flama[full]" "aiosqlite"
在这种情况下,您应该收到包含以下正文的 401 响应:
# src/app.py from flama import Flama app = Flama( title="Domain-driven API", version="1.0.0", description="Domain-driven design with Flama ?", docs="/docs/", ) @app.get("/", name="info") def info(): """ tags: - Info summary: Ping description: Returns a brief description of the API responses: 200: description: Successful ping. """ return {"title": app.schema.title, "description": app.schema.description, "public": True}
最后,我们还应该尝试激活一个不存在的用户:
flama run --server-reload src.app:app INFO: Started server process [3267] INFO: Waiting for application startup. INFO: Application startup complete. INFO: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
在这种情况下,您应该收到包含以下正文的 404 响应:
# src/__main__.py import flama def main(): flama.run( flama_app="src.app:app", server_host="0.0.0.0", server_port=8000, server_reload=True ) if __name__ == "__main__": main()
现在用户已激活,我们可以尝试再次登录:
poetry run python src/__main__.py INFO: Started server process [3267] INFO: Waiting for application startup. INFO: Application startup complete. INFO: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
这一次,应该返回包含用户信息的 200 响应:
# src/models.py import uuid import sqlalchemy from flama.sqlalchemy import metadata from sqlalchemy.dialects.postgresql import UUID __all__ = ["user_table", "metadata"] user_table = sqlalchemy.Table( "user", metadata, sqlalchemy.Column("id", UUID(as_uuid=True), primary_key=True, nullable=False, default=uuid.uuid4), sqlalchemy.Column("name", sqlalchemy.String, nullable=False), sqlalchemy.Column("surname", sqlalchemy.String, nullable=False), sqlalchemy.Column("email", sqlalchemy.String, nullable=False, unique=True), sqlalchemy.Column("password", sqlalchemy.String, nullable=False), sqlalchemy.Column("active", sqlalchemy.Boolean, nullable=False), )
最后,我们可以通过使用用户的凭据向 /user/deactivate/ 发送 POST 请求来停用用户:
# migrations.py from sqlalchemy import create_engine from src.models import metadata if __name__ == "__main__": # Set up the SQLite database engine = create_engine("sqlite:///models.db", echo=False) # Create the database tables metadata.create_all(engine) # Print a success message print("Database and User table created successfully.")
通过此请求,用户应该被停用,并且您应该收到带有空正文的 200 响应。
在这篇文章中,我们深入探讨了领域驱动设计 (DDD) 的世界,以及如何在 flama 应用程序中实现它。我们已经了解了 DDD 如何帮助我们将应用程序的业务逻辑与数据访问逻辑分离,以及这种关注点分离如何使代码更易于维护和测试。我们还了解了如何在 flama 应用程序中实现存储库和工作模式,以及如何使用它们来封装数据访问逻辑并提供一种对必须执行的所有操作进行分组的方法在单个事务中的数据源上。最后,我们了解了如何使用资源方法来定义与用户数据交互的 API 端点,以及如何使用 DDD 模式来实现我们在本示例开头给出的业务需求。
虽然我们在这里描述的登录过程并不完全现实,但您可以将本文的材料与之前有关 JWT 身份验证的文章结合起来,以实现更现实的过程,其中登录最终返回一个JWT 令牌。如果您对此感兴趣,可以使用flama查看JWT身份验证的帖子。
我们希望您发现这篇文章有用,并且您现在已准备好在自己的 flama 应用程序中实现 DDD。如果您有任何疑问或意见,请随时与我们联系。我们总是很乐意提供帮助!
请继续关注有关 flama 以及人工智能和软件开发领域其他令人兴奋的主题的更多帖子。下次见!
如果您喜欢我们所做的事情,可以通过一种免费且简单的方式来支持我们的工作。在 Flama 送给我们 ⭐。
GitHub ⭐ 对我们来说意味着一个世界,它为我们提供了最甜蜜的动力,让我们继续努力,帮助其他人踏上构建强大的机器学习 API 的旅程。
您还可以在 ? 上关注我们,我们在这里分享最新的新闻和更新,以及有关人工智能、软件开发等方面的有趣主题。
以上是使用 Flama 进行本机域驱动设计的详细内容。更多信息请关注PHP中文网其他相关文章!