用户指南:管道

本页面提供了有关如何创建管道的信息。

注意

管道异步运行,请参阅下面的示例。

创建组件

组件是执行异步工作的单元,它们执行简单的任务,例如文档分块或将结果保存到 Neo4j。此包包含一些默认组件,但开发人员可以通过以下步骤创建自己的组件

  1. 创建 Pydantic neo4j_graphrag.experimental.pipeline.DataModel 的子类,以表示组件返回的数据

  2. 创建 neo4j_graphrag.experimental.pipeline.Component 的子类

  3. 在这个新类中创建一个 run 方法,并使用刚刚创建的 DataModel 指定所需的输入和输出模型

  4. 实现 run 方法:它是一个 async 方法,允许在此方法中并行化和等待任务。

下面给出了一个示例,其中创建了一个 ComponentAdd 用于将两个数字相加并返回结果总和

from neo4j_graphrag.experimental.pipeline import Component, DataModel

class IntResultModel(DataModel):
    result: int

class ComponentAdd(Component):
    async def run(self, number1: int, number2: int = 1) -> IntResultModel:
        return IntResultModel(result = number1 + number2)

在 API 文档中阅读更多关于 组件 的信息。

在管道中连接组件

创建组件的最终目标是将其组装成一个用于特定目的的复杂管道,例如从文本数据构建知识图谱。

以下是创建简单管道并将结果从一个组件传播到另一个组件的方法(详细解释如下)

import asyncio
from neo4j_graphrag.experimental.pipeline import Pipeline

pipe = Pipeline()
pipe.add_component(ComponentAdd(), "a")
pipe.add_component(ComponentAdd(), "b")

pipe.connect("a", "b", input_config={"number2": "a.result"})
asyncio.run(pipe.run({"a": {"number1": 10, "number2": 1}, "b": {"number1": 4}}))
# result: 10+1+4 = 15
  1. 首先,创建一个管道,并向其中添加两个名为“a”和“b”的组件。

  2. 接下来,连接这两个组件,以便“b”在“a”之后运行,其中组件“b”的“number2”参数是组件“a”的结果。

  3. 最后,管道以 10 和 1 作为“a”的输入参数运行。组件“b”将接收 11(10 + 1,即“a”的结果)作为“number1”,并将 4 作为“number2”(如 pipeline.run 参数中所指定)。

数据流如下图所示

10 ---\
        Component "a" -> 11
1 ----/                   \
                           \
                             Component "b" -> 15
4 -------------------------/

警告

循环图

管道中不允许循环。

警告

忽略用户输入

如果输入同时由用户在 pipeline.run 方法中提供,并作为连接方法中的 input_config 提供,则用户输入将被忽略。例如下面的管道,它是从前一个管道修改而来的

pipe.connect("a", "b", input_config={"number2": "a.result"})
asyncio.run(pipe.run({"a": {"number1": 10, "number2": 1}, "b": {"number1": 4, "number2": 42}}))

结果仍将是 **15**,因为用户输入 “number2”: 42 被忽略了。

可视化管道

可以使用 draw 方法可视化管道

from neo4j_graphrag.experimental.pipeline import Pipeline

pipe = Pipeline()
# ... define components and connections

pipe.draw("pipeline.html")

这是一个将管道渲染为交互式 HTML 可视化的示例

# To view the visualization in a browser
import webbrowser
webbrowser.open("pipeline.html")

默认情况下,未映射到任何组件的输出字段是隐藏的。可以通过将 hide_unused_outputs 设置为 False 将它们添加到可视化中

pipe.draw("pipeline_full.html", hide_unused_outputs=False)

# To view the full visualization in a browser
import webbrowser
webbrowser.open("pipeline_full.html")

添加事件回调

可以添加回调以接收有关管道进度的通知

  • PIPELINE_STARTED,当管道启动时

  • PIPELINE_FINISHED,当管道结束时

  • TASK_STARTED,当任务启动时

  • TASK_PROGRESS,由每个组件发送(取决于组件的实现,见下文)

  • TASK_FINISHED,当任务结束时

请参阅 PipelineEventTaskEvent 以了解每种事件类型中发送了什么。

import asyncio
import logging

from neo4j_graphrag.experimental.pipeline import Pipeline
from neo4j_graphrag.experimental.pipeline.types import Event

logger = logging.getLogger(__name__)
logging.basicConfig()
logger.setLevel(logging.WARNING)


async def event_handler(event: Event) -> None:
    """Function can do anything about the event,
    here we're just logging it if it's a pipeline-level event.
    """
    if event.event_type.is_pipeline_event:
        logger.warning(event)

pipeline = Pipeline(
    callback=event_handler,
)
# ... add components, connect them as usual

await pipeline.run(...)

从组件发送事件

组件可以通过实现 run_from_context 方法,使用 context_ 中的 notify 函数发送进度通知

from neo4j_graphrag.experimental.pipeline import Component, DataModel
from neo4j_graphrag.experimental.pipeline.types.context import RunContext

class IntResultModel(DataModel):
    result: int

class ComponentAdd(Component):
    async def run_with_context(self, context_: RunContext, number1: int, number2: int = 1) -> IntResultModel:
        for fake_iteration in range(10):
            await context_.notify(
                message=f"Starting iteration {fake_iteration} out of 10",
                data={"iteration": fake_iteration, "total": 10}
            )
        return IntResultModel(result = number1 + number2)

这将向管道回调发送一个 TASK_PROGRESS 事件。

注意

在未来的版本中,context_ 参数将被添加到 run 方法中。

© . All rights reserved.