选项和配置

使用连接器时,任何有效的 Neo4j 驱动程序选项都可以使用 Spark 中的 option 方法设置,如下所示

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

val df = spark.read.format("org.neo4j.spark.DataSource")
        .option("url", "neo4j://127.0.0.1:7687")
        .option("authentication.type", "basic")
        .option("authentication.basic.username", "myuser")
        .option("authentication.basic.password", "neo4jpassword")
        .option("labels", "Person")
        .load()

或者,您可以在 Spark Session 中指定全局配置以避免每次都重新输入连接选项。您可以设置任何 Neo4j 连接器选项,只需在前面加上 neo4j. 即可。

例如,如果您想在会话中设置选项 authentication.type,则必须键入 neo4j.authentication.type。这是一个完整的示例

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder()
    .config("neo4j.url", "neo4j://127.0.0.1:7687")
    .config("neo4j.authentication.type", "basic")
    .config("neo4j.authentication.basic.username", "myuser")
    .config("neo4j.authentication.basic.password", "neo4jpassword")
    .getOrCreate()

val dfPerson = spark.read.format("org.neo4j.spark.DataSource")
        .option("labels", "Person")
        .load()

val dfProduct = spark.read.format("org.neo4j.spark.DataSource")
        .option("labels", "Product")
        .load()

Neo4j 驱动程序选项

在幕后,Spark 连接器使用官方 Neo4j Java 驱动程序。在许多情况下,您希望能够控制设置驱动程序选项以适应 Neo4j 的生产部署以及如何与其通信。您可以使用上面 options 的示例来实现。

下表包含了 Neo4j 驱动程序最常用的配置设置。有关 Neo4j 驱动程序所有可能的配置选项的完整文档,请参阅Neo4j Java 驱动程序手册

表 1. 可用配置设置列表
设置名称 描述 默认值 是否必需

驱动程序选项

url

要连接到的 Neo4j 实例的 URL。

当提供用逗号分隔的 URI 列表时,将激活驱动程序的解析器函数功能。第一个 URI 将用作原始主机,其余 URI 将被视为解析器函数的输出。

(无)

authentication.type

要使用的身份验证方法

  • basic

  • kerberos

  • custom

  • bearer v.5.1

有关更多信息,请参阅身份验证

basic

authentication.basic.username

用于基本身份验证类型的用户名

(Neo4j 驱动程序默认值)

authentication.basic.password

用于基本身份验证类型的用户名

(Neo4j 驱动程序默认值)

authentication.kerberos.ticket

Kerberos 身份验证票证

(Neo4j 驱动程序默认值)

authentication.custom.principal

用于标识此令牌代表谁

(Neo4j 驱动程序默认值)

authentication.custom.credentials

用于验证主体凭据

(Neo4j 驱动程序默认值)

authentication.custom.realm

指定身份验证提供程序的“领域”字符串

(Neo4j 驱动程序默认值)

authentication.bearer.token

用于提供给承载身份验证方案的令牌

(Neo4j 驱动程序默认值)

encryption.enabled

指定是否应启用加密。如果您使用具有 +s+ssc 的 URI 方案,则忽略此设置

false

encryption.trust.strategy

设置证书信任策略,如果连接 URI 使用 +s+ssc 作为后缀,则忽略它。可用值包括:

  • TRUST_SYSTEM_CA_SIGNED_CERTIFICATES

  • TRUST_CUSTOM_CA_SIGNED_CERTIFICATES

  • TRUST_ALL_CERTIFICATES.

(Neo4j 驱动程序默认值)

encryption.ca.certificate.path

TRUST_CUSTOM_CA_SIGNED_CERTIFICATES 信任策略设置证书路径

(Neo4j 驱动程序默认值)

connection.max.lifetime.msecs

连接生命周期(毫秒)

(Neo4j 驱动程序默认值)

connection.liveness.timeout.msecs

活动检查超时(毫秒)

(Neo4j 驱动程序默认值)

connection.acquisition.timeout.msecs

连接获取超时(毫秒)

(Neo4j 驱动程序默认值)

connection.timeout.msecs

连接超时(毫秒)

(Neo4j 驱动程序默认值)

会话选项

database

要连接到的数据库名称。驱动程序允许在 URL 中定义数据库,但是如果您设置此选项,则与 URL 中定义的选项相比,它具有优先级。

(Neo4j 驱动程序默认值)

access.mode

可能的值包括:

  • read

  • write

仅在您从 Neo4j 中提取数据时使用。如果是 read,则连接器在集群环境中将请求路由到从节点,否则路由到主节点。

read

多个连接

Neo4j Apache Spark 连接器允许您在一个 Spark Session 中使用多个连接。例如,您可以在同一个会话中从一个数据库读取数据并将其写入另一个数据库。

从一个数据库读取数据并写入另一个数据库
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

val df = spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "neo4j://first.host.com:7687")
  .option("labels", "Person")
  .load()

df.write.format("org.neo4j.spark.DataSource")
  .mode(SaveMode.ErrorIfExists)
  .option("url", "neo4j://second.host.com:7687")
  .option("labels", "Person")
  .save()

使用多个连接的另一种情况是当您想要合并两个数据源时。

合并来自两个数据库的数据
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

val dfOne = spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "neo4j://first.host.com:7687")
  .option("labels", "Person")
  .load()

val dfTwo = spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "neo4j://second.host.com:7687")
  .option("labels", "Person")
  .load()

val dfJoin = dfOne.join(dfTwo, dfOne("name") === dfTwo("name"))