写入 Neo4j

连接器提供了三种数据源选项,用于将数据写入 Neo4j 数据库。

表 1. 写入选项
选项 描述 默认值

标签

如果您只需要创建或更新节点及其属性,或者作为添加关系之前的第一步,请使用此选项。

要创建或更新的节点标签的冒号分隔列表。

(空)

关系

如果您需要创建或更新关系以及其源节点和目标节点,请使用此选项。

要创建或更新的关系类型。

(空)

查询

如果您需要更多灵活性并知道如何编写 Cypher® 查询,请使用此选项。

带有 CREATEMERGE 子句的 Cypher 查询。

(空)

示例

此页面中的所有示例都假设 SparkSession 已使用适当的身份验证选项初始化。有关更多详细信息,请参阅快速入门示例

您可以运行每个选项的读取示例以检查写入数据后的数据。

labels 选项

写入 :Person 节点。

示例
case class Person(name: String, surname: String, age: Int)

val peopleDF = List(
    Person("John", "Doe", 42),
    Person("Jane", "Doe", 40)
).toDF()

peopleDF.write
    .format("org.neo4j.spark.DataSource")
    .mode(SaveMode.Append)
    .option("labels", ":Person")
    .save()
示例
# Create example DataFrame
peopleDF = spark.createDataFrame(
    [
        {"name": "John", "surname": "Doe", "age": 42},
        {"name": "Jane", "surname": "Doe", "age": 40},
    ]
)

(
    peopleDF.write.format("org.neo4j.spark.DataSource")
    .mode("Append")
    .option("labels", ":Person")
    .save()
)

有关更多信息和示例,请参阅写入节点

relationship 选项

写入 :BOUGHT 关系及其源节点、目标节点和属性。

示例
val relDF = Seq(
    ("John", "Doe", 1, "Product 1", 200, "ABC100"),
    ("Jane", "Doe", 2, "Product 2", 100, "ABC200")
).toDF("name", "surname", "customerID", "product", "quantity", "order")

relDF.write
    // Create new relationships
    .mode("Append")
    .format("org.neo4j.spark.DataSource")
    // Assign a type to the relationships
    .option("relationship", "BOUGHT")
    // Use `keys` strategy
    .option("relationship.save.strategy", "keys")
    // Create source nodes and assign them a label
    .option("relationship.source.save.mode", "Append")
    .option("relationship.source.labels", ":Customer")
    // Map the DataFrame columns to node properties
    .option("relationship.source.node.properties", "name,surname,customerID:id")
    // Create target nodes and assign them a label
    .option("relationship.target.save.mode", "Append")
    .option("relationship.target.labels", ":Product")
    // Map the DataFrame columns to node properties
    .option("relationship.target.node.properties", "product:name")
    // Map the DataFrame columns to relationship properties
    .option("relationship.properties", "quantity,order")
    .save()
示例
# Create example DataFrame
relDF = spark.createDataFrame(
    [
        {
            "name": "John",
            "surname": "Doe",
            "customerID": 1,
            "product": "Product 1",
            "quantity": 200,
            "order": "ABC100",
        },
        {
            "name": "Jane",
            "surname": "Doe",
            "customerID": 2,
            "product": "Product 2",
            "quantity": 100,
            "order": "ABC200",
        },
    ]
)

(
    relDF.write
    # Create new relationships
    .mode("Append")
    .format("org.neo4j.spark.DataSource")
    # Assign a type to the relationships
    .option("relationship", "BOUGHT")
    # Use `keys` strategy
    .option("relationship.save.strategy", "keys")
    # Create source nodes and assign them a label
    .option("relationship.source.save.mode", "Append")
    .option("relationship.source.labels", ":Customer")
    # Map the DataFrame columns to node properties
    .option("relationship.source.node.properties", "name,surname,customerID:id")
    # Create target nodes and assign them a label
    .option("relationship.target.save.mode", "Append")
    .option("relationship.target.labels", ":Product")
    # Map the DataFrame columns to node properties
    .option("relationship.target.node.properties", "product:name")
    # Map the DataFrame columns to relationship properties
    .option("relationship.properties", "quantity,order")
    .save()
)

有关更多信息和示例,请参阅写入关系

query 选项

使用 Cypher 查询写入数据。

示例
case class Person(name: String, surname: String, age: Int)

// Create an example DataFrame
val queryDF = List(
    Person("John", "Doe", 42),
    Person("Jane", "Doe", 40)
).toDF()

// Define the Cypher query to use in the write
val writeQuery =
    "CREATE (n:Person {fullName: event.name + ' ' + event.surname})"

queryDF.write
    .format("org.neo4j.spark.DataSource")
    .option("query", writeQuery)
    .mode(SaveMode.Overwrite)
    .save()
示例
# Create example DataFrame
queryDF = spark.createDataFrame(
    [
        {"name": "John", "surname": "Doe", "age": 42},
        {"name": "Jane", "surname": "Doe", "age": 40},
    ]
)

# Define the Cypher query to use in the write
write_query = "CREATE (n:Person {fullName: event.name + ' ' + event.surname})"

(
    queryDF.write.format("org.neo4j.spark.DataSource")
    .option("query", write_query)
    .mode("Overwrite")
    .save()
)

有关更多信息和示例,请参阅使用 Cypher 查询写入数据

保存模式

无论写入选项如何,连接器都支持数据源 mode() 方法的两种保存模式

  • Append 模式通过构建 CREATE Cypher 查询来创建新的节点或关系。

  • Overwrite 模式通过构建 MERGE Cypher 查询来创建或更新新的节点或关系。

    • labels 选项一起使用时,需要 node.keys 选项。

    • relationship 选项一起使用时,需要 relationship.source.node.keysrelationship.target.node.keys

类型映射

有关 Spark DataFrames 和 Neo4j 之间的完整类型映射,请参阅数据类型映射

性能注意事项

由于写入通常是一个昂贵的操作,因此请确保只写入所需的数据帧列。

例如,如果数据源中的列为 namesurnameagelivesIn,但您只需要 namesurname,则可以执行以下操作

df.select(df("name"), df("surname"))
  .write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.Append)
  .option("labels", ":Person")
  .save()