Writing relationships

All the examples on this page assume that the SparkSession has been initialized with the appropriate authentication options. See the Quickstart examples for more details.

With the relationship option, the connector writes a Spark DataFrame to the Neo4j database by specifying source nodes, target nodes, and relationships.

To avoid deadlocks, always use a single partition (with coalesce(1) or repartition(1)) before writing relationships to Neo4j.
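
For example, a minimal sketch (assuming relDF is a DataFrame of relationships to write, as in the examples below):

// Collapse the DataFrame into a single partition before writing,
// so that concurrent transactions cannot deadlock on the same nodes
val singlePartitionDF = relDF.coalesce(1)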

The connector builds a CREATE or MERGE Cypher® query (depending on the save mode) that uses the UNWIND clause to write a batch of rows (an events list with the size defined by the batch.size option).
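
The batch size can be tuned with the batch.size option. A minimal sketch, reusing the relDF DataFrame and the options from the examples below (the 10000 value is purely illustrative):

relDF.write
    .mode("Append")
    .format("org.neo4j.spark.DataSource")
    .option("relationship", "BOUGHT")
    .option("relationship.source.labels", ":Customer")
    .option("relationship.source.save.mode", "Append")
    .option("relationship.target.labels", ":Product")
    .option("relationship.target.save.mode", "Append")
    // Write 10000 rows per batch instead of the default
    .option("batch.size", "10000")
    .save()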

The rest of the query is built depending on a number of data source options.

Table 1. Write options

relationship.save.strategy

Defines the save strategy to use:

  • native requires the DataFrame to use a specific schema.

  • keys is more flexible.

Default: native

relationship.source.save.mode
relationship.target.save.mode

Define the node save mode, and can be set separately for source and target nodes:

  • Match mode performs a MATCH.

  • Append mode performs a CREATE.

  • Overwrite mode performs a MERGE.

Default: Match

relationship.source.labels
relationship.target.labels

Required.

Define the labels to assign to the source and target nodes, as a colon-separated list of labels.

Default: (empty)

relationship.source.node.keys
relationship.target.node.keys

When the node save mode is Match or Overwrite, define the node keys that identify the nodes, as a comma-separated list of key:value pairs (where key is the DataFrame column and value is the node property).

If key and value are the same (for example "name:name"), you can omit one of them.

Default: (empty)

relationship.source.node.properties
relationship.target.node.properties

When the save strategy is keys, define which DataFrame columns to write as source/target node properties, as a comma-separated list of key:value pairs (where key is the DataFrame column and value is the node property).

If key and value are the same (for example "name:name"), you can omit one of them.

Default: (empty)

relationship.properties

When the save strategy is keys, defines which DataFrame columns to write as relationship properties, as a comma-separated list of key:value pairs (where key is the DataFrame column and value is the relationship property).

If key and value are the same (for example "name:name"), you can omit one of them.

Default: (empty)

Save strategies

A save strategy defines the way the connector maps the DataFrame schema to Neo4j nodes and relationships.

native strategy

The native strategy requires the DataFrame to conform to the relationship read schema with the relationship.nodes.map option set to false. The DataFrame must include at least one of the rel.[property name], source.[property name], or target.[property name] columns.

A good use case for this strategy is transferring data from one database to another. When you read relationships with the connector, the resulting DataFrame has the correct schema.

If you use the connector to read data from one database and write them to a different database, you need to set the connection options on each DataFrame rather than on the Spark session.
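
A minimal sketch of such a transfer, with placeholder URLs for the two databases (authentication options omitted):

// Read the BOUGHT relationships from the first database;
// the connection URL is set on the DataFrame, not on the Spark session
val bought = spark.read
    .format("org.neo4j.spark.DataSource")
    .option("url", "neo4j://first.example.com:7687")
    .option("relationship", "BOUGHT")
    .option("relationship.nodes.map", "false")
    .option("relationship.source.labels", ":Customer")
    .option("relationship.target.labels", ":Product")
    .load()

// Write them to the second database: the DataFrame read above
// already conforms to the schema that the native strategy expects
bought.coalesce(1).write
    .mode("Append")
    .format("org.neo4j.spark.DataSource")
    .option("url", "neo4j://second.example.com:7687")
    .option("relationship", "BOUGHT")
    .option("relationship.source.save.mode", "Append")
    .option("relationship.source.labels", ":Customer")
    .option("relationship.target.save.mode", "Append")
    .option("relationship.target.labels", ":Product")
    .save()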

The following example shows how to use the native strategy with the Append save mode for both the relationship and the source/target nodes. If run multiple times, it creates duplicate relationships and nodes.

Example (Scala)
// Imports needed by the Scala examples on this page;
// `spark` is assumed to be an initialized SparkSession
import org.apache.spark.sql.SaveMode
import spark.implicits._

// Columns representing node/relationship properties
// must use the "rel.", "source.", or "target." prefix
val relDF = Seq(
    ("John", "Doe", 1, "Product 1", 200, "ABC100"),
    ("Jane", "Doe", 2, "Product 2", 100, "ABC200")
).toDF(
    "source.name",
    "source.surname",
    "source.id",
    "target.name",
    "rel.quantity",
    "rel.order"
)

relDF.write
    // Create new relationships
    .mode(SaveMode.Append)
    .format("org.neo4j.spark.DataSource")
    // Assign a type to the relationships
    .option("relationship", "BOUGHT")
    // Create source nodes and assign them a label
    .option("relationship.source.save.mode", "Append")
    .option("relationship.source.labels", ":Customer")
    // Create target nodes and assign them a label
    .option("relationship.target.save.mode", "Append")
    .option("relationship.target.labels", ":Product")
    .save()
Example (Python)
# Columns representing node/relationship properties
# must use the "rel.", "source.", or "target." prefix
relDF = spark.createDataFrame(
    [
        {
            "source.name": "John",
            "source.surname": "Doe",
            "source.customerID": 1,
            "target.name": "Product 1",
            "rel.quantity": 200,
            "rel.order": "ABC100",
        },
        {
            "source.name": "Jane",
            "source.surname": "Doe",
            "source.customerID": 2,
            "target.name": "Product 2",
            "rel.quantity": 100,
            "rel.order": "ABC200",
        },
    ]
)

(
    relDF.write
    # Create new relationships
    .mode("Append")
    .format("org.neo4j.spark.DataSource")
    # Assign a type to the relationships
    .option("relationship", "BOUGHT")
    # Create source nodes and assign them a label
    .option("relationship.source.save.mode", "Append")
    .option("relationship.source.labels", ":Customer")
    # Create target nodes and assign them a label
    .option("relationship.target.save.mode", "Append")
    .option("relationship.target.labels", ":Product")
    .save()
)
Equivalent Cypher query
UNWIND $events AS event
CREATE (source:Customer)
SET source += event.source.properties
CREATE (target:Product)
SET target += event.target.properties
CREATE (source)-[rel:BOUGHT]->(target)
SET rel += event.rel.properties

keys strategy

The keys strategy gives more control over how relationships and source/target nodes are written. It does not require any specific schema for the DataFrame, and you can specify which columns to write as node and relationship properties.

The following example shows how to use the keys strategy with the Append save mode for both the relationship and the source/target nodes. If run multiple times, it creates duplicate relationships and nodes.

Example (Scala)
val relDF = Seq(
    ("John", "Doe", 1, "Product 1", 200, "ABC100"),
    ("Jane", "Doe", 2, "Product 2", 100, "ABC200")
).toDF("name", "surname", "customerID", "product", "quantity", "order")

relDF.write
    // Create new relationships
    .mode("Append")
    .format("org.neo4j.spark.DataSource")
    // Assign a type to the relationships
    .option("relationship", "BOUGHT")
    // Use `keys` strategy
    .option("relationship.save.strategy", "keys")
    // Create source nodes and assign them a label
    .option("relationship.source.save.mode", "Append")
    .option("relationship.source.labels", ":Customer")
    // Map the DataFrame columns to node properties
    .option(
        "relationship.source.node.properties",
        "name,surname,customerID:id"
    )
    // Create target nodes and assign them a label
    .option("relationship.target.save.mode", "Append")
    .option("relationship.target.labels", ":Product")
    // Map the DataFrame columns to node properties
    .option("relationship.target.node.properties", "product:name")
    // Map the DataFrame columns to relationship properties
    .option("relationship.properties", "quantity,order")
    .save()
Example (Python)
relDF = spark.createDataFrame(
    [
        {
            "name": "John",
            "surname": "Doe",
            "customerID": 1,
            "product": "Product 1",
            "quantity": 200,
            "order": "ABC100",
        },
        {
            "name": "Jane",
            "surname": "Doe",
            "customerID": 2,
            "product": "Product 2",
            "quantity": 100,
            "order": "ABC200",
        },
    ]
)

(
    relDF.write
    # Create new relationships
    .mode("Append")
    .format("org.neo4j.spark.DataSource")
    # Assign a type to the relationships
    .option("relationship", "BOUGHT")
    # Use `keys` strategy
    .option("relationship.save.strategy", "keys")
    # Create source nodes and assign them a label
    .option("relationship.source.save.mode", "Append")
    .option("relationship.source.labels", ":Customer")
    # Map the DataFrame columns to node properties
    .option(
        "relationship.source.node.properties", "name,surname,customerID:id"
    )
    # Create target nodes and assign them a label
    .option("relationship.target.save.mode", "Append")
    .option("relationship.target.labels", ":Product")
    # Map the DataFrame columns to node properties
    .option("relationship.target.node.properties", "product:name")
    # Map the DataFrame columns to relationship properties
    .option("relationship.properties", "quantity,order")
    .save()
)
Equivalent Cypher query
UNWIND $events AS event
CREATE (source:Customer)
SET source += event.source.properties
CREATE (target:Product)
SET target += event.target.properties
CREATE (source)-[rel:BOUGHT]->(target)
SET rel += event.rel.properties

Node save modes

The examples from the previous section use the Append mode for both relationships and nodes; this means that new relationships and new nodes are created every time the code is run.

Append 不同的节点保存模式具有不同的行为。

Match mode

The Match mode requires nodes with the selected labels and keys to already exist. This mode requires the relationship.source.node.keys and relationship.target.node.keys options.

The following example does not create any relationships if there are no matching nodes.
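
One way to satisfy this precondition is to create the nodes beforehand with a plain node write. A minimal sketch for the :Customer nodes, using the connector's node-writing options labels and node.keys (repeat similarly for the :Product nodes):

// Seed the Customer nodes so that the Match example below can find them
Seq(("John", "Doe", 1), ("Jane", "Doe", 2))
    .toDF("name", "surname", "id")
    .write
    .mode(SaveMode.Overwrite)
    .format("org.neo4j.spark.DataSource")
    .option("labels", ":Customer")
    .option("node.keys", "id")
    .save()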

Example (Scala)
val relDF = Seq(
    ("John", "Doe", 1, "Product 1", 200, "ABC100"),
    ("Jane", "Doe", 2, "Product 2", 100, "ABC200")
).toDF("name", "surname", "customerID", "product", "quantity", "order")

relDF.write
    // Create new relationships
    .mode(SaveMode.Append)
    .format("org.neo4j.spark.DataSource")
    // Assign a type to the relationships
    .option("relationship", "BOUGHT")
    // Use `keys` strategy
    .option("relationship.save.strategy", "keys")
    // Match source nodes with the specified label
    .option("relationship.source.save.mode", "Match")
    .option("relationship.source.labels", ":Customer")
    // Map the DataFrame columns to node properties
    .option(
        "relationship.source.node.properties",
        "name,surname,customerID:id"
    )
    // Node keys are mandatory for the Match save mode
    .option("relationship.source.node.keys", "customerID:id")
    // Match target nodes with the specified label
    .option("relationship.target.save.mode", "Match")
    .option("relationship.target.labels", ":Product")
    // Map the DataFrame columns to node properties
    .option("relationship.target.node.properties", "product:name")
    // Node keys are mandatory for the Match save mode
    .option("relationship.target.node.keys", "product:name")
    // Map the DataFrame columns to relationship properties
    .option("relationship.properties", "quantity,order")
    .save()
Example (Python)
relDF = spark.createDataFrame(
    [
        {
            "name": "John",
            "surname": "Doe",
            "customerID": 1,
            "product": "Product 1",
            "quantity": 200,
            "order": "ABC100",
        },
        {
            "name": "Jane",
            "surname": "Doe",
            "customerID": 2,
            "product": "Product 2",
            "quantity": 100,
            "order": "ABC200",
        },
    ]
)

(
    relDF.write
    # Create new relationships
    .mode("Append")
    .format("org.neo4j.spark.DataSource")
    # Assign a type to the relationships
    .option("relationship", "BOUGHT")
    # Use `keys` strategy
    .option("relationship.save.strategy", "keys")
    # Match source nodes with the specified label
    .option("relationship.source.save.mode", "Match")
    .option("relationship.source.labels", ":Customer")
    # Map the DataFrame columns to node properties
    .option(
        "relationship.source.node.properties", "name,surname,customerID:id"
    )
    # Node keys are mandatory for the Match save mode
    .option("relationship.source.node.keys", "customerID:id")
    # Match target nodes with the specified label
    .option("relationship.target.save.mode", "Match")
    .option("relationship.target.labels", ":Product")
    # Map the DataFrame columns to node properties
    .option("relationship.target.node.properties", "product:name")
    # Node keys are mandatory for the Match save mode
    .option("relationship.target.node.keys", "product:name")
    # Map the DataFrame columns to relationship properties
    .option("relationship.properties", "quantity,order")
    .save()
)
Equivalent Cypher query
UNWIND $events AS event
MATCH (source:Customer {id: event.source.keys.id})
MATCH (target:Product {name: event.target.keys.name})
CREATE (source)-[rel:BOUGHT]->(target)
SET rel += event.rel.properties

Overwrite mode

The Overwrite mode creates nodes with the selected labels and keys if they do not already exist. This mode requires the relationship.source.node.keys and relationship.target.node.keys options.

If run multiple times, the following example creates duplicate relationships but no duplicate nodes.

Example (Scala)
val relDF = Seq(
    ("John", "Doe", 1, "Product 1", 200, "ABC100"),
    ("Jane", "Doe", 2, "Product 2", 100, "ABC200")
).toDF("name", "surname", "customerID", "product", "quantity", "order")

relDF.write
    // Create new relationships
    .mode(SaveMode.Append)
    .format("org.neo4j.spark.DataSource")
    // Assign a type to the relationships
    .option("relationship", "BOUGHT")
    // Use `keys` strategy
    .option("relationship.save.strategy", "keys")
    // Overwrite source nodes and assign them a label
    .option("relationship.source.save.mode", "Overwrite")
    .option("relationship.source.labels", ":Customer")
    // Map the DataFrame columns to node properties
    .option(
        "relationship.source.node.properties",
        "name,surname,customerID:id"
    )
    // Node keys are mandatory for overwrite save mode
    .option("relationship.source.node.keys", "customerID:id")
    // Overwrite target nodes and assign them a label
    .option("relationship.target.save.mode", "Overwrite")
    .option("relationship.target.labels", ":Product")
    // Map the DataFrame columns to node properties
    .option("relationship.target.node.properties", "product:name")
    // Node keys are mandatory for overwrite save mode
    .option("relationship.target.node.keys", "product:name")
    // Map the DataFrame columns to relationship properties
    .option("relationship.properties", "quantity,order")
    .save()
Example (Python)
relDF = spark.createDataFrame(
    [
        {
            "name": "John",
            "surname": "Doe",
            "customerID": 1,
            "product": "Product 1",
            "quantity": 200,
            "order": "ABC100",
        },
        {
            "name": "Jane",
            "surname": "Doe",
            "customerID": 2,
            "product": "Product 2",
            "quantity": 100,
            "order": "ABC200",
        },
    ]
)

(
    relDF.write
    # Create new relationships
    .mode("Append")
    .format("org.neo4j.spark.DataSource")
    # Assign a type to the relationships
    .option("relationship", "BOUGHT")
    # Use `keys` strategy
    .option("relationship.save.strategy", "keys")
    # Overwrite source nodes and assign them a label
    .option("relationship.source.save.mode", "Overwrite")
    .option("relationship.source.labels", ":Customer")
    # Map the DataFrame columns to node properties
    .option(
        "relationship.source.node.properties", "name,surname,customerID:id"
    )
    # Node keys are mandatory for overwrite save mode
    .option("relationship.source.node.keys", "customerID:id")
    # Overwrite target nodes and assign them a label
    .option("relationship.target.save.mode", "Overwrite")
    .option("relationship.target.labels", ":Product")
    # Map the DataFrame columns to node properties
    .option("relationship.target.node.properties", "product:name")
    # Node keys are mandatory for overwrite save mode
    .option("relationship.target.node.keys", "product:name")
    # Map the DataFrame columns to relationship properties
    .option("relationship.properties", "quantity,order")
    .save()
)
Equivalent Cypher query
UNWIND $events AS event
MERGE (source:Customer {id: event.source.keys.id})
SET source += event.source.properties
MERGE (target:Product {name: event.target.keys.name})
SET target += event.target.properties
CREATE (source)-[rel:BOUGHT]->(target)
SET rel += event.rel.properties

Due to the concurrency of Spark jobs, you should use property uniqueness constraints to guarantee node uniqueness when using the Overwrite mode.
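
For example, constraints like the following (created once with Cypher, outside of Spark; the constraint names are arbitrary) ensure that concurrent MERGE operations cannot create duplicate :Customer or :Product nodes:

CREATE CONSTRAINT customer_id IF NOT EXISTS
FOR (c:Customer) REQUIRE c.id IS UNIQUE;

CREATE CONSTRAINT product_name IF NOT EXISTS
FOR (p:Product) REQUIRE p.name IS UNIQUE;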

Overwrite nodes and relationships

If you need to update both nodes and relationships, you must use the Overwrite mode for the relationship.source.save.mode, relationship.target.save.mode, and mode options.

If run multiple times, the following example does not create any duplicate nodes or relationships.

Example (Scala)
val relDF = Seq(
    ("John", "Doe", 1, "Product 1", 200, "ABC100"),
    ("Jane", "Doe", 2, "Product 2", 100, "ABC200")
).toDF("name", "surname", "customerID", "product", "quantity", "order")

relDF.write
    // Overwrite relationships
    .mode(SaveMode.Overwrite)
    .format("org.neo4j.spark.DataSource")
    // Assign a type to the relationships
    .option("relationship", "BOUGHT")
    // Use `keys` strategy
    .option("relationship.save.strategy", "keys")
    // Overwrite source nodes and assign them a label
    .option("relationship.source.save.mode", "Overwrite")
    .option("relationship.source.labels", ":Customer")
    // Map the DataFrame columns to node properties
    .option(
        "relationship.source.node.properties",
        "name,surname,customerID:id"
    )
    // Node keys are mandatory for overwrite save mode
    .option("relationship.source.node.keys", "customerID:id")
    // Overwrite target nodes and assign them a label
    .option("relationship.target.save.mode", "Overwrite")
    .option("relationship.target.labels", ":Product")
    // Map the DataFrame columns to node properties
    .option("relationship.target.node.properties", "product:name")
    // Node keys are mandatory for overwrite save mode
    .option("relationship.target.node.keys", "product:name")
    // Map the DataFrame columns to relationship properties
    .option("relationship.properties", "quantity,order")
    .save()
Example (Python)
relDF = spark.createDataFrame(
    [
        {
            "name": "John",
            "surname": "Doe",
            "customerID": 1,
            "product": "Product 1",
            "quantity": 200,
            "order": "ABC100",
        },
        {
            "name": "Jane",
            "surname": "Doe",
            "customerID": 2,
            "product": "Product 2",
            "quantity": 100,
            "order": "ABC200",
        },
    ]
)

(
    relDF.write
    # Overwrite relationships
    .mode("Overwrite")
    .format("org.neo4j.spark.DataSource")
    # Assign a type to the relationships
    .option("relationship", "BOUGHT")
    # Use `keys` strategy
    .option("relationship.save.strategy", "keys")
    # Overwrite source nodes and assign them a label
    .option("relationship.source.save.mode", "Overwrite")
    .option("relationship.source.labels", ":Customer")
    # Map the DataFrame columns to node properties
    .option(
        "relationship.source.node.properties", "name,surname,customerID:id"
    )
    # Node keys are mandatory for overwrite save mode
    .option("relationship.source.node.keys", "customerID:id")
    # Overwrite target nodes and assign them a label
    .option("relationship.target.save.mode", "Overwrite")
    .option("relationship.target.labels", ":Product")
    # Map the DataFrame columns to node properties
    .option("relationship.target.node.properties", "product:name")
    # Node keys are mandatory for overwrite save mode
    .option("relationship.target.node.keys", "product:name")
    # Map the DataFrame columns to relationship properties
    .option("relationship.properties", "quantity,order")
    .save()
)
Equivalent Cypher query
UNWIND $events AS event
MERGE (source:Customer {id: event.source.keys.id})
SET source += event.source.properties
MERGE (target:Product {name: event.target.keys.name})
SET target += event.target.properties
MERGE (source)-[rel:BOUGHT]->(target)
SET rel += event.rel.properties

Due to the concurrency of Spark jobs, you should use property uniqueness constraints to guarantee node uniqueness when using the Overwrite mode (see the constraint example in the previous section).