使用 SSL 配置 Kafka
Kafka Connect Neo4j 连接器是将 Kafka 与 Neo4j 集成的推荐方法,因为 Neo4j Streams 不再处于积极开发阶段,并且在 Neo4j 4.4 版之后将不再受支持。 Kafka Connect Neo4j 连接器的最新版本可以在这里找到 这里。 |
本节提供有关配置 Kafka 和 Neo4j 之间的 SSL 安全性的指南。这将提供 Kafka 和 Neo4j 之间的数据加密。
这不会解决 KAFKA 内部的 ACL 配置。
有关如何使用 SSL 配置加密和身份验证的更多详细信息,请参阅以下 Confluent 文档
自签名证书
确保您为每个服务器都拥有 truststore 和 keystore JKS。如果您想要自签名证书,可以使用以下命令
mkdir security
cd security
export PASSWORD=password
keytool -keystore kafka.server.keystore.jks -alias localhost -validity 365 -genkey
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.client1.truststore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:$PASSWORD
keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.server.keystore.jks -alias localhost -import -file cert-signed
keytool -keystore kafka.client1.keystore.jks -alias localhost -validity 365 -genkey
keytool -keystore kafka.client1.keystore.jks -alias localhost -certreq -file cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:$PASSWORD
keytool -keystore kafka.client1.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.client1.keystore.jks -alias localhost -import -file cert-signed
创建密钥库后,您必须将 kafka.client1.keystore.jks
和 kafka.client1.truststore.jks
移动到您的 neo4j 服务器。
本文讨论了解决此错误(由:java.security.cert.CertificateException: No subject alternative names present 引起)的问题,此错误可能在查询主题时出现。 https://geekflare.com/san-ssl-certificate/ |
Kafka 配置
连接到您的 Kafka 服务器并修改 config/server.properties
文件。此配置通常有效,但其他没有 EXTERNAL 和 INTERNAL 设置的配置也应该有效。此配置适用于 AWS 上的 Kafka,但应该适用于其他配置。
listeners=EXTERNAL://0.0.0.0:9092,INTERNAL://0.0.0.0:19092,CLIENT://0.0.0.0:9093,SSL://0.0.0.0:9094
listener.security.protocol.map=EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT,CLIENT:PLAINTEXT,SSL:SSL
advertised.listeners=EXTERNAL://aws_public_ip:9092,INTERNAL://aws_internal_ip:19092,CLIENT://aws_public_ip:9093,SSL://aws_public_ip:9094
inter.broker.listener.name=INTERNAL
ssl.keystore.location=/home/kafka/security/kafka.server.keystore.jks
ssl.keystore.password=neo4jpassword
ssl.truststore.location=/home/kafka/security/kafka.server.truststore.jks
ssl.truststore.password=neo4jpassword
ssl.key.password=neo4jpassword
ssl.enabled.protocols=TLSv1.2,TLSv1.1
ssl.endpoint.identification.algorithm=HTTPS
ssl.client.auth=required
Neo4j 配置
Neo4j 配置需要以下内容。在这种情况下,我们连接到公共 AWS IP 地址。密钥库和信任库位置指向您之前在步骤中创建的文件。
请注意,密码以明文形式存储,因此请限制对 neo4j.conf
文件的访问。
kafka.bootstrap.servers=xxx.xxx.xxx.xxx:9094
streams.sink.enabled=false
streams.source.topic.nodes.neoTest=Person{*}
kafka.auto.offset.reset=earliest
kafka.group.id=neo4j
streams.sink.dlq=neo4j-dlq
kafka.acks=all
kafka.retries=2
kafka.batch.size=16384
kafka.buffer.memory=33554432
kafka.security.protocol=SSL
kafka.ssl.truststore.location=/home/ubuntu/security/kafka.client1.truststore.jks
kafka.ssl.truststore.password=neo4jpassword
kafka.ssl.keystore.location=/home/ubuntu/security/kafka.client1.keystore.jks
kafka.ssl.keystore.password=neo4jpassword
kafka.ssl.key.password=neo4jpassword
kafka.ssl.endpoint.identification.algorithm=HTTPS
dbms.security.procedures.whitelist=apoc.*
dbms.security.procedures.unrestricted=apoc.*
dbms.jvm.additional=-Djavax.net.debug=ssl:handshake
此行 dbms.jvm.additional=-Djavax.net.debug=ssl:handshake
是可选的,但确实有助于调试 SSL 问题。
在配置 Neo4j 和 Kafka 之间的安全连接时,特别是使用 SASL 协议时,请注意使用以下属性
并且 **不要** 使用以下属性,该属性必须在服务器端而不是客户端使用
|
测试
启动 Kafka 和 Neo4j 后,您可以通过在 Neo4j 中创建 Person 节点,然后按如下方式查询主题来进行测试
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic neoTest --from-beginning
如果要使用 SSL 进行测试,请执行以下操作
-
创建一个包含以下内容的
client-ssl.properties
文件
security.protocol=SSL
ssl.truststore.location=/home/kafka/security/kafka.client.truststore.jks
ssl.truststore.password=neo4jpassword
ssl.endpoint.identification.algorithm=
使用 SASL 进行身份验证
您可以通过提供 JAAS 配置文件来配置 JAAS。为此,请连接到您的 Kafka 服务器并修改 config/server.properties
文件。此配置通常有效,但其他没有 EXTERNAL 和 INTERNAL 设置的配置也应该有效。
例如,此配置适用于 AWS 上的 Kafka,但应该适用于其他配置。
listeners=EXTERNAL://0.0.0.0:9092,INTERNAL://0.0.0.0:9093,CLIENT://0.0.0.0:9094
listener.security.protocol.map=EXTERNAL:SASL_PLAINTEXT,INTERNAL:PLAINTEXT,CLIENT:SASL_PLAINTEXT
advertised.listeners=EXTERNAL://18.188.84.xxx:9092,INTERNAL://172.31.43.xxx:9093,CLIENT://18.188.84.xxx:9094
zookeeper.connect=18.188.84.xxx:2181
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
inter.broker.listener.name=INTERNAL
在 Neo4j 端,需要以下内容。请注意,在这种情况下,我们连接到公共 AWS IP 地址。
-
将 Kafka 服务器上的
~/kafka/conf/kafka_jaas.conf
内容复制到您的 Neo4j 服务器上的一个文件(例如 ~/conf/kafka_client_jaas.conf) -
在 neo4j.conf 中,添加以下内容
dbms.jvm.additional=-Djava.security.auth.login.config=/Users/davidfauth/neo4j-enterprise-4.0.4_kafka/conf/kafka_client_jaas.conf kafka.security.protocol=SASL_PLAINTEXT kafka.sasl.mechanism=PLAIN
有关更多信息,请查阅以下链接中的 Confluent 官方文档
使用 ACL 进行授权
要配置与 ACL 的使用,需要以下配置属性
kafka.authorizer.class.name=kafka.security.authorizer.AclAuthorizer
kafka.zookeeper.set.acl=true
-
kafka.security.authorizer.AclAuthorizer
(默认的 Kafka 授权程序实现)是在 Apache Kafka 2.4/Confluent Platform 5.4.0 中引入的。如果您正在运行早期版本,则使用 SimpleAclAuthorizer(kafka.security.auth.SimpleAclAuthorizer
)。如果您使用的是 Confluent 平台,您还可以使用 LDAP 授权程序(有关详细信息,请参阅 Confluent 官方文档: https://docs.confluent.io/platform/current/security/ldap-authorizer/quickstart.html) -
请注意,
zookeeper.set.acl
默认情况下为 false
从 Kafka 官方文档中您可以发现,如果某个资源没有关联的 ACL,则除了超级用户之外,没有人被允许访问该资源。如果您的 Kafka 集群中出现这种情况,则您还需要添加以下内容
kafka.allow.everyone.if.no.acl.found=true
在使用上述属性时要非常小心,因为顾名思义,如果找不到 acl,它将允许所有人访问 |
如果指定了超级用户,则还包括以下内容
kafka.super.users=...
此外,如果您更改默认用户名(主体)映射规则,则您还需要添加以下属性
-
如果您使用了 SSL 加密,则
kafka.ssl.principal.mapping.rules=...
-
如果您使用了 SASL 加密(如果您有 Kerberos 环境,则可能如此),则
kafka.sasl.kerberos.principal.to.local.rules=...
此外,如果您想确保代理也使用 Kerberos 彼此通信,则必须指定以下属性,但这对于 ACL 目的而言并不是必需的
kafka.security.inter.broker.protocol=SASL_SSL
最后一个属性默认为 PLAIN |
要使插件正常工作,必须为主题和集群资源类型授权以下操作
-
写入,当您想将插件用作源时
-
读取,当您想将插件用作接收器时
-
DescribeConfigs 和 Describe,因为插件使用以下 2 个 Kafka AdminClient API
-
listTopics
-
describeCluster
-
要使用流过程,必须根据您希望使用哪个过程授权相同操作(读取或写入)。过程和源/接收器操作所需的权限相同。
有关如何在 Kafka 上设置和定义 ACL 的更多详细信息,请参阅 Confluent Kafka 官方文档
本节仅适用于 Neo4j Streams 插件。 |