使用 Confluent 和 Kafka Connect Datagen 的示例

Kafka Connect Neo4j 连接器是将 Kafka 与 Neo4j 集成的推荐方法,因为 Neo4j Streams 不再处于积极开发阶段,并且在 Neo4j 4.4 版之后将不再受支持。

Kafka Connect Neo4j 连接器的最新版本可以在此处找到。

二进制格式的 Confluent 和 Neo4j

在此示例中,Neo4j 和 Confluent 将以二进制格式下载,并且 Neo4j Streams 插件将在接收器模式下设置。Neo4j 使用的数据将由**Kafka Connect Datagen**生成。请注意,此连接器仅应用于测试目的,不适用于生产环境。

下载并安装 Confluent 平台

  • 下载Confluent 平台,然后选择所需的格式.tar.gz.zip

  • 在您想要的文件夹中解压缩文件

  • 将 Confluent 的bin目录的安装位置添加到您的 PATH 环境变量中。

export PATH=<CONFLUENT_HOME_DIR>/bin:$PATH
  • 使用以下命令运行 Confluent 平台

confluent local start

输出应类似于以下内容

Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Starting ksql-server
ksql-server is [UP]
Starting control-center
control-center is [UP]

下载 Neo4j

  • 在以下链接下载最新版本的 Neo4j https://neo4j.ac.cn/download-center/

  • 在您想要的文件夹中解压缩它

  • 通过将 jar 复制到 plugins 文件夹中来安装 Neo4j Streams 插件

  • 为了启用接收器功能,请将以下属性添加到neo4j.conf

neo4j.conf
kafka.bootstrap.servers=localhost:9092
kafka.auto.offset.reset=earliest
kafka.group.id=neo4j
kafka.enable.auto.commit=true
kafka.key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
kafka.value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer

#********************************************************************
# Kafka Consumer
#********************************************************************
streams.sink.enabled=true
streams.sink.topic.cypher.pageviews=MERGE (n:User {id: event.payload.userid}) MERGE (p:PageView { id: event.payload.pageid }) MERGE (n)-[:VIEWED]->(p)

根据选择的数据格式相应地配置反序列化器

  • 对于**JSON**格式,使用org.apache.kafka.common.serialization.ByteArrayDeserializer

  • 对于**AVRO**格式,使用io.confluent.kafka.serializers.KafkaAvroDeserializer

如果是 AVRO,则还需要架构注册表配置

kafka.schema.registry.url=localhost:8081

其中 8081 是 Confluent Schema Registry 的默认端口。

如果您在添加上述属性之前启动了 Neo4j,则还需要重新启动 Neo4j 服务器。

安装 Kafka Connect Datagen

使用 Confluent Hub 客户端安装Kafka Connect Datagen

<CONFLUENT_HOME_DIR>/bin/confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:latest

输出应类似于以下内容

Running in a "--no-prompt" mode
Implicit acceptance of the license below:
Apache License 2.0
https://apache.ac.cn/licenses/LICENSE-2.0
Downloading component Kafka Connect Datagen 0.1.5, provided by Confluent, Inc. from Confluent Hub and installing into /Applications/Development/confluent-5.3.1/share/confluent-hub-components
...
Completed

查看结果

现在您可以访问 http://localhost:9021 上的 Confluent Control Center,您可以创建 Kafka 主题并生成一些示例数据。请按照官方Confluent 文档步骤 2步骤 3操作。

在配置数据生成器连接器时,还请使用以下值指定Value converter class属性

org.apache.kafka.connect.json.JsonConverter

访问 http://localhost:7474 上的 Neo4j 浏览器,您可以看到由**Kafka Connect Datagen**生成的 Kafka 消息已根据属性streams.sink.topic.cypher.pageviews中指定的 Cypher 转换为节点和关系。只需执行以下 Cypher 查询

MATCH p=()-->() RETURN p LIMIT 25

输出应类似于以下内容

sink ouput
图 1. 接收器输出

使用 Docker 的 Confluent,二进制格式的 Neo4j

在此示例中,Neo4j 将在本地安装,而 Confluent 平台将在 Docker 环境中。

Neo4j

Neo4j 的安装和配置方式与上述示例相同。

使用 Docker 的 Confluent

为了使用 Docker 获得一个现成的 Confluent 平台,请使用以下 docker-compose 文件(**请注意,在connect服务的配置中,您必须替换要安装的 kafka-connect-plugin 的<version>**)

docker-compose.yml
version: '2'
services:

  zookeeper:
    image: confluentinc/cp-zookeeper
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-enterprise-kafka
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    expose:
    - "9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:9093

      # workaround if we change to a custom name the schema_registry fails to start
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT

      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  schema_registry:
    image: confluentinc/cp-schema-registry
    hostname: schema_registry
    container_name: schema_registry
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema_registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'

  connect:
    image: confluentinc/kafka-connect-datagen:latest
    hostname: connect
    container_name: connect
    depends_on:
      - zookeeper
      - broker
      - schema_registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:9093'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=DEBUG,org.I0Itec.zkclient=DEBUG,org.reflections=ERROR
    command:
      - bash
      - -c
      - |
        confluent-hub install --no-prompt neo4j/kafka-connect-neo4j:<version> && \
        confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:latest
        /etc/confluent/docker/run

  control-center:
    image: confluentinc/cp-enterprise-control-center
    hostname: control-center
    container_name: control-center
    depends_on:
      - zookeeper
      - broker
      - schema_registry
      - connect
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:9093'
      CONTROL_CENTER_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CONTROL_CENTER_CONNECT_CLUSTER: 'connect:8083'
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021

您必须分配至少 8 GB 的 Docker 内存资源,以避免连接容器出现**退出代码 137(内存不足错误)**。

docker memory setting

要查看结果,请按照上面查看结果部分中说明的说明操作。