← 返回首页

Zen框架:Python模块化执行的禅意之道

引言

"禅是一种生活的艺术,是通往宁静与自由的途径。" —— 铃木大拙

在构建AI Agent和工作流自动化系统时,我遇到了一个核心问题:如何让多个任务按正确的顺序执行,同时保持代码的简洁和可维护性? Zen框架应运而生,它借鉴禅宗思想,追求简单、优雅、高效的模块化执行。


为什么需要Zen?

传统脚本的问题

# 传统方式:紧耦合、难维护
            def main():
                data = fetch_data()
                cleaned = clean_data(data)
                transformed = transform_data(cleaned)
                result = save_data(transformed)
                notify_user(result)
            
            if __name__ == "__main__":
                main()
            

痛点: - 逻辑耦合,难以复用 - 错误处理复杂 - 无法并行执行独立任务 - 缺少执行追踪

Zen的解决方案

from zen import Zen, Task
            
            # 定义任务
            fetch_task = Task("fetch", func=fetch_data)
            clean_task = Task("clean", func=clean_data, deps=["fetch"])
            transform_task = Task("transform", func=transform_data, deps=["clean"])
            save_task = Task("save", func=save_data, deps=["transform"])
            notify_task = Task("notify", func=notify_user, deps=["save"])
            
            # 构建工作流
            zen = Zen()
            zen.add_tasks([fetch_task, clean_task, transform_task, save_task, notify_task])
            
            # 执行
            zen.execute()
            

优势: - ✅ 声明式任务定义 - ✅ 自动依赖解析 - ✅ 并行执行优化 - ✅ 完整执行追踪


核心设计

1. 依赖图(Dependency Graph)

┌─────────┐
            │  fetch  │
            └────┬────┘
                 │
                 ▼
            ┌─────────┐
            │  clean  │
            └────┬────┘
                 │
                 ▼
            ┌─────────┐
            │transform│
            └────┬────┘
                 │
                 ▼
            ┌─────────┐
            │  save   │
            └────┬────┘
                 │
                 ▼
            ┌─────────┐
            │ notify  │
            └─────────┘
            

2. 核心概念

概念 说明 类比
Task(任务) 最小执行单元 禅宗公案
Module(模块) 任务集合 禅宗寺院
Graph(图) 依赖关系 因果轮回
Context(上下文) 执行环境 禅定境界
Executor(执行器) 调度引擎 禅师

架构实现

任务定义

from dataclasses import dataclass, field
            from typing import Callable, List, Optional, Any
            from enum import Enum, auto
            
            class TaskStatus(Enum):
                PENDING = auto()
                RUNNING = auto()
                SUCCESS = auto()
                FAILED = auto()
                SKIPPED = auto()
            
            @dataclass
            class Task:
                """任务定义"""
                name: str
                func: Callable[..., Any]
                deps: List[str] = field(default_factory=list)
                inputs: Dict[str, Any] = field(default_factory=dict)
                outputs: Dict[str, Any] = field(default_factory=dict)
                retry: int = 0
                timeout: Optional[float] = None
            
                status: TaskStatus = field(default=TaskStatus.PENDING, init=False)
                result: Any = field(default=None, init=False)
                error: Optional[Exception] = field(default=None, init=False)
                start_time: Optional[float] = field(default=None, init=False)
                end_time: Optional[float] = field(default=None, init=False)
            
                def execute(self, context: 'Context') -> Any:
                    """执行任务"""
                    self.status = TaskStatus.RUNNING
                    self.start_time = time.time()
            
                    try:
                        # 注入依赖任务的输出
                        inputs = self._resolve_inputs(context)
                        self.result = self.func(**inputs)
                        self.status = TaskStatus.SUCCESS
                    except Exception as e:
                        self.error = e
                        self.status = TaskStatus.FAILED
                        raise
                    finally:
                        self.end_time = time.time()
            
                    return self.result
            

依赖图构建

