选项和配置

使用连接器时,任何有效的 Neo4j 驱动程序选项都可以通过 Spark 中的 option 方法进行设置,例如:

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

val df = spark.read.format("org.neo4j.spark.DataSource")
        .option("url", "neo4j://localhost:7687")
        .option("authentication.type", "basic")
        .option("authentication.basic.username", "myuser")
        .option("authentication.basic.password", "neo4jpassword")
        .option("labels", "Person")
        .load()

另外,您可以在 Spark 会话中指定全局配置,以避免每次重新输入连接选项。您可以设置任何 Neo4j 连接器选项,只需在其前面加上 neo4j. 即可。

例如,如果您想在会话中设置选项 authentication.type,您必须输入 neo4j.authentication.type。下面是一个完整示例:

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder()
    .config("neo4j.url", "neo4j://localhost:7687")
    .config("neo4j.authentication.type", "basic")
    .config("neo4j.authentication.basic.username", "myuser")
    .config("neo4j.authentication.basic.password", "neo4jpassword")
    .getOrCreate()

val dfPerson = spark.read.format("org.neo4j.spark.DataSource")
        .option("labels", "Person")
        .load()

val dfProduct = spark.read.format("org.neo4j.spark.DataSource")
        .option("labels", "Product")
        .load()

Neo4j 驱动程序选项

在底层,Spark 连接器使用 官方 Neo4j Java 驱动程序。在许多情况下,您需要控制驱动程序选项的设置,以适应您的 Neo4j 生产部署以及如何与其通信。您可以使用上面的 options 示例来实现这一点。

下表列出了与 Neo4j 驱动程序一起使用的最常见配置设置。有关 Neo4j 驱动程序所有可能配置选项的完整文档,请参阅 Neo4j Java 驱动程序手册

表 1. 可用配置设置列表
设置名称 描述 默认值 必填

驱动程序选项

url

要连接的 Neo4j 实例的 URL。

当提供逗号分隔的 URI 列表时,驱动程序的 解析器功能 将被激活。第一个 URI 将用作原始主机,而其余 URI 将被视为解析器功能的输出。

(无)

authentication.type

要使用的身份验证方法

  • none

  • basic

  • kerberos

  • custom

  • bearer v.5.1

有关更多信息,请参阅 身份验证

basic

authentication.basic.username

用于基本身份验证类型的用户名

(Neo4j 驱动程序默认值)

authentication.basic.password

用于基本身份验证类型的用户名

(Neo4j 驱动程序默认值)

authentication.kerberos.ticket

Kerberos 身份验证票据

(Neo4j 驱动程序默认值)

authentication.custom.principal

这用于识别此令牌所代表的主体

(Neo4j 驱动程序默认值)

authentication.custom.credentials

这些是用于验证主体的凭据

(Neo4j 驱动程序默认值)

authentication.custom.realm

这是指定身份验证提供程序的“realm”字符串

(Neo4j 驱动程序默认值)

authentication.bearer.token

这是为持有者身份验证方案提供的令牌

(Neo4j 驱动程序默认值)

encryption.enabled

指定是否启用加密。如果使用带有 +s+ssc 后缀的 URI 方案,则此设置将被忽略

false

encryption.trust.strategy

设置证书信任策略,如果连接 URI 使用 +s+ssc 作为后缀,则此设置将被忽略。可用值有:

  • TRUST_SYSTEM_CA_SIGNED_CERTIFICATES

  • TRUST_CUSTOM_CA_SIGNED_CERTIFICATES

  • TRUST_ALL_CERTIFICATES.

(Neo4j 驱动程序默认值)

encryption.ca.certificate.path

TRUST_CUSTOM_CA_SIGNED_CERTIFICATES 信任策略设置证书路径

(Neo4j 驱动程序默认值)

connection.max.lifetime.msecs

连接生命周期(毫秒)

(Neo4j 驱动程序默认值)

connection.liveness.timeout.msecs

活跃度检查超时(毫秒)

(Neo4j 驱动程序默认值)

connection.acquisition.timeout.msecs

连接获取超时(毫秒)

(Neo4j 驱动程序默认值)

connection.timeout.msecs

连接超时(毫秒)

(Neo4j 驱动程序默认值)

db.transaction.timeout

事务超时(毫秒)

(Neo4j 驱动程序默认值)

会话选项

database

要连接的数据库名称。驱动程序允许在 URL 中定义数据库,但如果您设置此选项,则其优先级高于在 URL 中定义的数据库。

(Neo4j 驱动程序默认值)

access.mode

可能的值有

  • read

  • write

仅在从 Neo4j 拉取数据时使用。在 read 情况下,连接器在集群环境中会将请求路由到从节点,否则路由到主节点。

read

多重连接

适用于 Apache Spark 的 Neo4j 连接器允许您在单个 Spark 会话中使用多个连接。例如,您可以在同一个会话中从一个数据库读取数据并将其写入另一个数据库。

从一个数据库读取并写入另一个数据库
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

val df = spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "neo4j://first.host.com:7687")
  .option("labels", "Person")
  .load()

df.write.format("org.neo4j.spark.DataSource")
  .mode(SaveMode.ErrorIfExists)
  .option("url", "neo4j://second.host.com:7687")
  .option("labels", "Person")
  .save()

使用多重连接的另一个场景是当您想要合并两个数据源时。

合并来自两个数据库的数据
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

val dfOne = spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "neo4j://first.host.com:7687")
  .option("labels", "Person")
  .load()

val dfTwo = spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "neo4j://second.host.com:7687")
  .option("labels", "Person")
  .load()

val dfJoin = dfOne.join(dfTwo, dfOne("name") === dfTwo("name"))
© . All rights reserved.