快速开始
本页解决一个问题:你的 task 数据有 owner_id,但 API 应该返回完整的 owner 对象 —— 且不会产生 N+1 查询。
目标
你有这样的数据:
raw_tasks = [
{"id": 10, "title": "Design docs", "owner_id": 7},
{"id": 11, "title": "Refine examples", "owner_id": 8},
]
你想要这样的响应:
[
{
"id": 10,
"title": "Design docs",
"owner": {"id": 7, "name": "Ada"}
},
{
"id": 11,
"title": "Refine examples",
"owner": {"id": 8, "name": "Bob"}
}
]
朴素做法是在循环中逐个获取 owner —— 这就是 N+1 问题。pydantic-resolve 用三个部分解决它:响应模型、loader 函数和解析器。
安装
Step 1:声明缺失字段
从一个 Pydantic 模型开始。owner 字段初始为 None,因为原始数据不包含它。你通过 resolve_owner 声明如何填充它:
from typing import Optional
from pydantic import BaseModel
from pydantic_resolve import Loader, Resolver, build_object
class UserView(BaseModel):
id: int
name: str
class TaskView(BaseModel):
id: int
title: str
owner_id: int
owner: Optional[UserView] = None # (1)
def resolve_owner(self, loader=Loader(user_loader)): # (2)
return loader.load(self.owner_id)
owner初始为None—— 解析器会填充它。resolve_<field_name>声明如何加载该字段。loader.load(self.owner_id)注册一个待批处理的 key —— 它不会立即调用user_loader。
心智模型
resolve_* 的含义是:这个字段需要从当前节点之外获取数据。
库中的其他功能都建立在这个想法之上:
post_* 在子树准备好之后运行,AutoLoad 可以完全消除编写 resolve_* 的需要。
Step 2:编写 Loader 函数
loader 接收一个批量的 key,并按相同顺序返回结果:
USERS = {
7: {"id": 7, "name": "Ada"},
8: {"id": 8, "name": "Bob"},
9: {"id": 9, "name": "Cara"},
}
async def user_loader(user_ids: list[int]):
users = [USERS.get(uid) for uid in user_ids]
return build_object(users, user_ids, lambda user: user.id)
build_object 将结果与 key 对齐 —— 每个 key 返回一个元素,没有匹配则返回 None。
Step 3:运行解析器
将它们组合起来:
raw_tasks = [
{"id": 10, "title": "Design docs", "owner_id": 7},
{"id": 11, "title": "Refine examples", "owner_id": 8},
]
tasks = [TaskView.model_validate(t) for t in raw_tasks]
tasks = await Resolver().resolve(tasks)
for t in tasks:
print(t.model_dump())
输出:
{'id': 10, 'title': 'Design docs', 'owner_id': 7, 'owner': {'id': 7, 'name': 'Ada'}}
{'id': 11, 'title': 'Refine examples', 'owner_id': 8, 'owner': {'id': 8, 'name': 'Bob'}}
批处理如何工作
假设有 100 个 task,解析器不会调用 user_loader 100 次:
sequenceDiagram
participant R as Resolver
participant T as TaskView x100
participant L as user_loader
R->>T: 扫描 resolve_owner 方法
T->>R: 收集 owner_id 值 [7, 8, 7, 9, ...]
R->>L: user_loader([7, 8, 9]) ← 一次调用
L->>R: [User7, User8, User9]
R->>T: 为每个 task 分配 owner
- 收集所有 task 中的
owner_id值。 - 用去重后的 key 一次性调用
user_loader。 - 将每个 user 映射回对应的 task。
配合 FastAPI
将同样的模型放入路由:
from fastapi import FastAPI
app = FastAPI()
@app.get("/tasks", response_model=list[TaskView])
async def get_tasks():
tasks = [TaskView.model_validate(t) for t in await db.get_tasks()]
return await Resolver().resolve(tasks)
路由不导入 SQLAlchemy,不编写 join 逻辑,也不考虑加载策略。它只声明业务语义。
同步或异步
resolve_* 支持两种形式:
# 同步 —— 直接返回 loader 调用
def resolve_owner(self, loader=Loader(user_loader)):
return loader.load(self.owner_id)
# 异步 —— 在赋值前转换结果
async def resolve_owner(self, loader=Loader(user_loader)):
user = await loader.load(self.owner_id)
return user
当你需要对加载的数据进行后处理时,使用异步形式。