订阅引擎
此页面描述了使用 @neo4j/graphql
服务器设置 GraphQL 订阅的不同方法。
默认
如果 subscriptions
功能设置为 true
,则会自动设置默认行为,如入门中所述。
new Neo4jGraphQL({
typeDefs,
driver,
features: {
subscriptions: true
},
});
此行为启用了一个简单的订阅系统,该系统可在单个实例上运行。它非常适合开发、测试和不需要水平扩展的服务器。
更改数据捕获 Beta
如果您的数据库支持更改数据捕获 (CDC),则可以使用它作为您的订阅机制,使用 Neo4jGraphQLSubscriptionsCDCEngine
。请务必按照CDC 文档中描述的步骤,在您的 Neo4j 实例中以 FULL
模式启用它。
请注意,基于 CDC 的订阅与其他订阅机制的行为有所不同。在这种情况下,它使用 Neo4j 数据库的本机 CDC 事件。这具有以下含义
-
将报告任何数据库更改,包括在 GraphQL 外部进行的更改。
-
目前不支持关系事件。
-
不需要额外的代理机制。所有事件都由
@neo4j/graphql
的所有实例接收。 -
事件不会立即触发,而是轮询到数据库。
用法
Neo4jGraphQLSubscriptionsCDCEngine
可以直接从库中导入。Neo4j 驱动程序是唯一必需的参数
import { Neo4jGraphQL, Neo4jGraphQLSubscriptionsCDCEngine } from '@neo4j/graphql';
const engine = new Neo4jGraphQLSubscriptionsCDCEngine({
driver,
})
const neoSchema = new Neo4jGraphQL({
typeDefs,
driver,
features: {
subscriptions: engine,
},
});
AMQP
如水平扩展中所述,在具有多个实例的服务器上使用订阅可能很复杂。因此,建议的方法是使用 PubSub 系统,这可以通过 AMQP 代理(如RabbitMQ)来实现。这受@neo4j/graphql-amqp-subscriptions-engine软件包支持。
@neo4j/graphql-amqp-subscriptions-engine
插件通过 AMQP 0-9-1
协议连接到消息代理,以在所有服务器实例之间分发订阅事件。
一些支持此协议的代理是
可以使用 npm
安装插件
npm install @neo4j/graphql-amqp-subscriptions-engine
此插件**不支持** AMQP 1.0。 |
用法
AMQP 插件应实例化并传递到 features
中的 subscription
字段。这将自动启用使用 AMQP 代理作为消息队列的订阅。
const { Neo4jGraphQLAMQPSubscriptionsEngine } = require("@neo4j/graphql-amqp-subscriptions-engine");
const amqpSubscription = new Neo4jGraphQLAMQPSubscriptionsEngine({
connection: {
hostname: "localhost",
username: "guest",
password: "guest",
}
});
const neoSchema = new Neo4jGraphQL({
typeDefs,
driver,
features: {
subscriptions: amqpSubscription,
},
});
API
以下选项可以传递给构造函数
-
connection:作为字符串或配置对象的 AMQP uri。
-
hostname:要使用的主机名。默认为
localhost
。 -
username:默认为
guest
。 -
password:默认为
guest
。 -
port:AMQP 代理的端口。默认为
5672
。
-
-
exchange:要在代理中使用的交换机。默认为
neo4j.graphql.subscriptions.fx
。 -
version:要使用的 AMQP 版本。目前仅支持
0-9-1
。
此外,任何受amqplib支持的选项都可以传递给 connection
。要设置这些配置,请使用以下方法
-
close(): Promise<void>:关闭创建的连接和通道,并取消绑定事件发射器。
自定义订阅引擎
如果现有的引擎都不适用于您的用例,您可以创建一个新的引擎来连接到您可能需要的任何代理。为此,您需要创建一个新类来定义您的消息传递行为,并且它必须包含
-
一个名为
events
的EventEmitter
属性,该属性应在代理发送消息时每次都发出一个事件。 -
一个
publish
方法,该方法应将新事件发布到代理。 -
可选地,一个返回 promise 的
init
方法,该方法应在getSchema
上调用。这对于设置与代理的连接很有用。
如果您想使用redis处理订阅
// Note: This is an example of a custom subscription behavior and not a production ready redis implementation.
class CustomRedisSubscriptionEngine {
constructor(redisClient) {
this.client = redisClient;
this.events = new EventEmitter();
}
// This method connects to Redis and sends messages to the eventEmitter when receiving events.
async init(){
await this.client.connect();
this.subscriber = this.client.duplicate()
this.publisher = this.client.duplicate();
await this.subscriber.connect();
await this.publisher.connect();
await this.subscriber.subscribe("graphql-subscriptions", (message) => {
const eventMeta = JSON.parse(message);
this.events.emit(eventMeta.event, eventMeta); // Emits a new event when receiving a new message from redis
});
}
async publish(eventMeta) {
await this.publisher.publish("graphql-subscriptions", JSON.stringify(eventMeta)); // Sends a message to redis
}
}
const client = createClient(); // From https://npmjs.net.cn/package/redis
const redisSubscriptions = new CustomRedisSubscriptionEngine(client)
const neoSchema = new Neo4jGraphQL({
typeDefs,
driver,
features: {
subscriptions: redisSubscriptions,
},
});
请注意,通常需要额外的属性和方法来处理与代理的连接。但是,只要消息通过 publish
方法发送到代理,并且这些消息通过 events
属性接收并发出,订阅就会得到正确处理。