选项和配置
使用连接器时,任何有效的 Neo4j 驱动程序选项都可以通过 Spark 中的 option
方法进行设置,例如:
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder().getOrCreate()
val df = spark.read.format("org.neo4j.spark.DataSource")
.option("url", "neo4j://localhost:7687")
.option("authentication.type", "basic")
.option("authentication.basic.username", "myuser")
.option("authentication.basic.password", "neo4jpassword")
.option("labels", "Person")
.load()
另外,您可以在 Spark 会话中指定全局配置,以避免每次重新输入连接选项。您可以设置任何 Neo4j 连接器选项,只需在其前面加上 neo4j.
即可。
例如,如果您想在会话中设置选项 authentication.type
,您必须输入 neo4j.authentication.type
。下面是一个完整示例:
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder()
.config("neo4j.url", "neo4j://localhost:7687")
.config("neo4j.authentication.type", "basic")
.config("neo4j.authentication.basic.username", "myuser")
.config("neo4j.authentication.basic.password", "neo4jpassword")
.getOrCreate()
val dfPerson = spark.read.format("org.neo4j.spark.DataSource")
.option("labels", "Person")
.load()
val dfProduct = spark.read.format("org.neo4j.spark.DataSource")
.option("labels", "Product")
.load()
Neo4j 驱动程序选项
在底层,Spark 连接器使用 官方 Neo4j Java 驱动程序。在许多情况下,您需要控制驱动程序选项的设置,以适应您的 Neo4j 生产部署以及如何与其通信。您可以使用上面的 options
示例来实现这一点。
下表列出了与 Neo4j 驱动程序一起使用的最常见配置设置。有关 Neo4j 驱动程序所有可能配置选项的完整文档,请参阅 Neo4j Java 驱动程序手册。
设置名称 | 描述 | 默认值 | 必填 |
---|---|---|---|
驱动程序选项 |
|||
|
要连接的 Neo4j 实例的 URL。 当提供逗号分隔的 URI 列表时,驱动程序的 解析器功能 将被激活。第一个 URI 将用作原始主机,而其余 URI 将被视为解析器功能的输出。 |
(无) |
是 |
|
|
否 |
|
|
用于基本身份验证类型的用户名 |
(Neo4j 驱动程序默认值) |
否 |
|
用于基本身份验证类型的用户名 |
(Neo4j 驱动程序默认值) |
否 |
|
Kerberos 身份验证票据 |
(Neo4j 驱动程序默认值) |
否 |
|
这用于识别此令牌所代表的主体 |
(Neo4j 驱动程序默认值) |
否 |
|
这些是用于验证主体的凭据 |
(Neo4j 驱动程序默认值) |
否 |
|
这是指定身份验证提供程序的“realm”字符串 |
(Neo4j 驱动程序默认值) |
否 |
|
这是为持有者身份验证方案提供的令牌 |
(Neo4j 驱动程序默认值) |
否 |
|
指定是否启用加密。如果使用带有 |
|
否 |
|
设置证书信任策略,如果连接 URI 使用
|
(Neo4j 驱动程序默认值) |
否 |
|
为 |
(Neo4j 驱动程序默认值) |
否 |
|
连接生命周期(毫秒) |
(Neo4j 驱动程序默认值) |
否 |
|
活跃度检查超时(毫秒) |
(Neo4j 驱动程序默认值) |
否 |
|
连接获取超时(毫秒) |
(Neo4j 驱动程序默认值) |
否 |
|
连接超时(毫秒) |
(Neo4j 驱动程序默认值) |
否 |
|
事务超时(毫秒) |
(Neo4j 驱动程序默认值) |
否 |
会话选项 |
|||
|
要连接的数据库名称。驱动程序允许在 URL 中定义数据库,但如果您设置此选项,则其优先级高于在 URL 中定义的数据库。 |
(Neo4j 驱动程序默认值) |
否 |
|
可能的值有
仅在从 Neo4j 拉取数据时使用。在 |
|
否 |
多重连接
适用于 Apache Spark 的 Neo4j 连接器允许您在单个 Spark 会话中使用多个连接。例如,您可以在同一个会话中从一个数据库读取数据并将其写入另一个数据库。
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder().getOrCreate()
val df = spark.read.format("org.neo4j.spark.DataSource")
.option("url", "neo4j://first.host.com:7687")
.option("labels", "Person")
.load()
df.write.format("org.neo4j.spark.DataSource")
.mode(SaveMode.ErrorIfExists)
.option("url", "neo4j://second.host.com:7687")
.option("labels", "Person")
.save()
使用多重连接的另一个场景是当您想要合并两个数据源时。
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder().getOrCreate()
val dfOne = spark.read.format("org.neo4j.spark.DataSource")
.option("url", "neo4j://first.host.com:7687")
.option("labels", "Person")
.load()
val dfTwo = spark.read.format("org.neo4j.spark.DataSource")
.option("url", "neo4j://second.host.com:7687")
.option("labels", "Person")
.load()
val dfJoin = dfOne.join(dfTwo, dfOne("name") === dfTwo("name"))