用户指南:管道¶
本页面提供了有关如何创建管道的信息。
注意
管道异步运行,请参阅下面的示例。
创建组件¶
组件是执行异步工作的单元,它们执行简单的任务,例如文档分块或将结果保存到 Neo4j。此包包含一些默认组件,但开发人员可以通过以下步骤创建自己的组件
创建 Pydantic neo4j_graphrag.experimental.pipeline.DataModel 的子类,以表示组件返回的数据
创建 neo4j_graphrag.experimental.pipeline.Component 的子类
在这个新类中创建一个 run 方法,并使用刚刚创建的 DataModel 指定所需的输入和输出模型
实现 run 方法:它是一个 async 方法,允许在此方法中并行化和等待任务。
下面给出了一个示例,其中创建了一个 ComponentAdd 用于将两个数字相加并返回结果总和
from neo4j_graphrag.experimental.pipeline import Component, DataModel
class IntResultModel(DataModel):
result: int
class ComponentAdd(Component):
async def run(self, number1: int, number2: int = 1) -> IntResultModel:
return IntResultModel(result = number1 + number2)
在 API 文档中阅读更多关于 组件 的信息。
在管道中连接组件¶
创建组件的最终目标是将其组装成一个用于特定目的的复杂管道,例如从文本数据构建知识图谱。
以下是创建简单管道并将结果从一个组件传播到另一个组件的方法(详细解释如下)
import asyncio
from neo4j_graphrag.experimental.pipeline import Pipeline
pipe = Pipeline()
pipe.add_component(ComponentAdd(), "a")
pipe.add_component(ComponentAdd(), "b")
pipe.connect("a", "b", input_config={"number2": "a.result"})
asyncio.run(pipe.run({"a": {"number1": 10, "number2": 1}, "b": {"number1": 4}}))
# result: 10+1+4 = 15
首先,创建一个管道,并向其中添加两个名为“a”和“b”的组件。
接下来,连接这两个组件,以便“b”在“a”之后运行,其中组件“b”的“number2”参数是组件“a”的结果。
最后,管道以 10 和 1 作为“a”的输入参数运行。组件“b”将接收 11(10 + 1,即“a”的结果)作为“number1”,并将 4 作为“number2”(如 pipeline.run 参数中所指定)。
数据流如下图所示
10 ---\
Component "a" -> 11
1 ----/ \
\
Component "b" -> 15
4 -------------------------/
警告
循环图
管道中不允许循环。
警告
忽略用户输入
如果输入同时由用户在 pipeline.run 方法中提供,并作为连接方法中的 input_config 提供,则用户输入将被忽略。例如下面的管道,它是从前一个管道修改而来的
pipe.connect("a", "b", input_config={"number2": "a.result"})
asyncio.run(pipe.run({"a": {"number1": 10, "number2": 1}, "b": {"number1": 4, "number2": 42}}))
结果仍将是 **15**,因为用户输入 “number2”: 42 被忽略了。
可视化管道¶
可以使用 draw 方法可视化管道
from neo4j_graphrag.experimental.pipeline import Pipeline
pipe = Pipeline()
# ... define components and connections
pipe.draw("pipeline.html")
这是一个将管道渲染为交互式 HTML 可视化的示例
# To view the visualization in a browser
import webbrowser
webbrowser.open("pipeline.html")
默认情况下,未映射到任何组件的输出字段是隐藏的。可以通过将 hide_unused_outputs 设置为 False 将它们添加到可视化中
pipe.draw("pipeline_full.html", hide_unused_outputs=False)
# To view the full visualization in a browser
import webbrowser
webbrowser.open("pipeline_full.html")
添加事件回调¶
可以添加回调以接收有关管道进度的通知
PIPELINE_STARTED,当管道启动时
PIPELINE_FINISHED,当管道结束时
TASK_STARTED,当任务启动时
TASK_PROGRESS,由每个组件发送(取决于组件的实现,见下文)
TASK_FINISHED,当任务结束时
请参阅 PipelineEvent 和 TaskEvent 以了解每种事件类型中发送了什么。
import asyncio
import logging
from neo4j_graphrag.experimental.pipeline import Pipeline
from neo4j_graphrag.experimental.pipeline.types import Event
logger = logging.getLogger(__name__)
logging.basicConfig()
logger.setLevel(logging.WARNING)
async def event_handler(event: Event) -> None:
"""Function can do anything about the event,
here we're just logging it if it's a pipeline-level event.
"""
if event.event_type.is_pipeline_event:
logger.warning(event)
pipeline = Pipeline(
callback=event_handler,
)
# ... add components, connect them as usual
await pipeline.run(...)
从组件发送事件¶
组件可以通过实现 run_from_context 方法,使用 context_ 中的 notify 函数发送进度通知
from neo4j_graphrag.experimental.pipeline import Component, DataModel
from neo4j_graphrag.experimental.pipeline.types.context import RunContext
class IntResultModel(DataModel):
result: int
class ComponentAdd(Component):
async def run_with_context(self, context_: RunContext, number1: int, number2: int = 1) -> IntResultModel:
for fake_iteration in range(10):
await context_.notify(
message=f"Starting iteration {fake_iteration} out of 10",
data={"iteration": fake_iteration, "total": 10}
)
return IntResultModel(result = number1 + number2)
这将向管道回调发送一个 TASK_PROGRESS 事件。
注意
在未来的版本中,context_ 参数将被添加到 run 方法中。