Run with Docker

The Kafka Connect Neo4j Connector is the recommended method for integrating Kafka with Neo4j, as Neo4j Streams is no longer under active development and will not be supported after Neo4j version 4.4.

The latest version of the Kafka Connect Neo4j Connector can be found here.

Neo4j Streams plugin

Introduction

When Neo4j is run in Docker, some special considerations apply; please see Neo4j Docker Configuration for more information. In particular, the configuration format used in neo4j.conf looks different.

Please note that the Neo4j Docker image uses a naming convention; you can override every neo4j.conf property by prefixing it with NEO4J_ and applying the following transformations:

  • single underscore is converted to double underscore: _ → __

  • point is converted to single underscore: . → _

Example:

  • dbms.memory.heap.max_size=8G → NEO4J_dbms_memory_heap_max__size: 8G

  • dbms.logs.debug.level=DEBUG → NEO4J_dbms_logs_debug_level: DEBUG
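
For instance, a minimal sketch of passing the converted properties directly on the command line with the official image (the values here are illustrative only):

# Same settings as above, expressed as Docker environment variables
docker run -d --name neo4j \
  -p 7474:7474 -p 7687:7687 \
  -e NEO4J_dbms_memory_heap_max__size=8G \
  -e NEO4J_dbms_logs_debug_level=DEBUG \
  neo4j:4.4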

See this section and the Confluent With Docker section of the documentation for more information and examples.

Another important thing to watch out for is possible permissions issues. If you want to run Kafka in Docker using a host volume for which the user is not the owner, you will get a permission error. There are two possible solutions:

  • Run the container as the root user

  • Change the permissions of the volume so that it is accessible by the non-root user
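
A sketch of both options (the volume path and the uid are assumptions; check which user your Kafka image actually runs as):

# Option 1: run the container as root (quick, but not recommended for production)
docker run --user root -v $(pwd)/kafka-data:/var/lib/kafka/data confluentinc/cp-kafka:latest

# Option 2: hand the host directory over to the uid the container runs as
# (uid 1000 is a common default for the appuser in Confluent images)
sudo chown -R 1000:1000 ./kafka-data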

The Neo4j Docker container is built on an approach that uses environment variables passed to the container as a way of configuring Neo4j. There are certain characters which environment variables cannot contain, notably the dash - character. Configuring the plugin to use stream names that contain these characters will not work properly, because a configuration environment variable such as NEO4J_streams_sink_topic_cypher_my-topic cannot be correctly evaluated as an environment variable (my-topic). This is a limitation of the Neo4j Docker container, not of neo4j-streams.

Following you'll find a lightweight Docker Compose file that allows you to test the application in your local environment.

Prerequisites

  • Docker

  • Docker Compose

Here are the instructions on how to configure Docker and Docker Compose.

From the same directory that contains the compose file, you can launch this command:

docker-compose up -d
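
To verify that everything came up correctly, you can list the services and tail the Neo4j logs:

docker-compose ps
docker-compose logs -f neo4j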

Source module

Following you'll find a compose file that allows you to spin up Neo4j, Kafka and Zookeeper in order to test the application.

docker-compose.yml
version: '3'
services:
  neo4j:
    image: neo4j:4.4
    hostname: neo4j
    container_name: neo4j
    ports:
      - "7474:7474"
      - "7687:7687"
    depends_on:
      - kafka
    volumes:
      - ./neo4j/plugins:/plugins
    environment:
      NEO4J_AUTH: neo4j/streams
      NEO4J_dbms_logs_debug_level: DEBUG
      # KAFKA related configuration
      NEO4J_kafka_bootstrap_servers: kafka:19092
      NEO4J_streams_source_topic_nodes_neo4j: Person{*}
      NEO4J_streams_source_topic_relationships_neo4j: KNOWS{*}

  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "12181:12181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 12181

  kafka:
    image: confluentinc/cp-kafka:latest
    hostname: kafka
    container_name: kafka
    ports:
      - "19092:19092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:12181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:19092

Launch it locally

Prerequisites

Before starting, change the volume directory according to your setup; inside the <plugins> directory you must put the Streams jar file:

volumes:
    - ./neo4j/plugins:/plugins
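
For example, assuming the plugin jar has already been downloaded into the current directory (the file name below matches the 4.1.5 release, adjust it to your version):

mkdir -p ./neo4j/plugins
cp neo4j-streams-4.1.5.jar ./neo4j/plugins/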

You can run a Kafka consumer that subscribes to the topic neo4j by executing this command:

docker exec kafka kafka-console-consumer --bootstrap-server kafka:19092 --topic neo4j --from-beginning

Then, directly from the Neo4j Browser, you can generate some random data with this query:

UNWIND range(1,100) as id
CREATE (p:Person {id:id, name: "Name " + id, age: id % 3}) WITH collect(p) as people
UNWIND people as p1
UNWIND range(1,10) as friend
WITH p1, people[(p1.id + friend) % size(people)] as p2
CREATE (p1)-[:KNOWS {years: abs(p2.id - p1.id)}]->(p2)

If you go back to your consumer, you will see something like this:

{"meta":{"timestamp":1571329239766,"username":"neo4j","txId":20,"txEventId":98,"txEventsCount":1100,"operation":"created","source":{"hostname":"neo4j"}},"payload":{"id":"84","before":null,"after":{"properties":{"name":"Name 85","id":85,"age":1},"labels":["Person"]},"type":"node"},"schema":{"properties":{"name":"String","id":"Long","age":"Long"},"constraints":[]}}

{"meta":{"timestamp":1571329239766,"username":"neo4j","txId":20,"txEventId":99,"txEventsCount":1100,"operation":"created","source":{"hostname":"neo4j"}},"payload":{"id":"85","before":null,"after":{"properties":{"name":"Name 86","id":86,"age":2},"labels":["Person"]},"type":"node"},"schema":{"properties":{"name":"String","id":"Long","age":"Long"},"constraints":[]}}

{"meta":{"timestamp":1571329239766,"username":"neo4j","txId":20,"txEventId":100,"txEventsCount":1100,"operation":"created","source":{"hostname":"neo4j"}},"payload":{"id":"0","start":{"id":"0","labels":["Person"],"ids":{}},"end":{"id":"2","labels":["Person"],"ids":{}},"before":null,"after":{"properties":{"years":2}},"label":"KNOWS","type":"relationship"},"schema":{"properties":{"years":"Long"},"constraints":[]}}
Please note that in this example no topic name was specified before starting the Kafka consumer, which is listening on the neo4j topic. This is because the Neo4j Streams plugin, if not configured otherwise, sends messages to a topic named neo4j by default.
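
If you want the events published to a different topic, only the topic suffix of the source property has to change; a sketch with a hypothetical topic named people:

# streams.source.topic.nodes.<TOPIC_NAME> selects the target topic.
# The plugin jar must still be mounted under /plugins and Kafka must be reachable.
docker run -d --name neo4j \
  -v $(pwd)/neo4j/plugins:/plugins \
  -e NEO4J_AUTH=neo4j/streams \
  -e NEO4J_kafka_bootstrap_servers=kafka:19092 \
  -e NEO4J_streams_source_topic_nodes_people='Person{*}' \
  neo4j:4.4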

Sink module

Following you'll find a simple docker compose file that allows you to spin up two Neo4j instances, one configured as the source and the other as the sink, letting you share any data from the source to the sink:

  • The source listens at http://localhost:8474/browser/ (bolt: bolt://localhost:8687).

  • The sink listens at http://localhost:7474/browser/ (bolt: bolt://localhost:7687) and is configured with the Schema strategy.

The sink's Cypher template below relies on the FOREACH (ignoreMe IN CASE WHEN <condition> THEN [1] ELSE [] END | <update>) idiom: since Cypher has no conditional statement, each update runs only when its CASE expression yields a non-empty list.

    environment:
      NEO4J_streams_sink_enabled: "true"
      NEO4J_streams_sink_topic_neo4j:
        "WITH event.value.payload AS payload, event.value.meta AS meta
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'node' AND meta.operation <> 'deleted' and payload.after.labels[0] = 'Question' THEN [1] ELSE [] END |
          MERGE (n:Question{neo_id: toInteger(payload.id)}) ON CREATE
            SET n += payload.after.properties
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'node' AND meta.operation <> 'deleted' and payload.after.labels[0] = 'Answer' THEN [1] ELSE [] END |
          MERGE (n:Answer{neo_id: toInteger(payload.id)}) ON CREATE
            SET n += payload.after.properties
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'node' AND meta.operation <> 'deleted' and payload.after.labels[0] = 'User' THEN [1] ELSE [] END |
          MERGE (n:User{neo_id: toInteger(payload.id)}) ON CREATE
            SET n += payload.after.properties
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'node' AND meta.operation <> 'deleted' and payload.after.labels[0] = 'Tag' THEN [1] ELSE [] END |
          MERGE (n:Tag{neo_id: toInteger(payload.id)}) ON CREATE
            SET n += payload.after.properties
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'relationship' AND meta.operation <> 'deleted' and payload.label = 'ANSWERS' THEN [1] ELSE [] END |
          MERGE (s:Answer{neo_id: toInteger(payload.start.id)})
          MERGE (e:Question{neo_id: toInteger(payload.end.id)})
          CREATE (s)-[:ANSWERS{neo_id: toInteger(payload.id)}]->(e)
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'relationship' AND meta.operation <> 'deleted' and payload.label = 'TAGGED' THEN [1] ELSE [] END |
          MERGE (s:Question{neo_id: toInteger(payload.start.id)})
          MERGE (e:Tag{neo_id: toInteger(payload.end.id)})
          CREATE (s)-[:TAGGED{neo_id: toInteger(payload.id)}]->(e)
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'relationship' AND meta.operation <> 'deleted' and payload.label = 'PROVIDED' THEN [1] ELSE [] END |
          MERGE (s:User{neo_id: toInteger(payload.start.id)})
          MERGE (e:Answer{neo_id: toInteger(payload.end.id)})
          CREATE (s)-[:PROVIDED{neo_id: toInteger(payload.id)}]->(e)
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'relationship' AND meta.operation <> 'deleted' and payload.label = 'ASKED' THEN [1] ELSE [] END |
          MERGE (s:User{neo_id: toInteger(payload.start.id)})
          MERGE (e:Question{neo_id: toInteger(payload.end.id)})
          CREATE (s)-[:ASKED{neo_id: toInteger(payload.id)}]->(e)
        )"

Launch it locally

In the following example we are going to use the Neo4j Streams plugin in combination with the APOC procedures (download from here) in order to download some data from Stackoverflow, store it in the Neo4j source instance, and replicate this dataset into the sink via the Neo4j Streams plugin.

version: '3'

services:
  neo4j-source:
    image: neo4j:4.4
    hostname: neo4j-source
    container_name: neo4j-source
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8474:7474"
      - "8687:7687"
    volumes:
      - ./neo4j/plugins:/plugins
    environment:
      NEO4J_kafka_bootstrap_servers: broker:9093
      NEO4J_AUTH: neo4j/source
      NEO4J_dbms_memory_heap_max__size: 2G
      NEO4J_dbms_logs_debug_level: DEBUG
      NEO4J_kafka_batch_size: 16384
      NEO4J_streams_sink_enabled: "false"
      NEO4J_streams_source_schema_polling_interval: 10000

  neo4j-sink:
    image: neo4j:4.4
    hostname: neo4j-sink
    container_name: neo4j-sink
    depends_on:
      - neo4j-source
    ports:
      - "7474:7474"
      - "7687:7687"
    volumes:
      - ./neo4j/plugins-sink:/plugins
    environment:
      NEO4J_kafka_bootstrap_servers: broker:9093
      NEO4J_AUTH: neo4j/sink
      NEO4J_dbms_memory_heap_max__size: 2G
      NEO4J_kafka_max_poll_records: 16384
      NEO4J_streams_source_enabled: "false"
      NEO4J_streams_sink_topic_cdc_schema: "neo4j"
      NEO4J_dbms_logs_debug_level: DEBUG
      NEO4J_streams_sink_enabled: "true"
      NEO4J_streams_sink_topic_neo4j:
        "WITH event.value.payload AS payload, event.value.meta AS meta
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'node' AND meta.operation <> 'deleted' and payload.after.labels[0] = 'Question' THEN [1] ELSE [] END |
          MERGE (n:Question{neo_id: toInteger(payload.id)}) ON CREATE
            SET n += payload.after.properties
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'node' AND meta.operation <> 'deleted' and payload.after.labels[0] = 'Answer' THEN [1] ELSE [] END |
          MERGE (n:Answer{neo_id: toInteger(payload.id)}) ON CREATE
            SET n += payload.after.properties
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'node' AND meta.operation <> 'deleted' and payload.after.labels[0] = 'User' THEN [1] ELSE [] END |
          MERGE (n:User{neo_id: toInteger(payload.id)}) ON CREATE
            SET n += payload.after.properties
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'node' AND meta.operation <> 'deleted' and payload.after.labels[0] = 'Tag' THEN [1] ELSE [] END |
          MERGE (n:Tag{neo_id: toInteger(payload.id)}) ON CREATE
            SET n += payload.after.properties
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'relationship' AND meta.operation <> 'deleted' and payload.label = 'ANSWERS' THEN [1] ELSE [] END |
          MERGE (s:Answer{neo_id: toInteger(payload.start.id)})
          MERGE (e:Question{neo_id: toInteger(payload.end.id)})
          CREATE (s)-[:ANSWERS{neo_id: toInteger(payload.id)}]->(e)
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'relationship' AND meta.operation <> 'deleted' and payload.label = 'TAGGED' THEN [1] ELSE [] END |
          MERGE (s:Question{neo_id: toInteger(payload.start.id)})
          MERGE (e:Tag{neo_id: toInteger(payload.end.id)})
          CREATE (s)-[:TAGGED{neo_id: toInteger(payload.id)}]->(e)
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'relationship' AND meta.operation <> 'deleted' and payload.label = 'PROVIDED' THEN [1] ELSE [] END |
          MERGE (s:User{neo_id: toInteger(payload.start.id)})
          MERGE (e:Answer{neo_id: toInteger(payload.end.id)})
          CREATE (s)-[:PROVIDED{neo_id: toInteger(payload.id)}]->(e)
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'relationship' AND meta.operation <> 'deleted' and payload.label = 'ASKED' THEN [1] ELSE [] END |
          MERGE (s:User{neo_id: toInteger(payload.start.id)})
          MERGE (e:Question{neo_id: toInteger(payload.end.id)})
          CREATE (s)-[:ASKED{neo_id: toInteger(payload.id)}]->(e)
        )"

  zookeeper:
    image: confluentinc/cp-zookeeper
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-enterprise-kafka
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    expose:
      - "9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:9093
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  schema_registry:
    image: confluentinc/cp-schema-registry
    hostname: schema_registry
    container_name: schema_registry
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema_registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'

Prerequisites

  • Install APOC into ./neo4j/plugins.

  • Install the Neo4j Streams plugin into ./neo4j/plugins and ./neo4j/plugins-sink.
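
A shell sketch of these two steps (the jar file names are examples; pick the releases that match your Neo4j version):

mkdir -p ./neo4j/plugins ./neo4j/plugins-sink
# APOC is only needed on the source instance
cp apoc-4.4.0.8-all.jar ./neo4j/plugins/
# the Streams plugin goes on both instances
cp neo4j-streams-4.1.5.jar ./neo4j/plugins/
cp neo4j-streams-4.1.5.jar ./neo4j/plugins-sink/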

Import the data

Let's go into the two instances and create the constraints on both sides:

// enable the multi-statement execution: https://stackoverflow.com/questions/21778435/multiple-unrelated-queries-in-neo4j-cypher?answertab=votes#tab-top
CREATE CONSTRAINT ON (u:User) ASSERT u.id IS UNIQUE;
CREATE CONSTRAINT ON (a:Answer) ASSERT a.id IS UNIQUE;
CREATE CONSTRAINT ON (t:Tag) ASSERT t.name IS UNIQUE;
CREATE CONSTRAINT ON (q:Question) ASSERT q.id IS UNIQUE;

Please take a look at this property in the compose file:

NEO4J_streams_source_schema_polling_interval: 10000

This means that the Streams plugin polls the database every 10 seconds in order to retrieve schema changes and store them. So, once you have created the constraints, you need to wait up to 10 seconds before moving on to the next step.
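
For local experiments you can lower this interval; a minimal sketch, assuming the rest of the source configuration stays as in the compose file:

# Poll the schema every 5 seconds instead of 10 (the value is in milliseconds)
docker run -d --name neo4j-source \
  -v $(pwd)/neo4j/plugins:/plugins \
  -e NEO4J_AUTH=neo4j/source \
  -e NEO4J_streams_source_schema_polling_interval=5000 \
  neo4j:4.4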

Now let's go to the source instance and execute the following query to import the Stackoverflow dataset:

UNWIND range(1, 1) as page
CALL apoc.load.json("https://api.stackexchange.com/2.2/questions?pagesize=100&order=desc&sort=creation&tagged=neo4j&site=stackoverflow&page=" + page) YIELD value
UNWIND value.items AS event
MERGE (question:Question {id:event.question_id}) ON CREATE
  SET question.title = event.title, question.share_link = event.share_link, question.favorite_count = event.favorite_count

FOREACH (ignoreMe in CASE WHEN exists(event.accepted_answer_id) THEN [1] ELSE [] END | MERGE (question)<-[:ANSWERS]-(answer:Answer{id: event.accepted_answer_id}))

WITH * WHERE NOT event.owner.user_id IS NULL
MERGE (owner:User {id:event.owner.user_id}) ON CREATE SET owner.display_name = event.owner.display_name
MERGE (owner)-[:ASKED]->(question)

Once the import process has finished, in order to check that the data has been correctly replicated into the sink, execute this query in both the source and the sink and compare the results:

MATCH (n)
RETURN
DISTINCT labels(n),
count(*) AS NumofNodes,
avg(size(keys(n))) AS AvgNumOfPropPerNode,
min(size(keys(n))) AS MinNumPropPerNode,
max(size(keys(n))) AS MaxNumPropPerNode,
avg(size((n)-[]-())) AS AvgNumOfRelationships,
min(size((n)-[]-())) AS MinNumOfRelationships,
max(size((n)-[]-())) AS MaxNumOfRelationships
order by NumofNodes desc

You can also launch a Kafka consumer that subscribes to the topic neo4j by executing this command:

docker exec broker kafka-console-consumer --bootstrap-server broker:9093 --topic neo4j --from-beginning

You will see something like this:

{"meta":{"timestamp":1571403896987,"username":"neo4j","txId":34,"txEventId":330,"txEventsCount":352,"operation":"created","source":{"hostname":"neo4j-source"}},"payload":{"id":"94","start":{"id":"186","labels":["User"],"ids":{"id":286795}},"end":{"id":"59","labels":["Question"],"ids":{"id":58303891}},"before":null,"after":{"properties":{}},"label":"ASKED","type":"relationship"},"schema":{"properties":{},"constraints":[]}}

{"meta":{"timestamp":1571403896987,"username":"neo4j","txId":34,"txEventId":331,"txEventsCount":352,"operation":"created","source":{"hostname":"neo4j-source"}},"payload":{"id":"34","start":{"id":"134","labels":["Answer"],"ids":{"id":58180296}},"end":{"id":"99","labels":["Question"],"ids":{"id":58169215}},"before":null,"after":{"properties":{}},"label":"ANSWERS","type":"relationship"},"schema":{"properties":{},"constraints":[]}}

Neo4j Streams with Neo4j cluster and Kafka cluster

Here we provide a docker-compose file to quickly spin up an environment composed of a Neo4j causal cluster of 3 core nodes plus a read replica, with the Streams plugin configured in sink mode, and a 3-node Kafka cluster.

version: '3'

networks:
  kafka_cluster:
    driver: bridge

services:

  core1:
    image: neo4j:4.4-enterprise
    hostname: core1
    container_name: core1
    ports:
      - 7474:7474
      - 6477:6477
      - 7687:7687
    volumes:
      - ./neo4j-cluster-40/core1/plugins:/plugins
    networks:
      - kafka_cluster
    environment:
      NEO4J_ACCEPT_LICENSE_AGREEMENT: "yes"
      NEO4J_AUTH: neo4j/streams
      NEO4J_dbms_mode: CORE
      NEO4J_causalClustering_expectedCoreClusterSize: 3
      NEO4J_causalClustering_initialDiscoveryMembers: core1:5000,core2:5000,core3:5000
      NEO4J_dbms_connector_http_listen__address: :7474
      NEO4J_dbms_connector_https_listen__address: :6477
      NEO4J_dbms_connector_bolt_listen__address: :7687
      NEO4J_dbms_logs_debug_level: DEBUG
      NEO4J_apoc_import_file_enabled: "true"
      NEO4J_kafka_auto_offset_reset: "latest"
      NEO4J_kafka_bootstrap_servers: broker-1:29092,broker-2:39092,broker-3:49092
      NEO4J_kafka_group_id: "neo4j"
      NEO4J_kafka_client_id: "neo4j"
      NEO4J_kafka_enable_auto_commit: "false"
      NEO4J_kafka_key_deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
      NEO4J_kafka_value_deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
      NEO4J_streams_source_enabled: "false"
      NEO4J_streams_sink_enabled_to_dbtest: "true"
      NEO4J_streams_sink_topic_cypher_mytopic_to_dbtest: "CREATE (n:Person {id: event.id, name: event.name, surname: event.surname}) RETURN n"

  core2:
    image: neo4j:4.4-enterprise
    hostname: core2
    container_name: core2
    ports:
      - 7475:7475
      - 6478:6478
      - 7688:7688
    volumes:
      - ./neo4j-cluster-40/core2/plugins:/plugins
    networks:
      - kafka_cluster
    environment:
      NEO4J_ACCEPT_LICENSE_AGREEMENT: "yes"
      NEO4J_AUTH: neo4j/streams
      NEO4J_dbms_mode: CORE
      NEO4J_causalClustering_expectedCoreClusterSize: 3
      NEO4J_causalClustering_initialDiscoveryMembers: core1:5000,core2:5000,core3:5000
      NEO4J_dbms_connector_http_listen__address: :7475
      NEO4J_dbms_connector_https_listen__address: :6478
      NEO4J_dbms_connector_bolt_listen__address: :7688
      NEO4J_dbms_logs_debug_level: DEBUG
      NEO4J_apoc_import_file_enabled: "true"
      NEO4J_kafka_auto_offset_reset: "latest"
      NEO4J_kafka_bootstrap_servers: broker-1:29092,broker-2:39092,broker-3:49092
      NEO4J_kafka_group_id: "neo4j"
      NEO4J_kafka_client_id: "neo4j"
      NEO4J_kafka_enable_auto_commit: "false"
      NEO4J_kafka_key_deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
      NEO4J_kafka_value_deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
      NEO4J_streams_source_enabled: "false"
      NEO4J_streams_sink_enabled_to_dbtest: "true"
      NEO4J_streams_sink_topic_cypher_mytopic_to_dbtest: "CREATE (n:Person {id: event.id, name: event.name, surname: event.surname}) RETURN n"

  core3:
    image: neo4j:4.4-enterprise
    hostname: core3
    container_name: core3
    ports:
      - 7476:7476
      - 6479:6479
      - 7689:7689
    volumes:
      - ./neo4j-cluster-40/core3/plugins:/plugins
    networks:
      - kafka_cluster
    environment:
      NEO4J_ACCEPT_LICENSE_AGREEMENT: "yes"
      NEO4J_AUTH: neo4j/streams
      NEO4J_dbms_mode: CORE
      NEO4J_causalClustering_expectedCoreClusterSize: 3
      NEO4J_causalClustering_initialDiscoveryMembers: core1:5000,core2:5000,core3:5000
      NEO4J_dbms_connector_http_listen__address: :7476
      NEO4J_dbms_connector_https_listen__address: :6479
      NEO4J_dbms_connector_bolt_listen__address: :7689
      NEO4J_dbms_logs_debug_level: DEBUG
      NEO4J_kafka_bootstrap_servers: broker-1:29092,broker-2:39092,broker-3:49092
      NEO4J_kafka_group_id: "neo4j"
      NEO4J_kafka_client_id: "neo4j"
      NEO4J_kafka_enable_auto_commit: "false"
      NEO4J_kafka_key_deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
      NEO4J_kafka_value_deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
      NEO4J_streams_source_enabled: "false"
      NEO4J_streams_sink_enabled_to_dbtest: "true"
      NEO4J_streams_sink_topic_cypher_mytopic_to_dbtest: "CREATE (n:Person {id: event.id, name: event.name, surname: event.surname}) RETURN n"

  read1:
    image: neo4j:4.4-enterprise
    hostname: read1
    container_name: read1
    ports:
      - 7477:7477
      - 6480:6480
      - 7690:7690
    volumes:
      - ./neo4j-cluster-40/read1/plugins:/plugins
    networks:
      - kafka_cluster
    environment:
      NEO4J_ACCEPT_LICENSE_AGREEMENT: "yes"
      NEO4J_AUTH: neo4j/streams
      NEO4J_dbms_mode: READ_REPLICA
      NEO4J_causalClustering_expectedCoreClusterSize: 3
      NEO4J_causalClustering_initialDiscoveryMembers: core1:5000,core2:5000,core3:5000
      NEO4J_dbms_connector_http_listen__address: :7477
      NEO4J_dbms_connector_https_listen__address: :6480
      NEO4J_dbms_connector_bolt_listen__address: :7690
      NEO4J_dbms_logs_debug_level: DEBUG
      NEO4J_kafka_bootstrap_servers: broker-1:29092,broker-2:39092,broker-3:49092
      NEO4J_kafka_group_id: "neo4j"
      NEO4J_kafka_client_id: "neo4j"
      NEO4J_kafka_enable_auto_commit: "false"
      NEO4J_kafka_key_deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
      NEO4J_kafka_value_deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
      NEO4J_streams_source_enabled: "false"
      NEO4J_streams_sink_enabled_to_dbtest: "true"
      NEO4J_streams_sink_topic_cypher_mytopic_to_dbtest: "CREATE (n:Person {id: event.id, name: event.name, surname: event.surname}) RETURN n"

  zookeeper-1:
    image: confluentinc/cp-zookeeper
    hostname: zookeeper-1
    container_name: zookeeper-1
    ports:
      - 22181:22181
      - 22888:22888
      - 23888:23888
    volumes:
      - ./zookeeper-1/data:/data
    environment:
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: 22181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_SERVERS: zookeeper-1:22888:23888;zookeeper-2:32888:33888;zookeeper-3:42888:43888
    networks:
      - kafka_cluster

  zookeeper-2:
    image: confluentinc/cp-zookeeper
    hostname: zookeeper-2
    container_name: zookeeper-2
    ports:
      - 32181:32181
      - 32888:32888
      - 33888:33888
    volumes:
      - ./zookeeper-2/data:/data
    environment:
      ZOOKEEPER_SERVER_ID: 2
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_SERVERS: zookeeper-1:22888:23888;zookeeper-2:32888:33888;zookeeper-3:42888:43888
    networks:
      - kafka_cluster

  zookeeper-3:
    image: confluentinc/cp-zookeeper
    hostname: zookeeper-3
    container_name: zookeeper-3
    ports:
      - 42181:42181
      - 42888:42888
      - 43888:43888
    volumes:
      - ./zookeeper-3/data:/data
    environment:
      ZOOKEEPER_SERVER_ID: 3
      ZOOKEEPER_CLIENT_PORT: 42181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_SERVERS: zookeeper-1:22888:23888;zookeeper-2:32888:33888;zookeeper-3:42888:43888
    networks:
      - kafka_cluster

  broker-1:
    image: confluentinc/cp-kafka
    hostname: broker-1
    container_name: broker-1
    ports:
      - 9092:9092
      - 29092:29092
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:22181,zookeeper-2:32181,zookeeper-3:42181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://:29092,PLAINTEXT_HOST://:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker-1:29092,PLAINTEXT_HOST://localhost:9092
      ALLOW_PLAINTEXT_LISTENER: 'yes'
      KAFKA_AUTO_OFFSET_RESET: "latest"
      KAFKA_MAX_POLL_INTERVAL_MS: 300000
      KAFKA_MAX_POLL_RECORDS: 20000
      KAFKA_MAX_PARTITION_FETCH_BYTES: 52428800
      KAFKA_NUM_PARTITIONS: 2
      KAFKA_MESSAGE_MAX_BYTES: 20220088
    networks:
      - kafka_cluster

  broker-2:
    image: confluentinc/cp-kafka
    hostname: broker-2
    container_name: broker-2
    ports:
      - 9093:9093
      - 39092:39092
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:22181,zookeeper-2:32181,zookeeper-3:42181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://:39092,PLAINTEXT_HOST://:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker-2:39092,PLAINTEXT_HOST://localhost:9093
      ALLOW_PLAINTEXT_LISTENER: 'yes'
      KAFKA_AUTO_OFFSET_RESET: "latest"
      KAFKA_MAX_POLL_INTERVAL_MS: 300000
      KAFKA_MAX_POLL_RECORDS: 20000
      KAFKA_MAX_PARTITION_FETCH_BYTES: 52428800
      KAFKA_NUM_PARTITIONS: 2
      KAFKA_MESSAGE_MAX_BYTES: 20220088
    networks:
      - kafka_cluster

  broker-3:
    image: confluentinc/cp-kafka
    hostname: broker-3
    container_name: broker-3
    ports:
      - 9094:9094
      - 49092:49092
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:22181,zookeeper-2:32181,zookeeper-3:42181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://:49092,PLAINTEXT_HOST://:9094
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker-3:49092,PLAINTEXT_HOST://localhost:9094
      ALLOW_PLAINTEXT_LISTENER: 'yes'
      KAFKA_AUTO_OFFSET_RESET: "latest"
      KAFKA_MAX_POLL_INTERVAL_MS: 300000
      KAFKA_MAX_POLL_RECORDS: 20000
      KAFKA_MAX_PARTITION_FETCH_BYTES: 52428800
      KAFKA_NUM_PARTITIONS: 2
      KAFKA_MESSAGE_MAX_BYTES: 20220088
    networks:
      - kafka_cluster

All you have to do is:

  • Download the latest Neo4j Streams plugin version from here: https://github.com/neo4j-contrib/neo4j-streams/releases/tag/4.1.5

  • Be sure to create the volume folders (in the same folder as the docker-compose file) /neo4j-cluster-40/core1/plugins, /neo4j-cluster-40/core2/plugins, /neo4j-cluster-40/core3/plugins and /neo4j-cluster-40/read1/plugins, and place the neo4j-streams-4.1.5.jar into each of them (see the shell sketch after this list).

  • Run docker-compose up -d

  • Connect to the Neo4j core1 instance from a web browser: localhost:7474

    • Log in using the credentials provided in the docker-compose file.

    • Create a new database (the one the Neo4j Streams sink is listening on) by running the following 2 commands from the Neo4j Browser:

      • :use system

      • CREATE DATABASE dbtest

  • Once all the containers are up and running, open a terminal window and connect to the Kafka broker-1 in order to send a JSON event using the kafka-console-producer. Follow these steps:

    • docker exec -it broker-1 /bin/bash

    • kafka-console-producer --broker-list broker-1:29092 --topic mytopic

    • Paste the following JSON event into the kafka-console-producer:

      {"id": 1, "name": "Mauro", "surname": "Roiter"}.

Here is an example output of these last steps:

$ docker exec -it broker-1 /bin/bash
root@broker-1:/# kafka-console-producer --broker-list broker-1:29092 --topic mytopic
>{"id": 1, "name": "Mauro", "surname": "Roiter"}

Now, if you go back to the Neo4j Browser, you will see the created node in the corresponding database dbtest.
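
You can also check it from the command line with cypher-shell (a sketch; credentials as in the compose file):

docker exec -it core1 cypher-shell -u neo4j -p streams -d dbtest \
  'MATCH (n:Person) RETURN n.id, n.name, n.surname'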

Figure 1. Streams sink plugin in a Neo4j and Kafka cluster environment

You will see the same result in the other Neo4j instances as well.

In this example we used the Neo4j Enterprise Docker image, because the CREATE DATABASE feature is available only in the Enterprise Edition.