使用 Confluent 和 Kafka Connect Datagen 的示例
Kafka Connect Neo4j 连接器是将 Kafka 与 Neo4j 集成的推荐方法,因为 Neo4j Streams 不再处于积极开发阶段,并且在 Neo4j 4.4 版之后将不再受支持。 Kafka Connect Neo4j 连接器的最新版本可以在此处找到。 |
二进制格式的 Confluent 和 Neo4j
在此示例中,Neo4j 和 Confluent 将以二进制格式下载,并且 Neo4j Streams 插件将在接收器模式下设置。Neo4j 使用的数据将由**Kafka Connect Datagen**生成。请注意,此连接器仅应用于测试目的,不适用于生产环境。
下载并安装 Confluent 平台
-
下载Confluent 平台,然后选择所需的格式
.tar.gz
或.zip
。 -
在您想要的文件夹中解压缩文件
-
将 Confluent 的
bin
目录的安装位置添加到您的 PATH 环境变量中。
export PATH=<CONFLUENT_HOME_DIR>/bin:$PATH
-
使用以下命令运行 Confluent 平台
confluent local start
输出应类似于以下内容
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Starting ksql-server
ksql-server is [UP]
Starting control-center
control-center is [UP]
下载 Neo4j
-
在以下链接下载最新版本的 Neo4j https://neo4j.ac.cn/download-center/
-
在您想要的文件夹中解压缩它
-
通过将 jar 复制到 plugins 文件夹中来安装 Neo4j Streams 插件
-
为了启用接收器功能,请将以下属性添加到
neo4j.conf
中
kafka.bootstrap.servers=localhost:9092
kafka.auto.offset.reset=earliest
kafka.group.id=neo4j
kafka.enable.auto.commit=true
kafka.key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
kafka.value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
#********************************************************************
# Kafka Consumer
#********************************************************************
streams.sink.enabled=true
streams.sink.topic.cypher.pageviews=MERGE (n:User {id: event.payload.userid}) MERGE (p:PageView { id: event.payload.pageid }) MERGE (n)-[:VIEWED]->(p)
根据选择的数据格式相应地配置反序列化器
-
对于**JSON**格式,使用org.apache.kafka.common.serialization.ByteArrayDeserializer
-
对于**AVRO**格式,使用io.confluent.kafka.serializers.KafkaAvroDeserializer
如果是 AVRO,则还需要架构注册表配置
kafka.schema.registry.url=localhost:8081
其中 8081 是 Confluent Schema Registry 的默认端口。
如果您在添加上述属性之前启动了 Neo4j,则还需要重新启动 Neo4j 服务器。 |
安装 Kafka Connect Datagen
使用 Confluent Hub 客户端安装Kafka Connect Datagen。
<CONFLUENT_HOME_DIR>/bin/confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:latest
输出应类似于以下内容
Running in a "--no-prompt" mode
Implicit acceptance of the license below:
Apache License 2.0
https://apache.ac.cn/licenses/LICENSE-2.0
Downloading component Kafka Connect Datagen 0.1.5, provided by Confluent, Inc. from Confluent Hub and installing into /Applications/Development/confluent-5.3.1/share/confluent-hub-components
...
Completed
查看结果
现在您可以访问 http://localhost:9021 上的 Confluent Control Center,您可以创建 Kafka 主题并生成一些示例数据。请按照官方Confluent 文档的步骤 2和步骤 3操作。
在配置数据生成器连接器时,还请使用以下值指定Value converter class
属性
org.apache.kafka.connect.json.JsonConverter
访问 http://localhost:7474 上的 Neo4j 浏览器,您可以看到由**Kafka Connect Datagen**生成的 Kafka 消息已根据属性streams.sink.topic.cypher.pageviews
中指定的 Cypher 转换为节点和关系。只需执行以下 Cypher 查询
MATCH p=()-->() RETURN p LIMIT 25
输出应类似于以下内容

使用 Docker 的 Confluent,二进制格式的 Neo4j
在此示例中,Neo4j 将在本地安装,而 Confluent 平台将在 Docker 环境中。
使用 Docker 的 Confluent
为了使用 Docker 获得一个现成的 Confluent 平台,请使用以下 docker-compose 文件(**请注意,在connect
服务的配置中,您必须替换要安装的 kafka-connect-plugin 的<version>
**)
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-enterprise-kafka
hostname: broker
container_name: broker
depends_on:
- zookeeper
ports:
- "9092:9092"
expose:
- "9093"
environment:
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9093,OUTSIDE://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:9093
# workaround if we change to a custom name the schema_registry fails to start
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
CONFLUENT_METRICS_ENABLE: 'true'
CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
schema_registry:
image: confluentinc/cp-schema-registry
hostname: schema_registry
container_name: schema_registry
depends_on:
- zookeeper
- broker
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema_registry
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
connect:
image: confluentinc/kafka-connect-datagen:latest
hostname: connect
container_name: connect
depends_on:
- zookeeper
- broker
- schema_registry
ports:
- "8083:8083"
environment:
CONNECT_BOOTSTRAP_SERVERS: 'broker:9093'
CONNECT_REST_ADVERTISED_HOST_NAME: connect
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: compose-connect-group
CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components
CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=DEBUG,org.I0Itec.zkclient=DEBUG,org.reflections=ERROR
command:
- bash
- -c
- |
confluent-hub install --no-prompt neo4j/kafka-connect-neo4j:<version> && \
confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:latest
/etc/confluent/docker/run
control-center:
image: confluentinc/cp-enterprise-control-center
hostname: control-center
container_name: control-center
depends_on:
- zookeeper
- broker
- schema_registry
- connect
ports:
- "9021:9021"
environment:
CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:9093'
CONTROL_CENTER_ZOOKEEPER_CONNECT: 'zookeeper:2181'
CONTROL_CENTER_CONNECT_CLUSTER: 'connect:8083'
CONTROL_CENTER_REPLICATION_FACTOR: 1
CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
CONFLUENT_METRICS_TOPIC_REPLICATION: 1
PORT: 9021
您必须分配至少 8 GB 的 Docker 内存资源,以避免连接容器出现**退出代码 137(内存不足错误)**。 ![]() |
要查看结果,请按照上面查看结果部分中说明的说明操作。