Write relationships
All examples on this page assume that the SparkSession has been initialized with the appropriate authentication options.
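A minimal sketch of such an initialization (assuming connector 5.x session-level configuration; the URL, username, and password are placeholders for your own deployment):

// Sketch only: session-level Neo4j options with placeholder URL and credentials.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("write-relationships-examples")
  .config("neo4j.url", "neo4j://localhost:7687")
  .config("neo4j.authentication.basic.username", "neo4j")
  .config("neo4j.authentication.basic.password", "password")
  .getOrCreate()

// Needed for the Seq(...).toDF(...) calls used in the Scala examples below.
import spark.implicits._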
With the `relationship` option, the connector writes a Spark DataFrame to the Neo4j database by specifying the source nodes, the target nodes, and the relationships between them.
To avoid deadlocks, always use a single partition (with `coalesce(1)` or `repartition(1)`) before writing relationships to Neo4j.
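For instance, a minimal sketch of this (not one of the page's own examples): `relDF` stands for any relationship DataFrame shaped like the ones shown later on this page, and the `batch.size` value is purely illustrative.

// Sketch only: relDF is assumed to have the columns expected by the chosen strategy.
// coalesce(1) collapses the DataFrame to a single partition so the relationship
// writes are serialized and cannot deadlock each other.
import org.apache.spark.sql.SaveMode

relDF.coalesce(1)
  .write
  .mode(SaveMode.Append)
  .format("org.neo4j.spark.DataSource")
  .option("relationship", "BOUGHT")
  .option("relationship.source.save.mode", "Append")
  .option("relationship.source.labels", ":Customer")
  .option("relationship.target.save.mode", "Append")
  .option("relationship.target.labels", ":Product")
  .option("batch.size", "5000") // rows per UNWIND batch; illustrative value
  .save()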
The connector builds a `CREATE` or `MERGE` Cypher® query (depending on the save mode) that uses the `UNWIND` clause to write a batch of rows (an `events` list whose size is defined by the `batch.size` option).
The rest of the query is built depending on a number of data source options.
Option | Description | Value | Default |
---|---|---|---|
`relationship.save.strategy` | Defines the save strategy to use. | `native` (requires the DataFrame to follow the relationship read schema), `keys` (custom mapping via the property options below) | `native` |
`relationship.source.save.mode` and `relationship.target.save.mode` | Define the node save mode; can be set independently for source and target nodes. | `Match`, `Append`, `Overwrite` | `Match` |
`relationship.source.labels` and `relationship.target.labels` | Required. Define the labels to assign to the source and target nodes. | Colon-separated list of labels. | (empty) |
`relationship.source.node.keys` and `relationship.target.node.keys` | When the node save mode is `Match` or `Overwrite`, define the node keys used to match the nodes. | Comma-separated list of `column:property` pairs. If the column and property names are the same, you can omit one of the two. | (empty) |
`relationship.source.node.properties` and `relationship.target.node.properties` | When the save strategy is `keys`, define which DataFrame columns to write as source/target node properties. | Comma-separated list of `column:property` pairs. If the column and property names are the same, you can omit one of the two. | (empty) |
`relationship.properties` | When the save strategy is `keys`, defines which DataFrame columns to write as relationship properties. | Comma-separated list of `column:property` pairs. If the column and property names are the same, you can omit one of the two. | (empty) |
Save strategy
The save strategy defines how the connector maps the DataFrame schema to Neo4j nodes and relationships.
`native` strategy
The `native` strategy requires the DataFrame to conform to the relationship read schema, with the `relationship.nodes.map` option set to `false`. The DataFrame must include at least one of the `rel.[property name]`, `source.[property name]`, or `target.[property name]` columns.
A good use case for this mode is transferring data from one database to another: when you read relationships with the connector, the resulting DataFrame already has the correct schema.
If you use the connector to read from one database and write to another, you need to set the connection options on each DataFrame rather than on the Spark session.
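As an illustration of that point, here is a hedged sketch of a database-to-database transfer (the URLs, credentials, labels, and relationship type are all assumptions, not values from this page):

// Sketch only: read relationships from one Neo4j database and append them to another.
// Connection options are set on each DataFrame instead of on the Spark session;
// `spark` is the already-initialized SparkSession.
import org.apache.spark.sql.SaveMode

val sourceDF = spark.read
  .format("org.neo4j.spark.DataSource")
  .option("url", "neo4j://source-host:7687")
  .option("authentication.basic.username", "neo4j")
  .option("authentication.basic.password", "source-password")
  .option("relationship", "BOUGHT")
  .option("relationship.source.labels", ":Customer")
  .option("relationship.target.labels", ":Product")
  .load()

sourceDF.write
  .mode(SaveMode.Append)
  .format("org.neo4j.spark.DataSource")
  .option("url", "neo4j://target-host:7687")
  .option("authentication.basic.username", "neo4j")
  .option("authentication.basic.password", "target-password")
  .option("relationship", "BOUGHT")
  .option("relationship.source.save.mode", "Append")
  .option("relationship.source.labels", ":Customer")
  .option("relationship.target.save.mode", "Append")
  .option("relationship.target.labels", ":Product")
  .save()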
The following example shows how to use the `native` strategy with the `Append` save mode for both the relationships and the source/target nodes. If run multiple times, it creates duplicate relationships and nodes.
// Columns representing node/relationship properties
// must use the "rel.", "source.", or "target." prefix
val relDF = Seq(
("John", "Doe", 1, "Product 1", 200, "ABC100"),
("Jane", "Doe", 2, "Product 2", 100, "ABC200")
).toDF(
"source.name",
"source.surname",
"source.id",
"target.name",
"rel.quantity",
"rel.order"
)
relDF.write
// Create new relationships
.mode(SaveMode.Append)
.format("org.neo4j.spark.DataSource")
// Assign a type to the relationships
.option("relationship", "BOUGHT")
// Create source nodes and assign them a label
.option("relationship.source.save.mode", "Append")
.option("relationship.source.labels", ":Customer")
// Create target nodes and assign them a label
.option("relationship.target.save.mode", "Append")
.option("relationship.target.labels", ":Product")
.save()
# Columns representing node/relationship properties
# must use the "rel.", "source.", or "target." prefix
relDF = spark.createDataFrame(
[
{
"source.name": "John",
"source.surname": "Doe",
"source.customerID": 1,
"target.name": "Product 1",
"rel.quantity": 200,
"rel.order": "ABC100",
},
{
"source.name": "Jane",
"source.surname": "Doe",
"source.customerID": 2,
"target.name": "Product 2",
"rel.quantity": 100,
"rel.order": "ABC200",
},
]
)
(
relDF.write
# Create new relationships
.mode("Append")
.format("org.neo4j.spark.DataSource")
# Assign a type to the relationships
.option("relationship", "BOUGHT")
# Create source nodes and assign them a label
.option("relationship.source.save.mode", "Append")
.option("relationship.source.labels", ":Customer")
# Create target nodes and assign them a label
.option("relationship.target.save.mode", "Append")
.option("relationship.target.labels", ":Product")
.save()
)
Equivalent Cypher query
UNWIND $events AS event
CREATE (source:Customer)
SET source += event.source.properties
CREATE (target:Product)
SET target += event.target.properties
CREATE (source)-[rel:BOUGHT]->(target)
SET rel += event.rel.properties
`keys` strategy
The `keys` strategy gives you more control over how relationships and source/target nodes are written. It does not require the DataFrame to have any specific schema, and you can specify which columns to write as node and relationship properties.
The following example shows how to use the `keys` strategy with the `Append` save mode for both the relationships and the source/target nodes. If run multiple times, it creates duplicate relationships and nodes.
val relDF = Seq(
("John", "Doe", 1, "Product 1", 200, "ABC100"),
("Jane", "Doe", 2, "Product 2", 100, "ABC200")
).toDF("name", "surname", "customerID", "product", "quantity", "order")
relDF.write
// Create new relationships
.mode("Append")
.format("org.neo4j.spark.DataSource")
// Assign a type to the relationships
.option("relationship", "BOUGHT")
// Use `keys` strategy
.option("relationship.save.strategy", "keys")
// Create source nodes and assign them a label
.option("relationship.source.save.mode", "Append")
.option("relationship.source.labels", ":Customer")
// Map the DataFrame columns to node properties
.option(
"relationship.source.node.properties",
"name,surname,customerID:id"
)
// Create target nodes and assign them a label
.option("relationship.target.save.mode", "Append")
.option("relationship.target.labels", ":Product")
// Map the DataFrame columns to node properties
.option("relationship.target.node.properties", "product:name")
// Map the DataFrame columns to relationship properties
.option("relationship.properties", "quantity,order")
.save()
relDF = spark.createDataFrame(
[
{
"name": "John",
"surname": "Doe",
"customerID": 1,
"product": "Product 1",
"quantity": 200,
"order": "ABC100",
},
{
"name": "Jane",
"surname": "Doe",
"customerID": 2,
"product": "Product 2",
"quantity": 100,
"order": "ABC200",
},
]
)
(
relDF.write
# Create new relationships
.mode("Append")
.format("org.neo4j.spark.DataSource")
# Assign a type to the relationships
.option("relationship", "BOUGHT")
# Use `keys` strategy
.option("relationship.save.strategy", "keys")
# Create source nodes and assign them a label
.option("relationship.source.save.mode", "Append")
.option("relationship.source.labels", ":Customer")
# Map the DataFrame columns to node properties
.option(
"relationship.source.node.properties", "name,surname,customerID:id"
)
# Create target nodes and assign them a label
.option("relationship.target.save.mode", "Append")
.option("relationship.target.labels", ":Product")
# Map the DataFrame columns to node properties
.option("relationship.target.node.properties", "product:name")
# Map the DataFrame columns to relationship properties
.option("relationship.properties", "quantity,order")
.save()
)
Equivalent Cypher query
UNWIND $events AS event
CREATE (source:Customer)
SET source += event.source.properties
CREATE (target:Product)
SET target += event.target.properties
CREATE (source)-[rel:BOUGHT]->(target)
SET rel += event.rel.properties
Node save modes
The examples in the previous sections use the `Append` mode for both relationships and nodes, which means that new relationships and new nodes are created every time the code runs.
Node save modes other than `Append` behave differently.
`Match` mode
The `Match` mode requires nodes with the selected labels and keys to already exist. This mode requires the `relationship.source.node.keys` and `relationship.target.node.keys` options.
The following example does not create any relationships if no matching nodes exist.
val relDF = Seq(
("John", "Doe", 1, "Product 1", 200, "ABC100"),
("Jane", "Doe", 2, "Product 2", 100, "ABC200")
).toDF("name", "surname", "customerID", "product", "quantity", "order")
relDF.write
// Create new relationships
.mode(SaveMode.Append)
.format("org.neo4j.spark.DataSource")
// Assign a type to the relationships
.option("relationship", "BOUGHT")
// Use `keys` strategy
.option("relationship.save.strategy", "keys")
// Match source nodes with the specified label
.option("relationship.source.save.mode", "Match")
.option("relationship.source.labels", ":Customer")
// Map the DataFrame columns to node properties
.option(
"relationship.source.node.properties",
"name,surname,customerID:id"
)
// Node keys are mandatory for the Match save mode
.option("relationship.source.node.keys", "customerID:id")
// Match target nodes with the specified label
.option("relationship.target.save.mode", "Match")
.option("relationship.target.labels", ":Product")
// Map the DataFrame columns to node properties
.option("relationship.target.node.properties", "product:name")
// Node keys are mandatory for the Match save mode
.option("relationship.target.node.keys", "product:name")
// Map the DataFrame columns to relationship properties
.option("relationship.properties", "quantity,order")
.save()
relDF = spark.createDataFrame(
[
{
"name": "John",
"surname": "Doe",
"customerID": 1,
"product": "Product 1",
"quantity": 200,
"order": "ABC100",
},
{
"name": "Jane",
"surname": "Doe",
"customerID": 2,
"product": "Product 2",
"quantity": 100,
"order": "ABC200",
},
]
)
(
relDF.write
# Create new relationships
.mode("Append")
.format("org.neo4j.spark.DataSource")
# Assign a type to the relationships
.option("relationship", "BOUGHT")
# Use `keys` strategy
.option("relationship.save.strategy", "keys")
# Match source nodes with the specified label
.option("relationship.source.save.mode", "Match")
.option("relationship.source.labels", ":Customer")
# Map the DataFrame columns to node properties
.option(
"relationship.source.node.properties", "name,surname,customerID:id"
)
# Node keys are mandatory for the Match save mode
.option("relationship.source.node.keys", "customerID:id")
# Match target nodes with the specified label
.option("relationship.target.save.mode", "Match")
.option("relationship.target.labels", ":Product")
# Map the DataFrame columns to node properties
.option("relationship.target.node.properties", "product:name")
# Node keys are mandatory for the Match save mode
.option("relationship.target.node.keys", "product:name")
# Map the DataFrame columns to relationship properties
.option("relationship.properties", "quantity,order")
.save()
)
Equivalent Cypher query
UNWIND $events AS event
MATCH (source:Customer {id: event.source.keys.id})
MATCH (target:Product {name: event.target.keys.name})
CREATE (source)-[rel:BOUGHT]->(target)
SET rel += event.rel.properties
`Overwrite` mode
The `Overwrite` mode creates nodes with the selected labels and keys if they do not already exist. This mode requires the `relationship.source.node.keys` and `relationship.target.node.keys` options.
If run multiple times, the following example creates duplicate relationships but no duplicate nodes.
val relDF = Seq(
("John", "Doe", 1, "Product 1", 200, "ABC100"),
("Jane", "Doe", 2, "Product 2", 100, "ABC200")
).toDF("name", "surname", "customerID", "product", "quantity", "order")
relDF.write
// Create new relationships
.mode(SaveMode.Append)
.format("org.neo4j.spark.DataSource")
// Assign a type to the relationships
.option("relationship", "BOUGHT")
// Use `keys` strategy
.option("relationship.save.strategy", "keys")
// Overwrite source nodes and assign them a label
.option("relationship.source.save.mode", "Overwrite")
.option("relationship.source.labels", ":Customer")
// Map the DataFrame columns to node properties
.option(
"relationship.source.node.properties",
"name,surname,customerID:id"
)
// Node keys are mandatory for overwrite save mode
.option("relationship.source.node.keys", "customerID:id")
// Overwrite target nodes and assign them a label
.option("relationship.target.save.mode", "Overwrite")
.option("relationship.target.labels", ":Product")
// Map the DataFrame columns to node properties
.option("relationship.target.node.properties", "product:name")
// Node keys are mandatory for overwrite save mode
.option("relationship.target.node.keys", "product:name")
// Map the DataFrame columns to relationship properties
.option("relationship.properties", "quantity,order")
.save()
relDF = spark.createDataFrame(
[
{
"name": "John",
"surname": "Doe",
"customerID": 1,
"product": "Product 1",
"quantity": 200,
"order": "ABC100",
},
{
"name": "Jane",
"surname": "Doe",
"customerID": 2,
"product": "Product 2",
"quantity": 100,
"order": "ABC200",
},
]
)
(
relDF.write
# Create new relationships
.mode("Append")
.format("org.neo4j.spark.DataSource")
# Assign a type to the relationships
.option("relationship", "BOUGHT")
# Use `keys` strategy
.option("relationship.save.strategy", "keys")
# Overwrite source nodes and assign them a label
.option("relationship.source.save.mode", "Overwrite")
.option("relationship.source.labels", ":Customer")
# Map the DataFrame columns to node properties
.option(
"relationship.source.node.properties", "name,surname,customerID:id"
)
# Node keys are mandatory for overwrite save mode
.option("relationship.source.node.keys", "customerID:id")
# Overwrite target nodes and assign them a label
.option("relationship.target.save.mode", "Overwrite")
.option("relationship.target.labels", ":Product")
# Map the DataFrame columns to node properties
.option("relationship.target.node.properties", "product:name")
# Node keys are mandatory for overwrite save mode
.option("relationship.target.node.keys", "product:name")
# Map the DataFrame columns to relationship properties
.option("relationship.properties", "quantity,order")
.save()
)
Equivalent Cypher query
UNWIND $events AS event
MERGE (source:Customer {id: event.source.keys.id})
SET source += event.source.properties
MERGE (target:Product {name: event.target.keys.name})
SET target += event.target.properties
CREATE (source)-[rel:BOUGHT]->(target)
SET rel += event.rel.properties
Due to the concurrency of Spark tasks, using the `Overwrite` mode with multiple partitions can cause deadlocks; as noted at the top of this page, make sure to write with a single partition (with `coalesce(1)` or `repartition(1)`).
Overwrite nodes and relationships
If you need to upsert (insert or update) both nodes and relationships, you must use the `Overwrite` mode for all of the `relationship.source.save.mode`, `relationship.target.save.mode`, and `mode` options.
If run multiple times, the following example does not create any duplicate nodes or relationships.
val relDF = Seq(
("John", "Doe", 1, "Product 1", 200, "ABC100"),
("Jane", "Doe", 2, "Product 2", 100, "ABC200")
).toDF("name", "surname", "customerID", "product", "quantity", "order")
relDF.write
// Overwrite relationships
.mode(SaveMode.Overwrite)
.format("org.neo4j.spark.DataSource")
// Assign a type to the relationships
.option("relationship", "BOUGHT")
// Use `keys` strategy
.option("relationship.save.strategy", "keys")
// Overwrite source nodes and assign them a label
.option("relationship.source.save.mode", "Overwrite")
.option("relationship.source.labels", ":Customer")
// Map the DataFrame columns to node properties
.option(
"relationship.source.node.properties",
"name,surname,customerID:id"
)
// Node keys are mandatory for overwrite save mode
.option("relationship.source.node.keys", "customerID:id")
// Overwrite target nodes and assign them a label
.option("relationship.target.save.mode", "Overwrite")
.option("relationship.target.labels", ":Product")
// Map the DataFrame columns to node properties
.option("relationship.target.node.properties", "product:name")
// Node keys are mandatory for overwrite save mode
.option("relationship.target.node.keys", "product:name")
// Map the DataFrame columns to relationship properties
.option("relationship.properties", "quantity,order")
.save()
relDF = spark.createDataFrame(
[
{
"name": "John",
"surname": "Doe",
"customerID": 1,
"product": "Product 1",
"quantity": 200,
"order": "ABC100",
},
{
"name": "Jane",
"surname": "Doe",
"customerID": 2,
"product": "Product 2",
"quantity": 100,
"order": "ABC200",
},
]
)
(
relDF.write
# Overwrite relationships
.mode("Overwrite")
.format("org.neo4j.spark.DataSource")
# Assign a type to the relationships
.option("relationship", "BOUGHT")
# Use `keys` strategy
.option("relationship.save.strategy", "keys")
# Overwrite source nodes and assign them a label
.option("relationship.source.save.mode", "Overwrite")
.option("relationship.source.labels", ":Customer")
# Map the DataFrame columns to node properties
.option(
"relationship.source.node.properties", "name,surname,customerID:id"
)
# Node keys are mandatory for overwrite save mode
.option("relationship.source.node.keys", "customerID:id")
# Overwrite target nodes and assign them a label
.option("relationship.target.save.mode", "Overwrite")
.option("relationship.target.labels", ":Product")
# Map the DataFrame columns to node properties
.option("relationship.target.node.properties", "product:name")
# Node keys are mandatory for overwrite save mode
.option("relationship.target.node.keys", "product:name")
# Map the DataFrame columns to relationship properties
.option("relationship.properties", "quantity,order")
.save()
)
Equivalent Cypher query
UNWIND $events AS event
MERGE (source:Customer {id: event.source.keys.id})
SET source += event.source.properties
MERGE (target:Product {name: event.target.keys.name})
SET target += event.target.properties
MERGE (source)-[rel:BOUGHT]->(target)
SET rel += event.rel.properties
Due to the concurrency of Spark tasks, using the `Overwrite` mode with multiple partitions can cause deadlocks; as noted at the top of this page, make sure to write with a single partition (with `coalesce(1)` or `repartition(1)`).