Writing relationships

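All the examples on this page assume that the SparkSession has been initialized with the appropriate authentication options. See the Quickstart examples for more details.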

With the relationship option, the connector writes a Spark DataFrame to the Neo4j database by specifying the source nodes, the target nodes, and the relationship.

To avoid deadlocks, always use a single partition (with coalesce(1) or repartition(1)) before writing relationships to Neo4j.
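As a minimal sketch of this recommendation, the hypothetical helper below (it is not part of the connector) collapses a DataFrame to one partition and returns its writer. It relies only on the standard Spark DataFrame members coalesce and write.

```python
def single_partition_writer(df):
    """Return the writer of `df` after collapsing it to one partition.

    `df` is expected to be a Spark DataFrame. The helper name is
    hypothetical and only illustrates the recommendation.
    """
    # coalesce(1) merges existing partitions without a full shuffle;
    # repartition(1) would also produce a single partition.
    return df.coalesce(1).write
```

The returned writer can then be configured as in the examples on this page, for instance single_partition_writer(relDF).mode("Append").format("org.neo4j.spark.DataSource").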

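The connector builds a CREATE or MERGE Cypher® query (depending on the save mode) that uses the UNWIND clause to write a batch of rows (an events list whose size is defined by the batch.size option).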

The rest of the query is built from several data source options.
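To make the batching idea concrete, the sketch below splits rows into lists of at most batch_size items, each of which would play the role of one events parameter. It only illustrates the concept and is not the connector's implementation; the function name is hypothetical.

```python
from itertools import islice


def batches(rows, batch_size):
    """Yield successive lists of at most `batch_size` rows."""
    iterator = iter(rows)
    while True:
        events = list(islice(iterator, batch_size))
        if not events:
            return
        yield events


rows = [{"id": i} for i in range(5)]
sizes = [len(events) for events in batches(rows, 2)]
print(sizes)  # [2, 2, 1]
```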

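Table 1. Write options

Option: relationship.save.strategy
Description: Defines the save strategy to use.

  • native requires the DataFrame to use a specific schema.

  • keys is more flexible.

Default: native

Options: relationship.source.save.mode, relationship.target.save.mode
Description: Define the node save mode, which can be set independently for source and target nodes.

  • Match mode performs a MATCH.

  • Append mode performs a CREATE.

  • Overwrite mode performs a MERGE.

Default: Match

Options: relationship.source.labels, relationship.target.labels
Description: Required. Define the labels to assign to source and target nodes, as a colon-separated list of labels.
Default: (empty)

Options: relationship.source.node.keys, relationship.target.node.keys
Description: When the node save mode is Match or Overwrite, define the node keys that identify the nodes, as a comma-separated list of key:value pairs. If key and value are the same (for example "name:name"), you can omit one of them.
Default: (empty)

Options: relationship.source.node.properties, relationship.target.node.properties
Description: When the save strategy is keys, define which DataFrame columns to write as source/target node properties, as a comma-separated list of key:value pairs. If key and value are the same (for example "name:name"), you can omit one of them.
Default: (empty)

Option: relationship.properties
Description: When the save strategy is keys, defines which DataFrame columns to write as relationship properties, as a comma-separated list of key:value pairs. If key and value are the same (for example "name:name"), you can omit one of them.
Default: (empty)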
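The key:value syntax used by the keys and properties options can be read as "DataFrame column:Neo4j property". The following sketch parses such a string under that reading, which is inferred from the examples on this page (for instance "customerID:id"). The parser is hypothetical, and the connector's own parsing may differ in edge cases.

```python
def parse_mapping(spec):
    """Parse "col:prop,col2" into a {column: property} dict."""
    mapping = {}
    for item in spec.split(","):
        item = item.strip()
        if not item:
            continue
        column, _, prop = item.partition(":")
        column = column.strip()
        # When only one name is given, it is used for both sides
        mapping[column] = prop.strip() or column
    return mapping


print(parse_mapping("name,surname,customerID:id"))
# {'name': 'name', 'surname': 'surname', 'customerID': 'id'}
```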

Save strategies

The save strategy defines how the connector maps the DataFrame schema to Neo4j nodes and relationships.

native strategy

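The native strategy requires the DataFrame to conform to the relationship read schema with the relationship.nodes.map option set to false. The DataFrame must include at least one of the rel.[property name], source.[property name], or target.[property name] columns.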
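As a rough illustration of this naming convention, the hypothetical helper below groups column names by prefix and rejects a column list that has none of the three prefixes. Ignoring unprefixed columns is an assumption of this sketch, not documented connector behavior.

```python
PREFIXES = ("rel.", "source.", "target.")


def group_native_columns(columns):
    """Group column names by their native-strategy prefix.

    Raises ValueError if no column carries one of the prefixes.
    """
    groups = {"rel": [], "source": [], "target": []}
    for column in columns:
        for prefix in PREFIXES:
            if column.startswith(prefix):
                # Store the property name without the prefix
                groups[prefix[:-1]].append(column[len(prefix):])
                break
    if not any(groups.values()):
        raise ValueError("expected at least one rel./source./target. column")
    return groups


columns = ["source.name", "source.surname", "source.id",
           "target.name", "rel.quantity", "rel.order"]
print(group_native_columns(columns))
# {'rel': ['quantity', 'order'], 'source': ['name', 'surname', 'id'], 'target': ['name']}
```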

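A good use case for this strategy is transferring data from one database to another. When you use the connector to read a relationship, the resulting DataFrame already has the correct schema.

If you use the connector to read data from one database and write it to another, you need to set the connection options on each DataFrame rather than on the Spark session.

The following example shows how to use the native strategy with the Append save mode for both the relationship and the source/target nodes. If run multiple times, it creates duplicate relationships and nodes.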

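Example (Scala)
import org.apache.spark.sql.SaveMode
// Required for toDF; assumes the SparkSession is named `spark`
import spark.implicits._

// Columns representing node/relationship properties
// must use the "rel.", "source.", or "target." prefix
val relDF = Seq(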
    ("John", "Doe", 1, "Product 1", 200, "ABC100"),
    ("Jane", "Doe", 2, "Product 2", 100, "ABC200")
).toDF(
    "source.name",
    "source.surname",
    "source.id",
    "target.name",
    "rel.quantity",
    "rel.order"
)

relDF.write
    // Create new relationships
    .mode(SaveMode.Append)
    .format("org.neo4j.spark.DataSource")
    // Assign a type to the relationships
    .option("relationship", "BOUGHT")
    // Create source nodes and assign them a label
    .option("relationship.source.save.mode", "Append")
    .option("relationship.source.labels", ":Customer")
    // Create target nodes and assign them a label
    .option("relationship.target.save.mode", "Append")
    .option("relationship.target.labels", ":Product")
    .save()
Example (Python)
# Columns representing node/relationship properties
# must use the "rel.", "source.", or "target." prefix
relDF = spark.createDataFrame(
    [
        {
            "source.name": "John",
            "source.surname": "Doe",
            "source.id": 1,
            "target.name": "Product 1",
            "rel.quantity": 200,
            "rel.order": "ABC100",
        },
        {
            "source.name": "Jane",
            "source.surname": "Doe",
            "source.id": 2,
            "target.name": "Product 2",
            "rel.quantity": 100,
            "rel.order": "ABC200",
        },
    ]
)

(
    relDF.write
    # Create new relationships
    .mode("Append")
    .format("org.neo4j.spark.DataSource")
    # Assign a type to the relationships
    .option("relationship", "BOUGHT")
    # Create source nodes and assign them a label
    .option("relationship.source.save.mode", "Append")
    .option("relationship.source.labels", ":Customer")
    # Create target nodes and assign them a label
    .option("relationship.target.save.mode", "Append")
    .option("relationship.target.labels", ":Product")
    .save()
)
Equivalent Cypher query
UNWIND $events AS event
CREATE (source:Customer)
SET source += event.source.properties
CREATE (target:Product)
SET target += event.target.properties
CREATE (source)-[rel:BOUGHT]->(target)
SET rel += event.rel.properties

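keys strategy

The keys strategy gives more control over how relationships and source/target nodes are written. It does not require any specific schema for the DataFrame, and you can specify which columns to write as node and relationship properties.

The following example shows how to use the keys strategy with the Append save mode for both the relationship and the source/target nodes. If run multiple times, it creates duplicate relationships and nodes.

Example (Scala)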
val relDF = Seq(
    ("John", "Doe", 1, "Product 1", 200, "ABC100"),
    ("Jane", "Doe", 2, "Product 2", 100, "ABC200")
).toDF("name", "surname", "customerID", "product", "quantity", "order")

relDF.write
    // Create new relationships
    .mode("Append")
    .format("org.neo4j.spark.DataSource")
    // Assign a type to the relationships
    .option("relationship", "BOUGHT")
    // Use `keys` strategy
    .option("relationship.save.strategy", "keys")
    // Create source nodes and assign them a label
    .option("relationship.source.save.mode", "Append")
    .option("relationship.source.labels", ":Customer")
    // Map the DataFrame columns to node properties
    .option(
        "relationship.source.node.properties",
        "name,surname,customerID:id"
    )
    // Create target nodes and assign them a label
    .option("relationship.target.save.mode", "Append")
    .option("relationship.target.labels", ":Product")
    // Map the DataFrame columns to node properties
    .option("relationship.target.node.properties", "product:name")
    // Map the DataFrame columns to relationship properties
    .option("relationship.properties", "quantity,order")
    .save()
Example (Python)
relDF = spark.createDataFrame(
    [
        {
            "name": "John",
            "surname": "Doe",
            "customerID": 1,
            "product": "Product 1",
            "quantity": 200,
            "order": "ABC100",
        },
        {
            "name": "Jane",
            "surname": "Doe",
            "customerID": 2,
            "product": "Product 2",
            "quantity": 100,
            "order": "ABC200",
        },
    ]
)

(
    relDF.write
    # Create new relationships
    .mode("Append")
    .format("org.neo4j.spark.DataSource")
    # Assign a type to the relationships
    .option("relationship", "BOUGHT")
    # Use `keys` strategy
    .option("relationship.save.strategy", "keys")
    # Create source nodes and assign them a label
    .option("relationship.source.save.mode", "Append")
    .option("relationship.source.labels", ":Customer")
    # Map the DataFrame columns to node properties
    .option(
        "relationship.source.node.properties", "name,surname,customerID:id"
    )
    # Create target nodes and assign them a label
    .option("relationship.target.save.mode", "Append")
    .option("relationship.target.labels", ":Product")
    # Map the DataFrame columns to node properties
    .option("relationship.target.node.properties", "product:name")
    # Map the DataFrame columns to relationship properties
    .option("relationship.properties", "quantity,order")
    .save()
)
Equivalent Cypher query
UNWIND $events AS event
CREATE (source:Customer)
SET source += event.source.properties
CREATE (target:Product)
SET target += event.target.properties
CREATE (source)-[rel:BOUGHT]->(target)
SET rel += event.rel.properties
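To show how the column mappings in the keys example could translate into the event.source.properties, event.target.properties, and event.rel.properties maps referenced by the query, here is a small sketch. The event layout is inferred from the equivalent query, and all function names are hypothetical; the payload the connector actually sends may be structured differently.

```python
def to_mapping(spec):
    """Turn "col:prop,col2" into a {column: property} dict."""
    pairs = (item.strip().partition(":") for item in spec.split(",") if item.strip())
    return {col.strip(): (prop.strip() or col.strip()) for col, _, prop in pairs}


def pick(row, spec):
    """Select the row values named in `spec`, renamed to property names."""
    return {prop: row[col] for col, prop in to_mapping(spec).items()}


def build_event(row, source_props, target_props, rel_props):
    """Build one illustrative event from a row (a plain dict)."""
    return {
        "source": {"properties": pick(row, source_props)},
        "target": {"properties": pick(row, target_props)},
        "rel": {"properties": pick(row, rel_props)},
    }


row = {"name": "John", "surname": "Doe", "customerID": 1,
       "product": "Product 1", "quantity": 200, "order": "ABC100"}
event = build_event(row, "name,surname,customerID:id", "product:name", "quantity,order")
print(event["source"]["properties"])  # {'name': 'John', 'surname': 'Doe', 'id': 1}
print(event["target"]["properties"])  # {'name': 'Product 1'}
```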

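Node save modes

The examples in the previous section use the Append mode for both relationships and nodes; this means that new relationships and new nodes are created every time the code is run.

Node save modes other than Append behave differently.

Match mode

Match mode requires nodes with the selected labels and keys to already exist. This mode requires both the relationship.source.node.keys and the relationship.target.node.keys options.

The following example does not create any relationships if there are no matching nodes.

Example (Scala)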
val relDF = Seq(
    ("John", "Doe", 1, "Product 1", 200, "ABC100"),
    ("Jane", "Doe", 2, "Product 2", 100, "ABC200")
).toDF("name", "surname", "customerID", "product", "quantity", "order")

relDF.write
    // Create new relationships
    .mode(SaveMode.Append)
    .format("org.neo4j.spark.DataSource")
    // Assign a type to the relationships
    .option("relationship", "BOUGHT")
    // Use `keys` strategy
    .option("relationship.save.strategy", "keys")
    // Match source nodes with the specified label
    .option("relationship.source.save.mode", "Match")
    .option("relationship.source.labels", ":Customer")
    // Map the DataFrame columns to node properties
    .option(
        "relationship.source.node.properties",
        "name,surname,customerID:id"
    )
    // Node keys are mandatory for Match save mode
    .option("relationship.source.node.keys", "customerID:id")
    // Match target nodes with the specified label
    .option("relationship.target.save.mode", "Match")
    .option("relationship.target.labels", ":Product")
    // Map the DataFrame columns to node properties
    .option("relationship.target.node.properties", "product:name")
    // Node keys are mandatory for Match save mode
    .option("relationship.target.node.keys", "product:name")
    // Map the DataFrame columns to relationship properties
    .option("relationship.properties", "quantity,order")
    .save()
Example (Python)
relDF = spark.createDataFrame(
    [
        {
            "name": "John",
            "surname": "Doe",
            "customerID": 1,
            "product": "Product 1",
            "quantity": 200,
            "order": "ABC100",
        },
        {
            "name": "Jane",
            "surname": "Doe",
            "customerID": 2,
            "product": "Product 2",
            "quantity": 100,
            "order": "ABC200",
        },
    ]
)

(
    relDF.write
    # Create new relationships
    .mode("Append")
    .format("org.neo4j.spark.DataSource")
    # Assign a type to the relationships
    .option("relationship", "BOUGHT")
    # Use `keys` strategy
    .option("relationship.save.strategy", "keys")
    # Match source nodes with the specified label
    .option("relationship.source.save.mode", "Match")
    .option("relationship.source.labels", ":Customer")
    # Map the DataFrame columns to node properties
    .option(
        "relationship.source.node.properties", "name,surname,customerID:id"
    )
    # Node keys are mandatory for Match save mode
    .option("relationship.source.node.keys", "customerID:id")
    # Match target nodes with the specified label
    .option("relationship.target.save.mode", "Match")
    .option("relationship.target.labels", ":Product")
    # Map the DataFrame columns to node properties
    .option("relationship.target.node.properties", "product:name")
    # Node keys are mandatory for Match save mode
    .option("relationship.target.node.keys", "product:name")
    # Map the DataFrame columns to relationship properties
    .option("relationship.properties", "quantity,order")
    .save()
)
Equivalent Cypher query
UNWIND $events AS event
MATCH (source:Customer {id: event.source.keys.id})
MATCH (target:Product {name: event.target.keys.name})
CREATE (source)-[rel:BOUGHT]->(target)
SET rel += event.rel.properties

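Overwrite mode

Overwrite mode creates nodes with the selected labels and keys if they do not already exist. This mode requires both the relationship.source.node.keys and the relationship.target.node.keys options.

If run multiple times, the following example creates duplicate relationships but no duplicate nodes.

Example (Scala)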
val relDF = Seq(
    ("John", "Doe", 1, "Product 1", 200, "ABC100"),
    ("Jane", "Doe", 2, "Product 2", 100, "ABC200")
).toDF("name", "surname", "customerID", "product", "quantity", "order")

relDF.write
    // Create new relationships
    .mode(SaveMode.Append)
    .format("org.neo4j.spark.DataSource")
    // Assign a type to the relationships
    .option("relationship", "BOUGHT")
    // Use `keys` strategy
    .option("relationship.save.strategy", "keys")
    // Overwrite source nodes and assign them a label
    .option("relationship.source.save.mode", "Overwrite")
    .option("relationship.source.labels", ":Customer")
    // Map the DataFrame columns to node properties
    .option(
        "relationship.source.node.properties",
        "name,surname,customerID:id"
    )
    // Node keys are mandatory for overwrite save mode
    .option("relationship.source.node.keys", "customerID:id")
    // Overwrite target nodes and assign them a label
    .option("relationship.target.save.mode", "Overwrite")
    .option("relationship.target.labels", ":Product")
    // Map the DataFrame columns to node properties
    .option("relationship.target.node.properties", "product:name")
    // Node keys are mandatory for overwrite save mode
    .option("relationship.target.node.keys", "product:name")
    // Map the DataFrame columns to relationship properties
    .option("relationship.properties", "quantity,order")
    .save()
Example (Python)
relDF = spark.createDataFrame(
    [
        {
            "name": "John",
            "surname": "Doe",
            "customerID": 1,
            "product": "Product 1",
            "quantity": 200,
            "order": "ABC100",
        },
        {
            "name": "Jane",
            "surname": "Doe",
            "customerID": 2,
            "product": "Product 2",
            "quantity": 100,
            "order": "ABC200",
        },
    ]
)

(
    relDF.write
    # Create new relationships
    .mode("Append")
    .format("org.neo4j.spark.DataSource")
    # Assign a type to the relationships
    .option("relationship", "BOUGHT")
    # Use `keys` strategy
    .option("relationship.save.strategy", "keys")
    # Overwrite source nodes and assign them a label
    .option("relationship.source.save.mode", "Overwrite")
    .option("relationship.source.labels", ":Customer")
    # Map the DataFrame columns to node properties
    .option(
        "relationship.source.node.properties", "name,surname,customerID:id"
    )
    # Node keys are mandatory for overwrite save mode
    .option("relationship.source.node.keys", "customerID:id")
    # Overwrite target nodes and assign them a label
    .option("relationship.target.save.mode", "Overwrite")
    .option("relationship.target.labels", ":Product")
    # Map the DataFrame columns to node properties
    .option("relationship.target.node.properties", "product:name")
    # Node keys are mandatory for overwrite save mode
    .option("relationship.target.node.keys", "product:name")
    # Map the DataFrame columns to relationship properties
    .option("relationship.properties", "quantity,order")
    .save()
)
Equivalent Cypher query
UNWIND $events AS event
MERGE (source:Customer {id: event.source.keys.id})
SET source += event.source.properties
MERGE (target:Product {name: event.target.keys.name})
SET target += event.target.properties
CREATE (source)-[rel:BOUGHT]->(target)
SET rel += event.rel.properties

Because Spark jobs run concurrently, you should use a property uniqueness constraint to guarantee node uniqueness when using the Overwrite mode.
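The difference between the three node save modes can be contrasted with a toy in-memory model. This is a simplified stand-in for Cypher's MATCH, CREATE, and MERGE, not the connector's logic: for example, it returns only the first matching node, whereas a real MATCH yields every match. The function and data layout are hypothetical.

```python
def save_node(nodes, label, keys, properties, mode):
    """Apply one node write to the list `nodes` and return the node (or None).

    Each node is a dict with "label" and "props". Mode names mirror the
    connector's node save modes.
    """
    def matches(node):
        return node["label"] == label and all(
            node["props"].get(k) == v for k, v in keys.items()
        )

    if mode == "Append":  # like CREATE: always adds a new node
        node = {"label": label, "props": dict(properties)}
        nodes.append(node)
        return node
    found = next((n for n in nodes if matches(n)), None)
    if mode == "Match":  # like MATCH: never creates anything
        return found
    if mode == "Overwrite":  # like MERGE on the keys, then SET properties
        if found is None:
            found = {"label": label, "props": dict(keys)}
            nodes.append(found)
        found["props"].update(properties)
        return found
    raise ValueError(f"unknown mode: {mode}")


nodes = []
print(save_node(nodes, "Customer", {"id": 1}, {}, "Match"))  # None
for _ in range(2):
    save_node(nodes, "Customer", {"id": 1}, {"name": "John"}, "Overwrite")
print(len(nodes))  # 1
for _ in range(2):
    save_node(nodes, "Customer", {}, {"id": 1, "name": "John"}, "Append")
print(len(nodes))  # 3
```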

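Overwriting nodes and relationships

If you need to upsert both nodes and relationships, you must use the Overwrite mode for all of the relationship.source.save.mode, relationship.target.save.mode, and mode options.

If run multiple times, the following example does not create any duplicate nodes or relationships.

Example (Scala)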
val relDF = Seq(
    ("John", "Doe", 1, "Product 1", 200, "ABC100"),
    ("Jane", "Doe", 2, "Product 2", 100, "ABC200")
).toDF("name", "surname", "customerID", "product", "quantity", "order")

relDF.write
    // Overwrite relationships
    .mode(SaveMode.Overwrite)
    .format("org.neo4j.spark.DataSource")
    // Assign a type to the relationships
    .option("relationship", "BOUGHT")
    // Use `keys` strategy
    .option("relationship.save.strategy", "keys")
    // Overwrite source nodes and assign them a label
    .option("relationship.source.save.mode", "Overwrite")
    .option("relationship.source.labels", ":Customer")
    // Map the DataFrame columns to node properties
    .option(
        "relationship.source.node.properties",
        "name,surname,customerID:id"
    )
    // Node keys are mandatory for overwrite save mode
    .option("relationship.source.node.keys", "customerID:id")
    // Overwrite target nodes and assign them a label
    .option("relationship.target.save.mode", "Overwrite")
    .option("relationship.target.labels", ":Product")
    // Map the DataFrame columns to node properties
    .option("relationship.target.node.properties", "product:name")
    // Node keys are mandatory for overwrite save mode
    .option("relationship.target.node.keys", "product:name")
    // Map the DataFrame columns to relationship properties
    .option("relationship.properties", "quantity,order")
    .save()
Example (Python)
relDF = spark.createDataFrame(
    [
        {
            "name": "John",
            "surname": "Doe",
            "customerID": 1,
            "product": "Product 1",
            "quantity": 200,
            "order": "ABC100",
        },
        {
            "name": "Jane",
            "surname": "Doe",
            "customerID": 2,
            "product": "Product 2",
            "quantity": 100,
            "order": "ABC200",
        },
    ]
)

(
    relDF.write
    # Overwrite relationships
    .mode("Overwrite")
    .format("org.neo4j.spark.DataSource")
    # Assign a type to the relationships
    .option("relationship", "BOUGHT")
    # Use `keys` strategy
    .option("relationship.save.strategy", "keys")
    # Overwrite source nodes and assign them a label
    .option("relationship.source.save.mode", "Overwrite")
    .option("relationship.source.labels", ":Customer")
    # Map the DataFrame columns to node properties
    .option(
        "relationship.source.node.properties", "name,surname,customerID:id"
    )
    # Node keys are mandatory for overwrite save mode
    .option("relationship.source.node.keys", "customerID:id")
    # Overwrite target nodes and assign them a label
    .option("relationship.target.save.mode", "Overwrite")
    .option("relationship.target.labels", ":Product")
    # Map the DataFrame columns to node properties
    .option("relationship.target.node.properties", "product:name")
    # Node keys are mandatory for overwrite save mode
    .option("relationship.target.node.keys", "product:name")
    # Map the DataFrame columns to relationship properties
    .option("relationship.properties", "quantity,order")
    .save()
)
Equivalent Cypher query
UNWIND $events AS event
MERGE (source:Customer {id: event.source.keys.id})
SET source += event.source.properties
MERGE (target:Product {name: event.target.keys.name})
SET target += event.target.properties
MERGE (source)-[rel:BOUGHT]->(target)
SET rel += event.rel.properties

Because Spark jobs run concurrently, you should use a property uniqueness constraint to guarantee node uniqueness when using the Overwrite mode.
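Since an upsert depends on several options agreeing with each other, it can help to build them in one place. The helper below is a hypothetical convenience, not a connector API: it returns the data source options with both node save modes set to Overwrite, using the option names from the table on this page.

```python
def upsert_options(rel_type, source_labels, source_keys, target_labels, target_keys):
    """Build connector options that use Overwrite for both node save modes."""
    return {
        "relationship": rel_type,
        "relationship.save.strategy": "keys",
        "relationship.source.save.mode": "Overwrite",
        "relationship.source.labels": source_labels,
        "relationship.source.node.keys": source_keys,
        "relationship.target.save.mode": "Overwrite",
        "relationship.target.labels": target_labels,
        "relationship.target.node.keys": target_keys,
    }


options = upsert_options(
    "BOUGHT", ":Customer", "customerID:id", ":Product", "product:name"
)
print(options["relationship.source.save.mode"])  # Overwrite
```

The resulting dictionary can be passed to the PySpark writer with relDF.write.mode("Overwrite").format("org.neo4j.spark.DataSource").options(**options).save(). The relationship save mode still has to be set through mode, and property mappings such as relationship.properties can be added to the dictionary as needed.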
