写入节点

本页中的所有示例都假定 SparkSession 已使用适当的身份验证选项进行了初始化。有关更多详细信息,请参阅快速入门示例

使用 labels 选项,连接器将 DataFrame 写入 Neo4j 数据库,作为一组具有给定标签的节点。

连接器会构建一个 CREATEMERGE Cypher® 查询(取决于保存模式),该查询使用 UNWIND 子句来写入一批行(一个由 batch.size 选项定义大小的 events 列表)。

来自示例的代码创建了带有 :Person 标签的新节点。

示例
case class Person(name: String, surname: String, age: Int)

val peopleDF = List(
    Person("John", "Doe", 42),
    Person("Jane", "Doe", 40)
).toDF()

peopleDF.write
    .format("org.neo4j.spark.DataSource")
    .mode(SaveMode.Append)
    .option("labels", ":Person")
    .save()
示例
# Create example DataFrame
peopleDF = spark.createDataFrame(
    [
        {"name": "John", "surname": "Doe", "age": 42},
        {"name": "Jane", "surname": "Doe", "age": 40},
    ]
)

(
    peopleDF.write.format("org.neo4j.spark.DataSource")
    .mode("Append")
    .option("labels", ":Person")
    .save()
)
等效的 Cypher 查询
UNWIND $events AS event
CREATE (n:Person)
SET n += event.properties

您可以使用冒号作为分隔符来写入具有多个标签的节点。第一个标签前的冒号是可选的。

示例
peopleDF.write
    .format("org.neo4j.spark.DataSource")
    .mode(SaveMode.Append)
    // ":Person:Employee" and "Person:Employee"
    // are equivalent
    .option("labels", ":Person:Employee")
    .save()
示例
(
    peopleDF.write.format("org.neo4j.spark.DataSource")
    .mode("Append")
    .option("labels", ":Person")
    # ":Person:Employee" and "Person:Employee"
    # are equivalent
    .option("labels", ":Person:Employee")
    .save()
)

节点键

Overwrite 模式下,您必须指定要用作键来匹配节点的 DataFrame 列。node.keys 选项接受一个逗号分隔的 key:value 对列表,其中 key 是 DataFrame 列名,value 是节点属性名。

如果 keyvalue 相同,您可以省略 value。例如,"name:name,surname:surname" 等效于 "name,surname"

使用 Overwrite 保存模式的相同代码

覆盖示例
df.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.Overwrite)
  .option("labels", ":Person")
  .option("node.keys", "name,surname")
  .save()
等效的 Cypher 查询
UNWIND $events AS event
MERGE (n:Person {
  name: event.keys.name,
  surname: event.keys.surname
})
SET n += event.properties

由于 Spark 作业的并发性,在使用 Overwrite 模式时,您应该使用属性唯一性约束来保证节点的唯一性。

© . All rights reserved.