订阅引擎

此页面描述了使用 @neo4j/graphql 服务器设置 GraphQL 订阅的不同方法。

默认

如果 subscriptions 功能设置为 true,则会自动设置默认行为,如入门中所述。

new Neo4jGraphQL({
    typeDefs,
    driver,
    features: {
        subscriptions: true
    },
});

此行为启用了一个简单的订阅系统,该系统可在单个实例上运行。它非常适合开发、测试和不需要水平扩展的服务器。

更改数据捕获 Beta

如果您的数据库支持更改数据捕获 (CDC),则可以使用它作为您的订阅机制,使用 Neo4jGraphQLSubscriptionsCDCEngine。请务必按照CDC 文档中描述的步骤,在您的 Neo4j 实例中以 FULL 模式启用它。

请注意,基于 CDC 的订阅与其他订阅机制的行为有所不同。在这种情况下,它使用 Neo4j 数据库的本机 CDC 事件。这具有以下含义

  • 将报告任何数据库更改,包括在 GraphQL 外部进行的更改。

  • 目前不支持关系事件。

  • 不需要额外的代理机制。所有事件都由 @neo4j/graphql 的所有实例接收。

  • 事件不会立即触发,而是轮询到数据库。

用法

Neo4jGraphQLSubscriptionsCDCEngine 可以直接从库中导入。Neo4j 驱动程序是唯一必需的参数

import { Neo4jGraphQL, Neo4jGraphQLSubscriptionsCDCEngine } from '@neo4j/graphql';

const engine = new Neo4jGraphQLSubscriptionsCDCEngine({
    driver,
})

const neoSchema = new Neo4jGraphQL({
    typeDefs,
    driver,
    features: {
        subscriptions: engine,
    },
});

API

以下选项可以传递给构造函数

  • driver:用于 CDC 查询的驱动程序。

  • pollTime:CDC 查询之间的时间间隔(以毫秒为单位)。默认为 100 毫秒。请注意,轮询时间是从一个请求结束到下一个请求开始的时间段。CDC 事件触发订阅的实际时间还取决于您的网络。

  • queryConfig:一个包含要传递给 CDC 请求的驱动程序查询选项的对象。使用 db 字段定义 CDC 的目标数据库。

AMQP

水平扩展中所述,在具有多个实例的服务器上使用订阅可能很复杂。因此,建议的方法是使用 PubSub 系统,这可以通过 AMQP 代理(如RabbitMQ)来实现。这受@neo4j/graphql-amqp-subscriptions-engine软件包支持。

@neo4j/graphql-amqp-subscriptions-engine 插件通过 AMQP 0-9-1 协议连接到消息代理,以在所有服务器实例之间分发订阅事件。

一些支持此协议的代理是

可以使用 npm 安装插件

npm install @neo4j/graphql-amqp-subscriptions-engine

此插件**不支持** AMQP 1.0。

用法

AMQP 插件应实例化并传递到 features 中的 subscription 字段。这将自动启用使用 AMQP 代理作为消息队列的订阅。

const { Neo4jGraphQLAMQPSubscriptionsEngine } = require("@neo4j/graphql-amqp-subscriptions-engine");

const amqpSubscription = new Neo4jGraphQLAMQPSubscriptionsEngine({
    connection: {
        hostname: "localhost",
        username: "guest",
        password: "guest",
    }
});

const neoSchema = new Neo4jGraphQL({
    typeDefs,
    driver,
    features: {
        subscriptions: amqpSubscription,
    },
});

API

以下选项可以传递给构造函数

  • connection:作为字符串或配置对象的 AMQP uri。

    • hostname:要使用的主机名。默认为 localhost

    • username:默认为 guest

    • password:默认为 guest

    • port:AMQP 代理的端口。默认为 5672

  • exchange:要在代理中使用的交换机。默认为 neo4j.graphql.subscriptions.fx

  • version:要使用的 AMQP 版本。目前仅支持 0-9-1

此外,任何受amqplib支持的选项都可以传递给 connection。要设置这些配置,请使用以下方法

  • close(): Promise<void>:关闭创建的连接和通道,并取消绑定事件发射器。

自定义订阅引擎

如果现有的引擎都不适用于您的用例,您可以创建一个新的引擎来连接到您可能需要的任何代理。为此,您需要创建一个新类来定义您的消息传递行为,并且它必须包含

  • 一个名为 eventsEventEmitter 属性,该属性应在代理发送消息时每次都发出一个事件。

  • 一个 publish 方法,该方法应将新事件发布到代理。

  • 可选地,一个返回 promise 的 init 方法,该方法应在 getSchema 上调用。这对于设置与代理的连接很有用。

如果您想使用redis处理订阅

// Note: This is an example of a custom subscription behavior and not a production ready redis implementation.
class CustomRedisSubscriptionEngine {
    constructor(redisClient) {
        this.client = redisClient;
        this.events = new EventEmitter();
    }

    // This method connects to Redis and sends messages to the eventEmitter when receiving events.
    async init(){
        await this.client.connect();
        this.subscriber = this.client.duplicate()
        this.publisher = this.client.duplicate();
        await this.subscriber.connect();
        await this.publisher.connect();

        await this.subscriber.subscribe("graphql-subscriptions", (message) => {
          const eventMeta = JSON.parse(message);
          this.events.emit(eventMeta.event, eventMeta); // Emits a new event when receiving a new message from redis
        });
    }

    async publish(eventMeta) {
        await this.publisher.publish("graphql-subscriptions", JSON.stringify(eventMeta)); // Sends a message to redis
    }
}

const client = createClient(); // From https://npmjs.net.cn/package/redis
const redisSubscriptions = new CustomRedisSubscriptionEngine(client)

const neoSchema = new Neo4jGraphQL({
    typeDefs,
    driver,
    features: {
        subscriptions: redisSubscriptions,
    },
});

请注意,通常需要额外的属性和方法来处理与代理的连接。但是,只要消息通过 publish 方法发送到代理,并且这些消息通过 events 属性接收并发出,订阅就会得到正确处理。

使用 Typescript

如果使用 Typescript,您可以导入接口 Neo4jGraphQLSubscriptionsEngine 来实现您自己的类。确保 API 正确定义

class CustomRedisEngine implements Neo4jGraphQLSubscriptionsEngine {}

事件按顺序发送到类。但是,一旦这些事件通过代理广播,顺序就不再保证。对于需要排序的情况,您必须在订阅有效负载中设置 timestamp 字段。