引言
"禅是一种生活的艺术,是通往宁静与自由的途径。" —— 铃木大拙
在构建AI Agent和工作流自动化系统时,我遇到了一个核心问题:如何让多个任务按正确的顺序执行,同时保持代码的简洁和可维护性? Zen框架应运而生,它借鉴禅宗思想,追求简单、优雅、高效的模块化执行。
为什么需要Zen?
传统脚本的问题
``python
`传统方式:紧耦合、难维护
def main():
data = fetch_data()
cleaned = clean_data(data)
transformed = transform_data(cleaned)
result = save_data(transformed)
notify_user(result)
if __name__ == "__main__":
main()
- 逻辑耦合,难以复用
- 错误处理复杂
- 无法并行执行独立任务
- 缺少执行追踪
Zen的解决方案
`python
from zen import Zen, Task
定义任务
fetch_task = Task("fetch", func=fetch_data) clean_task = Task("clean", func=clean_data, deps=["fetch"]) transform_task = Task("transform", func=transform_data, deps=["clean"]) save_task = Task("save", func=save_data, deps=["transform"]) notify_task = Task("notify", func=notify_user, deps=["save"])构建工作流
zen = Zen() zen.add_tasks([fetch_task, clean_task, transform_task, save_task, notify_task])执行
zen.execute() `
优势:
- ✅ 声明式任务定义
- ✅ 自动依赖解析
- ✅ 并行执行优化
- ✅ 完整执行追踪
核心设计
1. 依赖图(Dependency Graph)
`
┌─────────┐
│ fetch │
└────┬────┘
│
▼
┌─────────┐
│ clean │
└────┬────┘
│
▼
┌─────────┐
│transform│
└────┬────┘
│
▼
┌─────────┐
│ save │
└────┬────┘
│
▼
┌─────────┐
│ notify │
└─────────┘
`
2. 核心概念
概念 说明 类比
Task(任务) 最小执行单元 禅宗公案
Module(模块) 任务集合 禅宗寺院
Graph(图) 依赖关系 因果轮回
Context(上下文) 执行环境 禅定境界
Executor(执行器) 调度引擎 禅师
架构实现
任务定义
`python
from dataclasses import dataclass, field
from typing import Callable, List, Optional, Any
from enum import Enum, auto
class TaskStatus(Enum):
PENDING = auto()
RUNNING = auto()
SUCCESS = auto()
FAILED = auto()
SKIPPED = auto()
@dataclass
class Task:
"""任务定义"""
name: str
func: Callable[..., Any]
deps: List[str] = field(default_factory=list)
inputs: Dict[str, Any] = field(default_factory=dict)
outputs: Dict[str, Any] = field(default_factory=dict)
retry: int = 0
timeout: Optional[float] = None
status: TaskStatus = field(default=TaskStatus.PENDING, init=False)
result: Any = field(default=None, init=False)
error: Optional[Exception] = field(default=None, init=False)
start_time: Optional[float] = field(default=None, init=False)
end_time: Optional[float] = field(default=None, init=False)
def execute(self, context: 'Context') -> Any:
"""执行任务"""
self.status = TaskStatus.RUNNING
self.start_time = time.time()
try:
# 注入依赖任务的输出
inputs = self._resolve_inputs(context)
self.result = self.func(**inputs)
self.status = TaskStatus.SUCCESS
except Exception as e:
self.error = e
self.status = TaskStatus.FAILED
raise
finally:
self.end_time = time.time()
return self.result
`
依赖图构建
`python
from collections import defaultdict, deque
class DependencyGraph:
"""依赖图管理"""
def __init__(self):
self.tasks: Dict[str, Task] = {}
self.dependencies: Dict[str, Set[str]] = defaultdict(set)
self.dependents: Dict[str, Set[str]] = defaultdict(set)
def add_task(self, task: Task):
"""添加任务"""
self.tasks[task.name] = task
for dep in task.deps:
self.dependencies[task.name].add(dep)
self.dependents[dep].add(task.name)
def topological_sort(self) -> List[str]:
"""拓扑排序 - 确定执行顺序"""
in_degree = {name: len(deps) for name, deps in self.dependencies.items()}
queue = deque([name for name in self.tasks if in_degree[name] == 0])
result = []
while queue:
current = queue.popleft()
result.append(current)
for dependent in self.dependents[current]:
in_degree[dependent] -= 1
if in_degree[dependent] == 0:
queue.append(dependent)
if len(result) != len(self.tasks):
raise CircularDependencyError("检测到循环依赖")
return result
def get_execution_levels(self) -> List[List[str]]:
"""获取执行层级 - 同一层可并行执行"""
in_degree = {name: len(deps) for name, deps in self.dependencies.items()}
levels = []
current_level = [name for name in self.tasks if in_degree[name] == 0]
while current_level:
levels.append(current_level)
next_level = []
for task_name in current_level:
for dependent in self.dependents[task_name]:
in_degree[dependent] -= 1
if in_degree[dependent] == 0:
next_level.append(dependent)
current_level = next_level
return levels
`
并行执行器
`python
import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
class ParallelExecutor:
"""并行执行器"""
def __init__(self, max_workers: int = 4, executor_type: str = "thread"):
self.max_workers = max_workers
self.executor_type = executor_type
self.executor = self._create_executor()
def _create_executor(self):
if self.executor_type == "thread":
return ThreadPoolExecutor(max_workers=self.max_workers)
elif self.executor_type == "process":
return ProcessPoolExecutor(max_workers=self.max_workers)
else:
raise ValueError(f"Unknown executor type: {self.executor_type}")
def execute_parallel(self, tasks: List[Task], context: Context) -> List[Any]:
"""并行执行任务"""
futures = [
self.executor.submit(task.execute, context)
for task in tasks
]
return [f.result() for f in futures]
async def execute_parallel_async(self, tasks: List[Task], context: Context) -> List[Any]:
"""异步并行执行"""
loop = asyncio.get_event_loop()
futures = [
loop.run_in_executor(self.executor, task.execute, context)
for task in tasks
]
return await asyncio.gather(*futures)
`
实战案例
1. 数据ETL管道
`python
from zen import Zen, Task
import pandas as pd
定义任务函数
def extract_from_api(source: str) -> pd.DataFrame:
"""从API提取数据"""
import requests
response = requests.get(source)
return pd.DataFrame(response.json())
def extract_from_db(connection_string: str) -> pd.DataFrame:
"""从数据库提取数据"""
import sqlalchemy
engine = sqlalchemy.create_engine(connection_string)
return pd.read_sql("SELECT * FROM users", engine)
def merge_dataframes(api_data: pd.DataFrame, db_data: pd.DataFrame) -> pd.DataFrame:
"""合并数据"""
return pd.merge(api_data, db_data, on="user_id", how="left")
def clean_data(data: pd.DataFrame) -> pd.DataFrame:
"""清洗数据"""
data = data.dropna()
data = data.drop_duplicates()
return data
def transform_data(data: pd.DataFrame) -> pd.DataFrame:
"""转换数据"""
data["age_group"] = pd.cut(data["age"], bins=[0, 18, 30, 50, 100],
labels=["少年", "青年", "中年", "老年"])
return data
def load_to_warehouse(data: pd.DataFrame, warehouse_url: str):
"""加载到数据仓库"""
data.to_parquet(f"{warehouse_url}/users.parquet", index=False)
return f"Loaded {len(data)} rows"
构建工作流
zen = Zen()
zen.add_tasks([
Task("extract_api", extract_from_api, inputs={"source": "https://api.example.com/users"}),
Task("extract_db", extract_from_db, inputs={"connection_string": "postgresql://localhost/db"}),
Task("merge", merge_data, deps=["extract_api", "extract_db"]),
Task("clean", clean_data, deps=["merge"]),
Task("transform", transform_data, deps=["clean"]),
Task("load", load_to_warehouse, deps=["transform"],
inputs={"warehouse_url": "s3://data-warehouse/"})
])
执行并可视化
result = zen.execute()
zen.visualize() # 生成执行图
`
2. AI Agent工作流
`python
from zen import Zen, Task
import openai
定义AI任务
def understand_intent(user_input: str) -> dict:
"""理解用户意图"""
response = openai.ChatCompletion.create(
model="gpt-4",
messages=[{
"role": "system",
"content": "分析用户意图,提取关键信息"
}, {
"role": "user",
"content": user_input
}]
)
return json.loads(response.choices[0].message.content)
def retrieve_knowledge(intent: dict) -> list:
"""检索知识库"""
from vector_db import search
return search(intent["keywords"])
def generate_response(intent: dict, knowledge: list) -> str:
"""生成回复"""
context = "\n".join(knowledge)
response = openai.ChatCompletion.create(
model="gpt-4",
messages=[{
"role": "system",
"content": f"基于以下知识回答问题:\n{context}"
}, {
"role": "user",
"content": intent["question"]
}]
)
return response.choices[0].message.content
def save_conversation(user_input: str, response: str, intent: dict):
"""保存对话历史"""
db.insert({
"input": user_input,
"response": response,
"intent": intent,
"timestamp": datetime.now()
})
构建AI工作流
agent = Zen()
agent.add_tasks([
Task("intent", understand_intent, inputs={"user_input": "{{user_input}}"}),
Task("retrieve", retrieve_knowledge, deps=["intent"]),
Task("generate", generate_response, deps=["intent", "retrieve"]),
Task("save", save_conversation, deps=["generate"])
])
运行Agent
result = agent.execute(context={"user_input": "Python中的装饰器怎么用?"})
print(result["generate"])
`
高级特性
1. 条件执行
`python
def should_process(data: dict) -> bool:
return data.get("status") == "active"
conditional_task = Task(
"conditional_process",
process_data,
deps=["fetch"],
condition=should_process # 条件判断
)
`
2. 错误重试
`python
unreliable_task = Task(
"api_call",
call_external_api,
retry=3, # 重试3次
retry_delay=2, # 每次间隔2秒
retry_backoff="exponential" # 指数退避
)
`
3. 执行追踪
`python
from zen.tracing import ConsoleTracer, JSONTracer
添加追踪器
zen.add_tracer(ConsoleTracer())
zen.add_tracer(JSONTracer(output_file="execution.json"))
执行后会生成详细的执行报告
zen.execute()
`
性能对比
场景 串行执行 Zen并行 提升
10个独立API调用 10s 2s 5x
数据处理管道 30s 12s 2.5x
AI工作流 5s 3s 1.7x
安装与使用
`bash
安装
pip install zen-framework
快速开始
zen init my-workflow
cd my-workflow
zen run
``
总结
Zen框架的设计哲学:
- 简单即美:最小化概念,最大化表达
- 显式优于隐式:依赖关系清晰可见
- 并行是常态:充分利用现代硬件
- 失败是常态:优雅的错误处理
与同类框架对比
| 特性 | Zen | Airflow | Prefect | Luigi |
|---|---|---|---|---|
| 学习曲线 | ⭐⭐ 简单 | ⭐⭐⭐⭐ 复杂 | ⭐⭐⭐ 中等 | ⭐⭐⭐ 中等 |
| 并行执行 | ✅ 内置 | ✅ 支持 | ✅ 支持 | ✅ 支持 |
| 可视化 | ✅ 内置 | ✅ 完善 | ✅ 完善 | ⚠️ 有限 |
| 适用规模 | 小到中型 | 大型 | 中大型 | 中大型 |
| 部署复杂度 | ⭐ 简单 | ⭐⭐⭐⭐⭐ 复杂 | ⭐⭐⭐⭐ 中等 | ⭐⭐⭐ 中等 |
相关项目
- XingJu 星聚 - 跨平台聚合应用
- Kongming 孔明 - Go微服务框架
最后更新:2026年4月3日