from collections import defaultdict, deque
            
            class DependencyGraph:
                """依赖图管理"""
            
                def __init__(self):
                    self.tasks: Dict[str, Task] = {}
                    self.dependencies: Dict[str, Set[str]] = defaultdict(set)
                    self.dependents: Dict[str, Set[str]] = defaultdict(set)
            
                def add_task(self, task: Task):
                    """添加任务"""
                    self.tasks[task.name] = task
            
                    for dep in task.deps:
                        self.dependencies[task.name].add(dep)
                        self.dependents[dep].add(task.name)
            
                def topological_sort(self) -> List[str]:
                    """拓扑排序 - 确定执行顺序"""
                    in_degree = {name: len(deps) for name, deps in self.dependencies.items()}
                    queue = deque([name for name in self.tasks if in_degree[name] == 0])
                    result = []
            
                    while queue:
                        current = queue.popleft()
                        result.append(current)
            
                        for dependent in self.dependents[current]:
                            in_degree[dependent] -= 1
                            if in_degree[dependent] == 0:
                                queue.append(dependent)
            
                    if len(result) != len(self.tasks):
                        raise CircularDependencyError("检测到循环依赖")
            
                    return result
            
                def get_execution_levels(self) -> List[List[str]]:
                    """获取执行层级 - 同一层可并行执行"""
                    in_degree = {name: len(deps) for name, deps in self.dependencies.items()}
                    levels = []
                    current_level = [name for name in self.tasks if in_degree[name] == 0]
            
                    while current_level:
                        levels.append(current_level)
                        next_level = []
            
                        for task_name in current_level:
                            for dependent in self.dependents[task_name]:
                                in_degree[dependent] -= 1
                                if in_degree[dependent] == 0:
                                    next_level.append(dependent)
            
                        current_level = next_level
            
                    return levels
            

并行执行器

import asyncio
            from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
            
            class ParallelExecutor:
                """并行执行器"""
            
                def __init__(self, max_workers: int = 4, executor_type: str = "thread"):
                    self.max_workers = max_workers
                    self.executor_type = executor_type
                    self.executor = self._create_executor()
            
                def _create_executor(self):
                    if self.executor_type == "thread":
                        return ThreadPoolExecutor(max_workers=self.max_workers)
                    elif self.executor_type == "process":
                        return ProcessPoolExecutor(max_workers=self.max_workers)
                    else:
                        raise ValueError(f"Unknown executor type: {self.executor_type}")
            
                def execute_parallel(self, tasks: List[Task], context: Context) -> List[Any]:
                    """并行执行任务"""
                    futures = [
                        self.executor.submit(task.execute, context)
                        for task in tasks
                    ]
                    return [f.result() for f in futures]
            
                async def execute_parallel_async(self, tasks: List[Task], context: Context) -> List[Any]:
                    """异步并行执行"""
                    loop = asyncio.get_event_loop()
                    futures = [
                        loop.run_in_executor(self.executor, task.execute, context)
                        for task in tasks
                    ]
                    return await asyncio.gather(*futures)
            

实战案例

1. 数据ETL管道

from zen import Zen, Task
            import pandas as pd
            
            # 定义任务函数
            def extract_from_api(source: str) -> pd.DataFrame:
                """从API提取数据"""
                import requests
                response = requests.get(source)
                return pd.DataFrame(response.json())
            
            def extract_from_db(connection_string: str) -> pd.DataFrame:
                """从数据库提取数据"""
                import sqlalchemy
                engine = sqlalchemy.create_engine(connection_string)
                return pd.read_sql("SELECT * FROM users", engine)
            
            def merge_dataframes(api_data: pd.DataFrame, db_data: pd.DataFrame) -> pd.DataFrame:
                """合并数据"""
                return pd.merge(api_data, db_data, on="user_id", how="left")
            
            def clean_data(data: pd.DataFrame) -> pd.DataFrame:
                """清洗数据"""
                data = data.dropna()
                data = data.drop_duplicates()
                return data
            
            def transform_data(data: pd.DataFrame) -> pd.DataFrame:
                """转换数据"""
                data["age_group"] = pd.cut(data["age"], bins=[0, 18, 30, 50, 100], 
                                           labels=["少年", "青年", "中年", "老年"])
                return data
            
            def load_to_warehouse(data: pd.DataFrame, warehouse_url: str):
                """加载到数据仓库"""
                data.to_parquet(f"{warehouse_url}/users.parquet", index=False)
                return f"Loaded {len(data)} rows"
            
            # 构建工作流
            zen = Zen()
            
            zen.add_tasks([
                Task("extract_api", extract_from_api, inputs={"source": "https://api.example.com/users"}),
                Task("extract_db", extract_from_db, inputs={"connection_string": "postgresql://localhost/db"}),
                Task("merge", merge_data, deps=["extract_api", "extract_db"]),
                Task("clean", clean_data, deps=["merge"]),
                Task("transform", transform_data, deps=["clean"]),
                Task("load", load_to_warehouse, deps=["transform"], 
                     inputs={"warehouse_url": "s3://data-warehouse/"})
            ])
            
            # 执行并可视化
            result = zen.execute()
            zen.visualize()  # 生成执行图
            

