Run with Docker
The Kafka Connect Neo4j Connector is the recommended way to integrate Kafka with Neo4j, since Neo4j Streams is no longer under active development and will not be supported after Neo4j 4.4. The latest Kafka Connect Neo4j Connector release can be found here.
Neo4j Streams plugin
Introduction
When Neo4j is run in Docker, some special considerations apply; see Neo4j Docker Configuration for more information. In particular, the configuration format used in neo4j.conf looks different.
Note that the Neo4j Docker image uses a naming convention: you can override any neo4j.conf property by prefixing it with NEO4J_ and applying the following transformations:
- a single underscore is converted to a double underscore: _ → __
- a dot is converted to a single underscore: . → _
Example:
- dbms.memory.heap.max_size=8G → NEO4J_dbms_memory_heap_max__size: 8G
- dbms.logs.debug.level=DEBUG → NEO4J_dbms_logs_debug_level: DEBUG
For more information and examples, see this section and the Confluent With Docker section of the documentation.
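As a quick sketch of the convention in practice, the same two properties can be passed to a container started with plain docker run (the image tag is illustrative):

docker run \
  -e NEO4J_dbms_memory_heap_max__size=8G \
  -e NEO4J_dbms_logs_debug_level=DEBUG \
  neo4j:4.4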
Another important thing to keep in mind is possible permission issues. If you want to run Kafka in Docker using a host volume that is not owned by your user, you will run into a permission error. There are two common solutions: run the container as the root user, or change the ownership of the host volume so that the container user can access it.
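A minimal sketch of the second approach, assuming a hypothetical host directory ./kafka-data mounted into the Kafka container and a container process running as UID/GID 1000 (check the real values with docker exec <container> id):

# hypothetical volume path and UID/GID; adjust to your setup
sudo chown -R 1000:1000 ./kafka-data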
The Neo4j docker container is built on an approach that uses environment variables passed to the container as a way to configure Neo4j. There are certain characters which environment variables cannot contain, notably the dash - character. Configuring the plugin to use stream names that contain these characters will not work properly, because a configuration environment variable such as NEO4J_streams_sink_topic_cypher_my-topic cannot be correctly evaluated as an environment variable (my-topic). This is a limitation of the Neo4j docker container rather than of neo4j-streams.
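The practical workaround is to pick topic names without dashes, as the cluster example later on this page does with mytopic; a sketch of such a variable (the Cypher template is illustrative):

NEO4J_streams_sink_topic_cypher_mytopic: "CREATE (n:Person {id: event.id}) RETURN n"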
Following you will find a lightweight Docker Compose file that allows you to test the application in your local environment.
Prerequisites:
- Docker
- Docker Compose

Here you can find the instructions on how to configure Docker and Docker Compose.
From the same directory where the compose file is located, you can launch this command:
docker-compose up -d
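To verify that everything started correctly, you can check the container status and follow the Neo4j logs with the standard docker-compose commands:

docker-compose ps
docker-compose logs -f neo4j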
Source module
Following you will find a compose file that allows you to spin up Neo4j, Kafka and Zookeeper in order to test the application.
version: '3'
services:
  neo4j:
    image: neo4j:4.4
    hostname: neo4j
    container_name: neo4j
    ports:
      - "7474:7474"
      - "7687:7687"
    depends_on:
      - kafka
    volumes:
      - ./neo4j/plugins:/plugins
    environment:
      NEO4J_AUTH: neo4j/streams
      NEO4J_dbms_logs_debug_level: DEBUG
      # KAFKA related configuration
      NEO4J_kafka_bootstrap_servers: kafka:19092
      NEO4J_streams_source_topic_nodes_neo4j: Person{*}
      NEO4J_streams_source_topic_relationships_neo4j: KNOWS{*}
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "12181:12181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 12181
  kafka:
    image: confluentinc/cp-kafka:latest
    hostname: kafka
    container_name: kafka
    ports:
      - "19092:19092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:12181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:19092
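In the configuration above, the two NEO4J_streams_source_topic_* variables publish every property of Person nodes and KNOWS relationships to the neo4j topic through the {*} pattern. The pattern syntax also allows restricting the published properties; a sketch with illustrative property names:

NEO4J_streams_source_topic_nodes_neo4j: Person{name, age}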
Launch it locally
Prerequisites:
- Install the latest release of the Neo4j Streams plugin into ./neo4j/plugins (a download sketch follows below).
Before starting, change the volume directory according to your needs; it is in the <plugins> directory that you must put the Streams jar file:
volumes:
  - ./neo4j/plugins:/plugins
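A sketch of fetching the jar from the project's GitHub releases; the 4.1.5 version matches the release linked later on this page, but the exact asset name and version may differ:

mkdir -p ./neo4j/plugins
curl -L -o ./neo4j/plugins/neo4j-streams-4.1.5.jar \
  https://github.com/neo4j-contrib/neo4j-streams/releases/download/4.1.5/neo4j-streams-4.1.5.jar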
You can run a Kafka consumer that subscribes to the topic neo4j by executing the following command:
docker exec kafka kafka-console-consumer --bootstrap-server kafka:19092 --topic neo4j --from-beginning
Then, directly from the Neo4j Browser, you can generate some random data with this query:
UNWIND range(1,100) as id
CREATE (p:Person {id:id, name: "Name " + id, age: id % 3}) WITH collect(p) as people
UNWIND people as p1
UNWIND range(1,10) as friend
WITH p1, people[(p1.id + friend) % size(people)] as p2
CREATE (p1)-[:KNOWS {years: abs(p2.id - p1.id)}]->(p2)
If you go back to your consumer, you will see something like this:
{"meta":{"timestamp":1571329239766,"username":"neo4j","txId":20,"txEventId":98,"txEventsCount":1100,"operation":"created","source":{"hostname":"neo4j"}},"payload":{"id":"84","before":null,"after":{"properties":{"name":"Name 85","id":85,"age":1},"labels":["Person"]},"type":"node"},"schema":{"properties":{"name":"String","id":"Long","age":"Long"},"constraints":[]}}
{"meta":{"timestamp":1571329239766,"username":"neo4j","txId":20,"txEventId":99,"txEventsCount":1100,"operation":"created","source":{"hostname":"neo4j"}},"payload":{"id":"85","before":null,"after":{"properties":{"name":"Name 86","id":86,"age":2},"labels":["Person"]},"type":"node"},"schema":{"properties":{"name":"String","id":"Long","age":"Long"},"constraints":[]}}
{"meta":{"timestamp":1571329239766,"username":"neo4j","txId":20,"txEventId":100,"txEventsCount":1100,"operation":"created","source":{"hostname":"neo4j"}},"payload":{"id":"0","start":{"id":"0","labels":["Person"],"ids":{}},"end":{"id":"2","labels":["Person"],"ids":{}},"before":null,"after":{"properties":{"years":2}},"label":"KNOWS","type":"relationship"},"schema":{"properties":{"years":"Long"},"constraints":[]}}
Note that in this example the consumer is listening on the neo4j topic. This is because the Neo4j Streams plugin, if no topic is otherwise specified, sends messages to a topic named neo4j by default.
Sink module
Following you will find a simple docker compose file that allows you to spin up two Neo4j instances, one configured as Source and the other one as Sink, letting you share any data from the Source to the Sink:
- the Source is listening at http://localhost:8474/browser/ (bolt: bolt://localhost:8687)
- the Sink is listening at http://localhost:7474/browser/ (bolt: bolt://localhost:7687) and is configured with the Schema strategy
environment:
  NEO4J_streams_sink_enabled: "true"
  NEO4J_streams_sink_topic_cypher_neo4j:
    "WITH event.value.payload AS payload, event.value.meta AS meta
    FOREACH (ignoreMe IN CASE WHEN payload.type = 'node' AND meta.operation <> 'deleted' and payload.after.labels[0] = 'Question' THEN [1] ELSE [] END |
      MERGE (n:Question{neo_id: toInteger(payload.id)}) ON CREATE
        SET n += payload.after.properties
    )
    FOREACH (ignoreMe IN CASE WHEN payload.type = 'node' AND meta.operation <> 'deleted' and payload.after.labels[0] = 'Answer' THEN [1] ELSE [] END |
      MERGE (n:Answer{neo_id: toInteger(payload.id)}) ON CREATE
        SET n += payload.after.properties
    )
    FOREACH (ignoreMe IN CASE WHEN payload.type = 'node' AND meta.operation <> 'deleted' and payload.after.labels[0] = 'User' THEN [1] ELSE [] END |
      MERGE (n:User{neo_id: toInteger(payload.id)}) ON CREATE
        SET n += payload.after.properties
    )
    FOREACH (ignoreMe IN CASE WHEN payload.type = 'node' AND meta.operation <> 'deleted' and payload.after.labels[0] = 'Tag' THEN [1] ELSE [] END |
      MERGE (n:Tag{neo_id: toInteger(payload.id)}) ON CREATE
        SET n += payload.after.properties
    )
    FOREACH (ignoreMe IN CASE WHEN payload.type = 'relationship' AND meta.operation <> 'deleted' and payload.label = 'ANSWERS' THEN [1] ELSE [] END |
      MERGE (s:Answer{neo_id: toInteger(payload.start.id)})
      MERGE (e:Question{neo_id: toInteger(payload.end.id)})
      CREATE (s)-[:ANSWERS{neo_id: toInteger(payload.id)}]->(e)
    )
    FOREACH (ignoreMe IN CASE WHEN payload.type = 'relationship' AND meta.operation <> 'deleted' and payload.label = 'TAGGED' THEN [1] ELSE [] END |
      MERGE (s:Question{neo_id: toInteger(payload.start.id)})
      MERGE (e:Tag{neo_id: toInteger(payload.end.id)})
      CREATE (s)-[:TAGGED{neo_id: toInteger(payload.id)}]->(e)
    )
    FOREACH (ignoreMe IN CASE WHEN payload.type = 'relationship' AND meta.operation <> 'deleted' and payload.label = 'PROVIDED' THEN [1] ELSE [] END |
      MERGE (s:User{neo_id: toInteger(payload.start.id)})
      MERGE (e:Answer{neo_id: toInteger(payload.end.id)})
      CREATE (s)-[:PROVIDED{neo_id: toInteger(payload.id)}]->(e)
    )
    FOREACH (ignoreMe IN CASE WHEN payload.type = 'relationship' AND meta.operation <> 'deleted' and payload.label = 'ASKED' THEN [1] ELSE [] END |
      MERGE (s:User{neo_id: toInteger(payload.start.id)})
      MERGE (e:Question{neo_id: toInteger(payload.end.id)})
      CREATE (s)-[:ASKED{neo_id: toInteger(payload.id)}]->(e)
    )"
Launch it locally
In the following example we are going to use the Neo4j Streams plugin in combination with the APOC procedures (download them from here) in order to download some data from Stackoverflow, store it into the Neo4j Source instance, and replicate this dataset into the Sink via the Neo4j Streams plugin.
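If you prefer not to copy the APOC jar into the plugins folder by hand, the Neo4j 4.x Docker images can also download it at startup through the NEO4JLABS_PLUGINS environment variable; a sketch of the fragment to merge into the neo4j-source service:

environment:
  NEO4JLABS_PLUGINS: '["apoc"]'

The complete compose file follows: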
version: '3'
services:
  neo4j-source:
    image: neo4j:4.4
    hostname: neo4j-source
    container_name: neo4j-source
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8474:7474"
      - "8687:7687"
    volumes:
      - ./neo4j/plugins:/plugins
    environment:
      NEO4J_kafka_bootstrap_servers: broker:9093
      NEO4J_AUTH: neo4j/source
      NEO4J_dbms_memory_heap_max__size: 2G
      NEO4J_dbms_logs_debug_level: DEBUG
      NEO4J_kafka_batch_size: 16384
      NEO4J_streams_sink_enabled: "false"
      NEO4J_streams_source_schema_polling_interval: 10000
  neo4j-sink:
    image: neo4j:4.4
    hostname: neo4j-sink
    container_name: neo4j-sink
    depends_on:
      - neo4j-source
    ports:
      - "7474:7474"
      - "7687:7687"
    volumes:
      - ./neo4j/plugins-sink:/plugins
    environment:
      NEO4J_kafka_bootstrap_servers: broker:9093
      NEO4J_AUTH: neo4j/sink
      NEO4J_dbms_memory_heap_max__size: 2G
      NEO4J_kafka_max_poll_records: 16384
      NEO4J_streams_source_enabled: "false"
      NEO4J_streams_sink_topic_cdc_schema: "neo4j"
      NEO4J_dbms_logs_debug_level: DEBUG
      NEO4J_streams_sink_enabled: "true"
      NEO4J_streams_sink_topic_cypher_neo4j:
        "WITH event.value.payload AS payload, event.value.meta AS meta
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'node' AND meta.operation <> 'deleted' and payload.after.labels[0] = 'Question' THEN [1] ELSE [] END |
          MERGE (n:Question{neo_id: toInteger(payload.id)}) ON CREATE
            SET n += payload.after.properties
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'node' AND meta.operation <> 'deleted' and payload.after.labels[0] = 'Answer' THEN [1] ELSE [] END |
          MERGE (n:Answer{neo_id: toInteger(payload.id)}) ON CREATE
            SET n += payload.after.properties
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'node' AND meta.operation <> 'deleted' and payload.after.labels[0] = 'User' THEN [1] ELSE [] END |
          MERGE (n:User{neo_id: toInteger(payload.id)}) ON CREATE
            SET n += payload.after.properties
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'node' AND meta.operation <> 'deleted' and payload.after.labels[0] = 'Tag' THEN [1] ELSE [] END |
          MERGE (n:Tag{neo_id: toInteger(payload.id)}) ON CREATE
            SET n += payload.after.properties
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'relationship' AND meta.operation <> 'deleted' and payload.label = 'ANSWERS' THEN [1] ELSE [] END |
          MERGE (s:Answer{neo_id: toInteger(payload.start.id)})
          MERGE (e:Question{neo_id: toInteger(payload.end.id)})
          CREATE (s)-[:ANSWERS{neo_id: toInteger(payload.id)}]->(e)
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'relationship' AND meta.operation <> 'deleted' and payload.label = 'TAGGED' THEN [1] ELSE [] END |
          MERGE (s:Question{neo_id: toInteger(payload.start.id)})
          MERGE (e:Tag{neo_id: toInteger(payload.end.id)})
          CREATE (s)-[:TAGGED{neo_id: toInteger(payload.id)}]->(e)
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'relationship' AND meta.operation <> 'deleted' and payload.label = 'PROVIDED' THEN [1] ELSE [] END |
          MERGE (s:User{neo_id: toInteger(payload.start.id)})
          MERGE (e:Answer{neo_id: toInteger(payload.end.id)})
          CREATE (s)-[:PROVIDED{neo_id: toInteger(payload.id)}]->(e)
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'relationship' AND meta.operation <> 'deleted' and payload.label = 'ASKED' THEN [1] ELSE [] END |
          MERGE (s:User{neo_id: toInteger(payload.start.id)})
          MERGE (e:Question{neo_id: toInteger(payload.end.id)})
          CREATE (s)-[:ASKED{neo_id: toInteger(payload.id)}]->(e)
        )"
  zookeeper:
    image: confluentinc/cp-zookeeper
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-enterprise-kafka
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    expose:
      - "9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:9093
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
  schema_registry:
    image: confluentinc/cp-schema-registry
    hostname: schema_registry
    container_name: schema_registry
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema_registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
Import the data
Let's go into the two instances and create the constraints on both sides:
// enable the multi-statement execution: https://stackoverflow.com/questions/21778435/multiple-unrelated-queries-in-neo4j-cypher?answertab=votes#tab-top
CREATE CONSTRAINT ON (u:User) ASSERT u.id IS UNIQUE;
CREATE CONSTRAINT ON (a:Answer) ASSERT a.id IS UNIQUE;
CREATE CONSTRAINT ON (t:Tag) ASSERT t.name IS UNIQUE;
CREATE CONSTRAINT ON (q:Question) ASSERT q.id IS UNIQUE;
Please take a look at the following property in the compose file:
NEO4J_streams_source_schema_polling_interval: 10000
This means that the Streams plugin polls the database every 10 seconds in order to retrieve schema changes and store them. So, once you have created the indexes, you need to wait up to 10 seconds before moving on to the next step.
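Per the naming convention described at the top of this page, this environment variable maps back to the following neo4j.conf property (for non-Docker installs):

streams.source.schema.polling.interval=10000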
Now let's go on the Source and, in order to import the Stackoverflow dataset, execute the following query:
UNWIND range(1, 1) as page
CALL apoc.load.json("https://api.stackexchange.com/2.2/questions?pagesize=100&order=desc&sort=creation&tagged=neo4j&site=stackoverflow&page=" + page) YIELD value
UNWIND value.items AS event
MERGE (question:Question {id:event.question_id}) ON CREATE
SET question.title = event.title, question.share_link = event.share_link, question.favorite_count = event.favorite_count
FOREACH (ignoreMe in CASE WHEN exists(event.accepted_answer_id) THEN [1] ELSE [] END | MERGE (question)<-[:ANSWERS]-(answer:Answer{id: event.accepted_answer_id}))
WITH * WHERE NOT event.owner.user_id IS NULL
MERGE (owner:User {id:event.owner.user_id}) ON CREATE SET owner.display_name = event.owner.display_name
MERGE (owner)-[:ASKED]->(question)
Once the import process has finished, to be sure that the data has been correctly replicated into the Sink, execute this query on both Source and Sink and compare the results:
MATCH (n)
RETURN
DISTINCT labels(n),
count(*) AS NumofNodes,
avg(size(keys(n))) AS AvgNumOfPropPerNode,
min(size(keys(n))) AS MinNumPropPerNode,
max(size(keys(n))) AS MaxNumPropPerNode,
avg(size((n)-[]-())) AS AvgNumOfRelationships,
min(size((n)-[]-())) AS MinNumOfRelationships,
max(size((n)-[]-())) AS MaxNumOfRelationships
order by NumofNodes desc
You can also launch a Kafka consumer that subscribes to the neo4j topic by executing this command:
docker exec broker kafka-console-consumer --bootstrap-server broker:9093 --topic neo4j --from-beginning
You will see something like this:
{"meta":{"timestamp":1571403896987,"username":"neo4j","txId":34,"txEventId":330,"txEventsCount":352,"operation":"created","source":{"hostname":"neo4j-source"}},"payload":{"id":"94","start":{"id":"186","labels":["User"],"ids":{"id":286795}},"end":{"id":"59","labels":["Question"],"ids":{"id":58303891}},"before":null,"after":{"properties":{}},"label":"ASKED","type":"relationship"},"schema":{"properties":{},"constraints":[]}}
{"meta":{"timestamp":1571403896987,"username":"neo4j","txId":34,"txEventId":331,"txEventsCount":352,"operation":"created","source":{"hostname":"neo4j-source"}},"payload":{"id":"34","start":{"id":"134","labels":["Answer"],"ids":{"id":58180296}},"end":{"id":"99","labels":["Question"],"ids":{"id":58169215}},"before":null,"after":{"properties":{}},"label":"ANSWERS","type":"relationship"},"schema":{"properties":{},"constraints":[]}}
Neo4j Streams with a Neo4j cluster and a Kafka cluster
Here we provide a docker-compose file to quickly spin up an environment composed of a 3-node Neo4j causal cluster (with the Streams plugin configured in Sink mode) and a 3-node Kafka cluster.
version: '3'
networks:
  kafka_cluster:
    driver: bridge
services:
  core1:
    image: neo4j:4.4-enterprise
    hostname: core1
    container_name: core1
    ports:
      - 7474:7474
      - 6477:6477
      - 7687:7687
    volumes:
      - ./neo4j-cluster-40/core1/plugins:/plugins
    networks:
      - kafka_cluster
    environment:
      NEO4J_ACCEPT_LICENSE_AGREEMENT: "yes"
      NEO4J_AUTH: neo4j/streams
      NEO4J_dbms_mode: CORE
      NEO4J_causalClustering_expectedCoreClusterSize: 3
      NEO4J_causalClustering_initialDiscoveryMembers: core1:5000,core2:5000,core3:5000
      NEO4J_dbms_connector_http_listen__address: :7474
      NEO4J_dbms_connector_https_listen__address: :6477
      NEO4J_dbms_connector_bolt_listen__address: :7687
      NEO4J_dbms_logs_debug_level: DEBUG
      NEO4J_apoc_import_file_enabled: "true"
      NEO4J_kafka_auto_offset_reset: "latest"
      NEO4J_kafka_bootstrap_servers: broker-1:29092,broker-2:39092,broker-3:49092
      NEO4J_kafka_group_id: "neo4j"
      NEO4J_kafka_client_id: "neo4j"
      NEO4J_kafka_enable_auto_commit: "false"
      NEO4J_kafka_key_deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
      NEO4J_kafka_value_deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
      NEO4J_streams_source_enabled: "false"
      NEO4J_streams_sink_enabled_to_dbtest: "true"
      NEO4J_streams_sink_topic_cypher_mytopic_to_dbtest: "CREATE (n:Person {id: event.id, name: event.name, surname: event.surname}) RETURN n"
  core2:
    image: neo4j:4.4-enterprise
    hostname: core2
    container_name: core2
    ports:
      - 7475:7475
      - 6478:6478
      - 7688:7688
    volumes:
      - ./neo4j-cluster-40/core2/plugins:/plugins
    networks:
      - kafka_cluster
    environment:
      NEO4J_ACCEPT_LICENSE_AGREEMENT: "yes"
      NEO4J_AUTH: neo4j/streams
      NEO4J_dbms_mode: CORE
      NEO4J_causalClustering_expectedCoreClusterSize: 3
      NEO4J_causalClustering_initialDiscoveryMembers: core1:5000,core2:5000,core3:5000
      NEO4J_dbms_connector_http_listen__address: :7475
      NEO4J_dbms_connector_https_listen__address: :6478
      NEO4J_dbms_connector_bolt_listen__address: :7688
      NEO4J_dbms_logs_debug_level: DEBUG
      NEO4J_apoc_import_file_enabled: "true"
      NEO4J_kafka_auto_offset_reset: "latest"
      NEO4J_kafka_bootstrap_servers: broker-1:29092,broker-2:39092,broker-3:49092
      NEO4J_kafka_group_id: "neo4j"
      NEO4J_kafka_client_id: "neo4j"
      NEO4J_kafka_enable_auto_commit: "false"
      NEO4J_kafka_key_deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
      NEO4J_kafka_value_deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
      NEO4J_streams_source_enabled: "false"
      NEO4J_streams_sink_enabled_to_dbtest: "true"
      NEO4J_streams_sink_topic_cypher_mytopic_to_dbtest: "CREATE (n:Person {id: event.id, name: event.name, surname: event.surname}) RETURN n"
  core3:
    image: neo4j:4.4-enterprise
    hostname: core3
    container_name: core3
    ports:
      - 7476:7476
      - 6479:6479
      - 7689:7689
    volumes:
      - ./neo4j-cluster-40/core3/plugins:/plugins
    networks:
      - kafka_cluster
    environment:
      NEO4J_ACCEPT_LICENSE_AGREEMENT: "yes"
      NEO4J_AUTH: neo4j/streams
      NEO4J_dbms_mode: CORE
      NEO4J_causalClustering_expectedCoreClusterSize: 3
      NEO4J_causalClustering_initialDiscoveryMembers: core1:5000,core2:5000,core3:5000
      NEO4J_dbms_connector_http_listen__address: :7476
      NEO4J_dbms_connector_https_listen__address: :6479
      NEO4J_dbms_connector_bolt_listen__address: :7689
      NEO4J_dbms_logs_debug_level: DEBUG
      NEO4J_kafka_bootstrap_servers: broker-1:29092,broker-2:39092,broker-3:49092
      NEO4J_kafka_group_id: "neo4j"
      NEO4J_kafka_client_id: "neo4j"
      NEO4J_kafka_enable_auto_commit: "false"
      NEO4J_kafka_key_deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
      NEO4J_kafka_value_deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
      NEO4J_streams_source_enabled: "false"
      NEO4J_streams_sink_enabled_to_dbtest: "true"
      NEO4J_streams_sink_topic_cypher_mytopic_to_dbtest: "CREATE (n:Person {id: event.id, name: event.name, surname: event.surname}) RETURN n"
  read1:
    image: neo4j:4.4-enterprise
    hostname: read1
    container_name: read1
    ports:
      - 7477:7477
      - 6480:6480
      - 7690:7690
    volumes:
      - ./neo4j-cluster-40/read1/plugins:/plugins
    networks:
      - kafka_cluster
    environment:
      NEO4J_ACCEPT_LICENSE_AGREEMENT: "yes"
      NEO4J_AUTH: neo4j/streams
      NEO4J_dbms_mode: READ_REPLICA
      NEO4J_causalClustering_expectedCoreClusterSize: 3
      NEO4J_causalClustering_initialDiscoveryMembers: core1:5000,core2:5000,core3:5000
      NEO4J_dbms_connector_http_listen__address: :7477
      NEO4J_dbms_connector_https_listen__address: :6480
      NEO4J_dbms_connector_bolt_listen__address: :7690
      NEO4J_dbms_logs_debug_level: DEBUG
      NEO4J_kafka_bootstrap_servers: broker-1:29092,broker-2:39092,broker-3:49092
      NEO4J_kafka_group_id: "neo4j"
      NEO4J_kafka_client_id: "neo4j"
      NEO4J_kafka_enable_auto_commit: "false"
      NEO4J_kafka_key_deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
      NEO4J_kafka_value_deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
      NEO4J_streams_source_enabled: "false"
      NEO4J_streams_sink_enabled_to_dbtest: "true"
      NEO4J_streams_sink_topic_cypher_mytopic_to_dbtest: "CREATE (n:Person {id: event.id, name: event.name, surname: event.surname}) RETURN n"
  zookeeper-1:
    image: confluentinc/cp-zookeeper
    hostname: zookeeper-1
    container_name: zookeeper-1
    ports:
      - 22181:22181
      - 22888:22888
      - 23888:23888
    volumes:
      - ./zookeeper-1/data:/data
    environment:
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: 22181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_SERVERS: zookeeper-1:22888:23888;zookeeper-2:32888:33888;zookeeper-3:42888:43888
    networks:
      - kafka_cluster
  zookeeper-2:
    image: confluentinc/cp-zookeeper
    hostname: zookeeper-2
    container_name: zookeeper-2
    ports:
      - 32181:32181
      - 32888:32888
      - 33888:33888
    volumes:
      - ./zookeeper-2/data:/data
    environment:
      ZOOKEEPER_SERVER_ID: 2
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_SERVERS: zookeeper-1:22888:23888;zookeeper-2:32888:33888;zookeeper-3:42888:43888
    networks:
      - kafka_cluster
  zookeeper-3:
    image: confluentinc/cp-zookeeper
    hostname: zookeeper-3
    container_name: zookeeper-3
    ports:
      - 42181:42181
      - 42888:42888
      - 43888:43888
    volumes:
      - ./zookeeper-3/data:/data
    environment:
      ZOOKEEPER_SERVER_ID: 3
      ZOOKEEPER_CLIENT_PORT: 42181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_SERVERS: zookeeper-1:22888:23888;zookeeper-2:32888:33888;zookeeper-3:42888:43888
    networks:
      - kafka_cluster
  broker-1:
    image: confluentinc/cp-kafka
    hostname: broker-1
    container_name: broker-1
    ports:
      - 9092:9092
      - 29092:29092
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:22181,zookeeper-2:32181,zookeeper-3:42181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://:29092,PLAINTEXT_HOST://:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker-1:29092,PLAINTEXT_HOST://localhost:9092
      ALLOW_PLAINTEXT_LISTENER: 'yes'
      KAFKA_AUTO_OFFSET_RESET: "latest"
      KAFKA_MAX_POLL_INTERVAL_MS: 300000
      KAFKA_MAX_POLL_RECORDS: 20000
      KAFKA_MAX_PARTITION_FETCH_BYTES: 52428800
      KAFKA_NUM_PARTITIONS: 2
      KAFKA_MESSAGE_MAX_BYTES: 20220088
    networks:
      - kafka_cluster
  broker-2:
    image: confluentinc/cp-kafka
    hostname: broker-2
    container_name: broker-2
    ports:
      - 9093:9093
      - 39092:39092
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:22181,zookeeper-2:32181,zookeeper-3:42181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://:39092,PLAINTEXT_HOST://:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker-2:39092,PLAINTEXT_HOST://localhost:9093
      ALLOW_PLAINTEXT_LISTENER: 'yes'
      KAFKA_AUTO_OFFSET_RESET: "latest"
      KAFKA_MAX_POLL_INTERVAL_MS: 300000
      KAFKA_MAX_POLL_RECORDS: 20000
      KAFKA_MAX_PARTITION_FETCH_BYTES: 52428800
      KAFKA_NUM_PARTITIONS: 2
      KAFKA_MESSAGE_MAX_BYTES: 20220088
    networks:
      - kafka_cluster
  broker-3:
    image: confluentinc/cp-kafka
    hostname: broker-3
    container_name: broker-3
    ports:
      - 9094:9094
      - 49092:49092
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:22181,zookeeper-2:32181,zookeeper-3:42181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://:49092,PLAINTEXT_HOST://:9094
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker-3:49092,PLAINTEXT_HOST://localhost:9094
      ALLOW_PLAINTEXT_LISTENER: 'yes'
      KAFKA_AUTO_OFFSET_RESET: "latest"
      KAFKA_MAX_POLL_INTERVAL_MS: 300000
      KAFKA_MAX_POLL_RECORDS: 20000
      KAFKA_MAX_PARTITION_FETCH_BYTES: 52428800
      KAFKA_NUM_PARTITIONS: 2
      KAFKA_MESSAGE_MAX_BYTES: 20220088
    networks:
      - kafka_cluster
All you have to do is:
- Download the latest Neo4j Streams plugin release from here: https://github.com/neo4j-contrib/neo4j-streams/releases/tag/4.1.5
- Be sure to create the volume folders (in the same folder as the docker-compose file) /neo4j-cluster-40/core1/plugins, /neo4j-cluster-40/core2/plugins, /neo4j-cluster-40/core3/plugins and /neo4j-cluster-40/read1/plugins, and be sure to place the neo4j-streams-4.1.5.jar into each of those folders.
- Run docker-compose up -d.
- Connect to the Neo4j core1 instance from a web browser: localhost:7474
  - Log in using the credentials provided in the docker-compose file.
  - Create a new database (the one the Neo4j Streams sink is listening on) by running the following 2 commands from the Neo4j Browser:
    - :use system
    - CREATE DATABASE dbtest
- Once all the containers are up and running, open a terminal window and connect to the Kafka broker-1 in order to send a JSON event via the kafka-console-producer. Follow these steps:
  - docker exec -it broker-1 /bin/bash
  - kafka-console-producer --broker-list broker-1:29092 --topic mytopic
  - Paste the following JSON event into the kafka-console-producer: {"id": 1, "name": "Mauro", "surname": "Roiter"}

Here is an example output of the last steps:
$ docker exec -it broker-1 /bin/bash
root@broker-1:/# kafka-console-producer --broker-list broker-1:29092 --topic mytopic
>{"id": 1, "name": "Mauro", "surname": "Roiter"}
Now, if you come back to the Neo4j Browser, you will see the created node in the corresponding database dbtest.

You will see the same result on the other Neo4j instances too.
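A quick check from the Neo4j Browser (a sketch; it assumes only the single event above has been produced to mytopic):

:use dbtest
MATCH (n:Person) RETURN n.id, n.name, n.surname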
In this example we used the Neo4j Enterprise docker image, because the CREATE DATABASE feature is available only in the Enterprise Edition.