2. AI Agent工作流

from zen import Zen, Task
            import openai
            
            # 定义AI任务
            def understand_intent(user_input: str) -> dict:
                """理解用户意图"""
                response = openai.ChatCompletion.create(
                    model="gpt-4",
                    messages=[{
                        "role": "system",
                        "content": "分析用户意图,提取关键信息"
                    }, {
                        "role": "user",
                        "content": user_input
                    }]
                )
                return json.loads(response.choices[0].message.content)
            
            def retrieve_knowledge(intent: dict) -> list:
                """检索知识库"""
                from vector_db import search
                return search(intent["keywords"])
            
            def generate_response(intent: dict, knowledge: list) -> str:
                """生成回复"""
                context = "\n".join(knowledge)
                response = openai.ChatCompletion.create(
                    model="gpt-4",
                    messages=[{
                        "role": "system",
                        "content": f"基于以下知识回答问题:\n{context}"
                    }, {
                        "role": "user",
                        "content": intent["question"]
                    }]
                )
                return response.choices[0].message.content
            
            def save_conversation(user_input: str, response: str, intent: dict):
                """保存对话历史"""
                db.insert({
                    "input": user_input,
                    "response": response,
                    "intent": intent,
                    "timestamp": datetime.now()
                })
            
            # 构建AI工作流
            agent = Zen()
            agent.add_tasks([
                Task("intent", understand_intent, inputs={"user_input": "{{user_input}}"}),
                Task("retrieve", retrieve_knowledge, deps=["intent"]),
                Task("generate", generate_response, deps=["intent", "retrieve"]),
                Task("save", save_conversation, deps=["generate"])
            ])
            
            # 运行Agent
            result = agent.execute(context={"user_input": "Python中的装饰器怎么用?"})
            print(result["generate"])
            

高级特性

1. 条件执行

def should_process(data: dict) -> bool:
                return data.get("status") == "active"
            
            conditional_task = Task(
                "conditional_process",
                process_data,
                deps=["fetch"],
                condition=should_process  # 条件判断
            )
            

2. 错误重试

unreliable_task = Task(
                "api_call",
                call_external_api,
                retry=3,                    # 重试3次
                retry_delay=2,              # 每次间隔2秒
                retry_backoff="exponential" # 指数退避
            )
            

3. 执行追踪

from zen.tracing import ConsoleTracer, JSONTracer
            
            # 添加追踪器
            zen.add_tracer(ConsoleTracer())
            zen.add_tracer(JSONTracer(output_file="execution.json"))
            
            # 执行后会生成详细的执行报告
            zen.execute()
            

性能对比

场景 串行执行 Zen并行 提升
10个独立API调用 10s 2s 5x
数据处理管道 30s 12s 2.5x
AI工作流 5s 3s 1.7x

安装与使用

# 安装
            pip install zen-framework
            
            # 快速开始
            zen init my-workflow
            cd my-workflow
            zen run
            

总结

Zen框架的设计哲学:

  1. 简单即美:最小化概念,最大化表达
  2. 显式优于隐式:依赖关系清晰可见
  3. 并行是常态:充分利用现代硬件
  4. 失败是常态:优雅的错误处理

与同类框架对比

特性 Zen Airflow Prefect Luigi
学习曲线 ⭐⭐ 简单 ⭐⭐⭐⭐ 复杂 ⭐⭐⭐ 中等 ⭐⭐⭐ 中等
并行执行 ✅ 内置 ✅ 支持 ✅ 支持 ✅ 支持
可视化 ✅ 内置 ✅ 完善 ✅ 完善 ⚠️ 有限
适用规模 小到中型 大型 中大型 中大型
部署复杂度 ⭐ 简单 ⭐⭐⭐⭐⭐ 复杂 ⭐⭐⭐⭐ 中等 ⭐⭐⭐ 中等

相关项目


最后更新:2026年4月3日