Write relationships
All the examples on this page assume |
With the `relationship` option, the connector writes a Spark DataFrame to the Neo4j database by specifying source nodes, target nodes, and relationships.
To avoid deadlocks, always use a single partition (with `coalesce(1)` or `repartition(1)`) before writing relationships to Neo4j.
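The single-partition advice can be sketched as a small wrapper around the writer chain used throughout this page. The helper name `write_relationships` is hypothetical and not part of the connector; the sketch only assumes that `df` is a Spark DataFrame, whose `coalesce(1)` method collapses it to one partition so that a single task performs the write.

```python
from typing import Any, Mapping


def write_relationships(df: Any, rel_type: str, options: Mapping[str, str]) -> None:
    """Write `df` as relationships of type `rel_type` from a single partition.

    Hypothetical helper: `df` is expected to be a Spark DataFrame.
    """
    writer = (
        df.coalesce(1)  # one partition, so only one task writes to Neo4j
        .write.mode("Append")
        .format("org.neo4j.spark.DataSource")
        .option("relationship", rel_type)
    )
    # Forward any other data source options (labels, save modes, and so on)
    for key, value in options.items():
        writer = writer.option(key, value)
    writer.save()
```

Writing from one partition trades write parallelism for safety, so it is probably best applied only to the relationship write itself rather than to the whole job.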
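The connector builds a `CREATE` or a `MERGE` Cypher® query (depending on the save mode) that uses the `UNWIND` clause to write a batch of rows (an `events` list whose size is defined by the `batch.size` option).
The rest of the query is built from a number of data source options.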
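To make the batching idea concrete, here is a minimal sketch of how rows could be split into lists of at most `batch.size` elements, each of which would then be bound to the `$events` parameter of one `UNWIND` query. The `to_batches` function and the sample rows are illustrative assumptions, not the connector's actual implementation.

```python
from typing import Any, Dict, Iterator, List

Row = Dict[str, Any]


def to_batches(rows: List[Row], batch_size: int) -> Iterator[List[Row]]:
    """Yield consecutive slices of at most `batch_size` rows."""
    if batch_size < 1:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(rows), batch_size):
        yield rows[start:start + batch_size]


# Five sample rows split with a batch size of 2
rows = [{"order": f"ABC{i}00"} for i in range(1, 6)]
batches = list(to_batches(rows, batch_size=2))
# Each element of `batches` plays the role of one `events` list
```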
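Option | Description | Value | Default |
---|---|---|---|
`relationship.save.strategy` | Defines the save strategy to use. | `native` or `keys`. | `native` |
`relationship.source.save.mode` and `relationship.target.save.mode` | Define the node save mode, and can be set independently for source and target nodes. | `Match`, `Append`, or `Overwrite`. | `Match` |
`relationship.source.labels` and `relationship.target.labels` | Required. Define the labels to assign to source and target nodes. | Colon-separated list of labels. | (empty) |
`relationship.source.node.keys` and `relationship.target.node.keys` | When the node save mode is `Match` or `Overwrite`, define the node keys that identify the nodes. | Comma-separated list of `column:property` pairs. If the column and the property have the same name (for example `name:name`), one of them can be omitted. | (empty) |
`relationship.source.node.properties` and `relationship.target.node.properties` | When the save strategy is `keys`, define which DataFrame columns to write as source and target node properties. | Comma-separated list of `column:property` pairs. If the column and the property have the same name (for example `name:name`), one of them can be omitted. | (empty) |
`relationship.properties` | When the save strategy is `keys`, defines which DataFrame columns to write as relationship properties. | Comma-separated list of `column:property` pairs. If the column and the property have the same name (for example `name:name`), one of them can be omitted. | (empty) |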
Save strategy
The save strategy defines how the connector maps the DataFrame schema to Neo4j nodes and relationships.
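`native` strategy
The `native` strategy requires the DataFrame to conform to the relationship read schema with the `relationship.nodes.map` option set to `false`. The DataFrame must include at least one of the `rel.[property name]`, `source.[property name]`, or `target.[property name]` columns.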
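The column requirement can be checked before writing. The following sketch groups column names by the three prefixes and raises an error when none of them is present; `group_native_columns` is a hypothetical helper written for illustration, and the connector performs its own validation independently of it.

```python
from typing import Dict, List

PREFIXES = ("rel.", "source.", "target.")


def group_native_columns(columns: List[str]) -> Dict[str, List[str]]:
    """Group prefixed column names into rel/source/target property names."""
    groups: Dict[str, List[str]] = {"rel": [], "source": [], "target": []}
    for column in columns:
        for prefix in PREFIXES:
            # Require a non-empty property name after the prefix
            if column.startswith(prefix) and len(column) > len(prefix):
                groups[prefix[:-1]].append(column[len(prefix):])
                break
    if not any(groups.values()):
        raise ValueError(
            "expected at least one 'rel.', 'source.', or 'target.' column"
        )
    return groups


groups = group_native_columns(
    ["source.name", "source.surname", "source.id",
     "target.name", "rel.quantity", "rel.order"]
)
```

With a PySpark DataFrame, the input list would typically come from `df.columns`.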
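A good use case for this strategy is transferring data from one database to another. When you use the connector to read a relationship, the resulting DataFrame already has the correct schema.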
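If you use the connector to read data from one database and write data to a different database, you need to set the connection options on each DataFrame rather than on the Spark session.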
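One way to keep the two sets of connection options apart is to build a separate dictionary per database. In this sketch the `connection_options` helper, the URLs, and the credentials are placeholders chosen for illustration; only the option names `url`, `authentication.basic.username`, and `authentication.basic.password` are the connector's own.

```python
from typing import Dict


def connection_options(url: str, username: str, password: str) -> Dict[str, str]:
    """Build the per-DataFrame connection options for one Neo4j database."""
    return {
        "url": url,
        "authentication.basic.username": username,
        "authentication.basic.password": password,
    }


# Placeholder endpoints and credentials, for illustration only
source_db = connection_options(
    "neo4j://source.example.com:7687", "neo4j", "source-password"
)
target_db = connection_options(
    "neo4j://target.example.com:7687", "neo4j", "target-password"
)

# With PySpark, each dictionary can be passed to its own reader or writer:
#   spark.read.format("org.neo4j.spark.DataSource").options(**source_db)
#   df.write.format("org.neo4j.spark.DataSource").options(**target_db)
```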
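The following example shows how to use the `native` strategy with the `Append` save mode for both the relationship and the source/target nodes. If run multiple times, it creates duplicate relationships and nodes.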
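// Needed outside spark-shell; assumes `spark` is the active SparkSession
import org.apache.spark.sql.SaveMode
import spark.implicits._
// Columns representing node/relationship properties
// must use the "rel.", "source.", or "target." prefix
val relDF = Seq(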
("John", "Doe", 1, "Product 1", 200, "ABC100"),
("Jane", "Doe", 2, "Product 2", 100, "ABC200")
).toDF(
"source.name",
"source.surname",
"source.id",
"target.name",
"rel.quantity",
"rel.order"
)
relDF.write
// Create new relationships
.mode(SaveMode.Append)
.format("org.neo4j.spark.DataSource")
// Assign a type to the relationships
.option("relationship", "BOUGHT")
// Create source nodes and assign them a label
.option("relationship.source.save.mode", "Append")
.option("relationship.source.labels", ":Customer")
// Create target nodes and assign them a label
.option("relationship.target.save.mode", "Append")
.option("relationship.target.labels", ":Product")
.save()
# Columns representing node/relationship properties
# must use the "rel.", "source.", or "target." prefix
relDF = spark.createDataFrame(
[
{
"source.name": "John",
"source.surname": "Doe",
"source.customerID": 1,
"target.name": "Product 1",
"rel.quantity": 200,
"rel.order": "ABC100",
},
{
"source.name": "Jane",
"source.surname": "Doe",
"source.customerID": 2,
"target.name": "Product 2",
"rel.quantity": 100,
"rel.order": "ABC200",
},
]
)
(
relDF.write
# Create new relationships
.mode("Append")
.format("org.neo4j.spark.DataSource")
# Assign a type to the relationships
.option("relationship", "BOUGHT")
# Create source nodes and assign them a label
.option("relationship.source.save.mode", "Append")
.option("relationship.source.labels", ":Customer")
# Create target nodes and assign them a label
.option("relationship.target.save.mode", "Append")
.option("relationship.target.labels", ":Product")
.save()
)
Equivalent Cypher query
UNWIND $events AS event
CREATE (source:Customer)
SET source += event.source.properties
CREATE (target:Product)
SET target += event.target.properties
CREATE (source)-[rel:BOUGHT]->(target)
SET rel += event.rel.properties
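`keys` strategy
The `keys` strategy gives more control over how relationships and source/target nodes are written. It does not require any specific schema for the DataFrame, and you can specify which columns to write as node and relationship properties.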
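The column-to-property mappings used by this strategy are plain strings such as `name,surname,customerID:id`. The sketch below shows one way such a string could be interpreted and applied to a single row; `parse_mapping` and `project` are hypothetical helpers, and the shape of `event` is inferred from the `event.source.properties`, `event.target.properties`, and `event.rel.properties` references in the Cypher queries on this page rather than taken from the connector's code.

```python
from typing import Any, Dict


def parse_mapping(spec: str) -> Dict[str, str]:
    """Turn 'a,b:c' into {'a': 'a', 'b': 'c'} (column name -> property name)."""
    mapping: Dict[str, str] = {}
    for item in spec.split(","):
        column, _, prop = item.strip().partition(":")
        if column:
            # A missing ':property' part means the column name is reused
            mapping[column] = prop or column
    return mapping


def project(row: Dict[str, Any], mapping: Dict[str, str]) -> Dict[str, Any]:
    """Pick the mapped columns from a row and rename them to property names."""
    return {prop: row[column] for column, prop in mapping.items()}


row = {
    "name": "John", "surname": "Doe", "customerID": 1,
    "product": "Product 1", "quantity": 200, "order": "ABC100",
}
event = {
    "source": {"properties": project(row, parse_mapping("name,surname,customerID:id"))},
    "target": {"properties": project(row, parse_mapping("product:name"))},
    "rel": {"properties": project(row, parse_mapping("quantity,order"))},
}
```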
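The following example shows how to use the `keys` strategy with the `Append` save mode for both the relationship and the source/target nodes. If run multiple times, it creates duplicate relationships and nodes.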
val relDF = Seq(
("John", "Doe", 1, "Product 1", 200, "ABC100"),
("Jane", "Doe", 2, "Product 2", 100, "ABC200")
).toDF("name", "surname", "customerID", "product", "quantity", "order")
relDF.write
// Create new relationships
.mode("Append")
.format("org.neo4j.spark.DataSource")
// Assign a type to the relationships
.option("relationship", "BOUGHT")
// Use `keys` strategy
.option("relationship.save.strategy", "keys")
// Create source nodes and assign them a label
.option("relationship.source.save.mode", "Append")
.option("relationship.source.labels", ":Customer")
// Map the DataFrame columns to node properties
.option(
"relationship.source.node.properties",
"name,surname,customerID:id"
)
// Create target nodes and assign them a label
.option("relationship.target.save.mode", "Append")
.option("relationship.target.labels", ":Product")
// Map the DataFrame columns to node properties
.option("relationship.target.node.properties", "product:name")
// Map the DataFrame columns to relationship properties
.option("relationship.properties", "quantity,order")
.save()
relDF = spark.createDataFrame(
[
{
"name": "John",
"surname": "Doe",
"customerID": 1,
"product": "Product 1",
"quantity": 200,
"order": "ABC100",
},
{
"name": "Jane",
"surname": "Doe",
"customerID": 2,
"product": "Product 2",
"quantity": 100,
"order": "ABC200",
},
]
)
(
relDF.write
# Create new relationships
.mode("Append")
.format("org.neo4j.spark.DataSource")
# Assign a type to the relationships
.option("relationship", "BOUGHT")
# Use `keys` strategy
.option("relationship.save.strategy", "keys")
# Create source nodes and assign them a label
.option("relationship.source.save.mode", "Append")
.option("relationship.source.labels", ":Customer")
# Map the DataFrame columns to node properties
.option(
"relationship.source.node.properties", "name,surname,customerID:id"
)
# Create target nodes and assign them a label
.option("relationship.target.save.mode", "Append")
.option("relationship.target.labels", ":Product")
# Map the DataFrame columns to node properties
.option("relationship.target.node.properties", "product:name")
# Map the DataFrame columns to relationship properties
.option("relationship.properties", "quantity,order")
.save()
)
Equivalent Cypher query
UNWIND $events AS event
CREATE (source:Customer)
SET source += event.source.properties
CREATE (target:Product)
SET target += event.target.properties
CREATE (source)-[rel:BOUGHT]->(target)
SET rel += event.rel.properties
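Node save mode
The examples in the previous section use the `Append` mode for both relationships and nodes; this means that new relationships and new nodes are created every time the code is run.
Node save modes other than `Append` behave differently.
`Match` mode
The `Match` mode requires nodes with the selected labels and keys to already exist. This mode requires both the `relationship.source.node.keys` and the `relationship.target.node.keys` options.
The following example does not create any relationships if there are no matching nodes.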
val relDF = Seq(
("John", "Doe", 1, "Product 1", 200, "ABC100"),
("Jane", "Doe", 2, "Product 2", 100, "ABC200")
).toDF("name", "surname", "customerID", "product", "quantity", "order")
relDF.write
// Create new relationships
.mode(SaveMode.Append)
.format("org.neo4j.spark.DataSource")
// Assign a type to the relationships
.option("relationship", "BOUGHT")
// Use `keys` strategy
.option("relationship.save.strategy", "keys")
// Match source nodes with the specified label
.option("relationship.source.save.mode", "Match")
.option("relationship.source.labels", ":Customer")
// Map the DataFrame columns to node properties
.option(
"relationship.source.node.properties",
"name,surname,customerID:id"
)
// Node keys are mandatory for Match save mode
.option("relationship.source.node.keys", "customerID:id")
// Match target nodes with the specified label
.option("relationship.target.save.mode", "Match")
.option("relationship.target.labels", ":Product")
// Map the DataFrame columns to node properties
.option("relationship.target.node.properties", "product:name")
// Node keys are mandatory for Match save mode
.option("relationship.target.node.keys", "product:name")
// Map the DataFrame columns to relationship properties
.option("relationship.properties", "quantity,order")
.save()
relDF = spark.createDataFrame(
[
{
"name": "John",
"surname": "Doe",
"customerID": 1,
"product": "Product 1",
"quantity": 200,
"order": "ABC100",
},
{
"name": "Jane",
"surname": "Doe",
"customerID": 2,
"product": "Product 2",
"quantity": 100,
"order": "ABC200",
},
]
)
(
relDF.write
# Create new relationships
.mode("Append")
.format("org.neo4j.spark.DataSource")
# Assign a type to the relationships
.option("relationship", "BOUGHT")
# Use `keys` strategy
.option("relationship.save.strategy", "keys")
# Match source nodes with the specified label
.option("relationship.source.save.mode", "Match")
.option("relationship.source.labels", ":Customer")
# Map the DataFrame columns to node properties
.option(
"relationship.source.node.properties", "name,surname,customerID:id"
)
# Node keys are mandatory for Match save mode
.option("relationship.source.node.keys", "customerID:id")
# Match target nodes with the specified label
.option("relationship.target.save.mode", "Match")
.option("relationship.target.labels", ":Product")
# Map the DataFrame columns to node properties
.option("relationship.target.node.properties", "product:name")
# Node keys are mandatory for Match save mode
.option("relationship.target.node.keys", "product:name")
# Map the DataFrame columns to relationship properties
.option("relationship.properties", "quantity,order")
.save()
)
Equivalent Cypher query
UNWIND $events AS event
MATCH (source:Customer {id: event.source.keys.id})
MATCH (target:Product {name: event.target.keys.name})
CREATE (source)-[rel:BOUGHT]->(target)
SET rel += event.rel.properties
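The effect of the two `MATCH` clauses can be imitated with a small in-memory model: a relationship is created only when both endpoints are already present. The `match_and_create` function and the sample data are illustrative assumptions, and the model treats each key as identifying at most one node (in Neo4j, a `MATCH` that finds several nodes would produce a relationship for each combination).

```python
from typing import Any, Dict, List, Set, Tuple

Node = Tuple[str, Any]  # (label, key value)


def match_and_create(
    existing: Set[Node], events: List[Dict[str, Any]]
) -> List[Tuple[Node, str, Node]]:
    """Create a BOUGHT relationship only when both nodes already exist."""
    created = []
    for event in events:
        source = ("Customer", event["source"]["keys"]["id"])
        target = ("Product", event["target"]["keys"]["name"])
        if source in existing and target in existing:
            created.append((source, "BOUGHT", target))
    return created


# "Product 2" is missing, so the second event produces no relationship
existing_nodes = {("Customer", 1), ("Customer", 2), ("Product", "Product 1")}
events = [
    {"source": {"keys": {"id": 1}}, "target": {"keys": {"name": "Product 1"}}},
    {"source": {"keys": {"id": 2}}, "target": {"keys": {"name": "Product 2"}}},
]
relationships = match_and_create(existing_nodes, events)
```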
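`Overwrite` mode
The `Overwrite` mode creates nodes with the selected labels and keys if they do not already exist. This mode requires both the `relationship.source.node.keys` and the `relationship.target.node.keys` options.
If run multiple times, the following example creates duplicate relationships but no duplicate nodes.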
val relDF = Seq(
("John", "Doe", 1, "Product 1", 200, "ABC100"),
("Jane", "Doe", 2, "Product 2", 100, "ABC200")
).toDF("name", "surname", "customerID", "product", "quantity", "order")
relDF.write
// Create new relationships
.mode(SaveMode.Append)
.format("org.neo4j.spark.DataSource")
// Assign a type to the relationships
.option("relationship", "BOUGHT")
// Use `keys` strategy
.option("relationship.save.strategy", "keys")
// Overwrite source nodes and assign them a label
.option("relationship.source.save.mode", "Overwrite")
.option("relationship.source.labels", ":Customer")
// Map the DataFrame columns to node properties
.option(
"relationship.source.node.properties",
"name,surname,customerID:id"
)
// Node keys are mandatory for overwrite save mode
.option("relationship.source.node.keys", "customerID:id")
// Overwrite target nodes and assign them a label
.option("relationship.target.save.mode", "Overwrite")
.option("relationship.target.labels", ":Product")
// Map the DataFrame columns to node properties
.option("relationship.target.node.properties", "product:name")
// Node keys are mandatory for overwrite save mode
.option("relationship.target.node.keys", "product:name")
// Map the DataFrame columns to relationship properties
.option("relationship.properties", "quantity,order")
.save()
relDF = spark.createDataFrame(
[
{
"name": "John",
"surname": "Doe",
"customerID": 1,
"product": "Product 1",
"quantity": 200,
"order": "ABC100",
},
{
"name": "Jane",
"surname": "Doe",
"customerID": 2,
"product": "Product 2",
"quantity": 100,
"order": "ABC200",
},
]
)
(
relDF.write
# Create new relationships
.mode("Append")
.format("org.neo4j.spark.DataSource")
# Assign a type to the relationships
.option("relationship", "BOUGHT")
# Use `keys` strategy
.option("relationship.save.strategy", "keys")
# Overwrite source nodes and assign them a label
.option("relationship.source.save.mode", "Overwrite")
.option("relationship.source.labels", ":Customer")
# Map the DataFrame columns to node properties
.option(
"relationship.source.node.properties", "name,surname,customerID:id"
)
# Node keys are mandatory for overwrite save mode
.option("relationship.source.node.keys", "customerID:id")
# Overwrite target nodes and assign them a label
.option("relationship.target.save.mode", "Overwrite")
.option("relationship.target.labels", ":Product")
# Map the DataFrame columns to node properties
.option("relationship.target.node.properties", "product:name")
# Node keys are mandatory for overwrite save mode
.option("relationship.target.node.keys", "product:name")
# Map the DataFrame columns to relationship properties
.option("relationship.properties", "quantity,order")
.save()
)
Equivalent Cypher query
UNWIND $events AS event
MERGE (source:Customer {id: event.source.keys.id})
SET source += event.source.properties
MERGE (target:Product {name: event.target.keys.name})
SET target += event.target.properties
CREATE (source)-[rel:BOUGHT]->(target)
SET rel += event.rel.properties
Due to the concurrency of Spark jobs, when using |
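Overwrite nodes and relationships
If you need to upsert both nodes and relationships, you must use the `Overwrite` mode for the `relationship.source.save.mode` and `relationship.target.save.mode` options as well as for the DataFrame writer `mode`.
If run multiple times, the following example does not create any duplicate nodes or relationships.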
val relDF = Seq(
("John", "Doe", 1, "Product 1", 200, "ABC100"),
("Jane", "Doe", 2, "Product 2", 100, "ABC200")
).toDF("name", "surname", "customerID", "product", "quantity", "order")
relDF.write
// Overwrite relationships
.mode(SaveMode.Overwrite)
.format("org.neo4j.spark.DataSource")
// Assign a type to the relationships
.option("relationship", "BOUGHT")
// Use `keys` strategy
.option("relationship.save.strategy", "keys")
// Overwrite source nodes and assign them a label
.option("relationship.source.save.mode", "Overwrite")
.option("relationship.source.labels", ":Customer")
// Map the DataFrame columns to node properties
.option(
"relationship.source.node.properties",
"name,surname,customerID:id"
)
// Node keys are mandatory for overwrite save mode
.option("relationship.source.node.keys", "customerID:id")
// Overwrite target nodes and assign them a label
.option("relationship.target.save.mode", "Overwrite")
.option("relationship.target.labels", ":Product")
// Map the DataFrame columns to node properties
.option("relationship.target.node.properties", "product:name")
// Node keys are mandatory for overwrite save mode
.option("relationship.target.node.keys", "product:name")
// Map the DataFrame columns to relationship properties
.option("relationship.properties", "quantity,order")
.save()
relDF = spark.createDataFrame(
[
{
"name": "John",
"surname": "Doe",
"customerID": 1,
"product": "Product 1",
"quantity": 200,
"order": "ABC100",
},
{
"name": "Jane",
"surname": "Doe",
"customerID": 2,
"product": "Product 2",
"quantity": 100,
"order": "ABC200",
},
]
)
(
relDF.write
# Overwrite relationships
.mode("Overwrite")
.format("org.neo4j.spark.DataSource")
# Assign a type to the relationships
.option("relationship", "BOUGHT")
# Use `keys` strategy
.option("relationship.save.strategy", "keys")
# Overwrite source nodes and assign them a label
.option("relationship.source.save.mode", "Overwrite")
.option("relationship.source.labels", ":Customer")
# Map the DataFrame columns to node properties
.option(
"relationship.source.node.properties", "name,surname,customerID:id"
)
# Node keys are mandatory for overwrite save mode
.option("relationship.source.node.keys", "customerID:id")
# Overwrite target nodes and assign them a label
.option("relationship.target.save.mode", "Overwrite")
.option("relationship.target.labels", ":Product")
# Map the DataFrame columns to node properties
.option("relationship.target.node.properties", "product:name")
# Node keys are mandatory for overwrite save mode
.option("relationship.target.node.keys", "product:name")
# Map the DataFrame columns to relationship properties
.option("relationship.properties", "quantity,order")
.save()
)
Equivalent Cypher query
UNWIND $events AS event
MERGE (source:Customer {id: event.source.keys.id})
SET source += event.source.properties
MERGE (target:Product {name: event.target.keys.name})
SET target += event.target.properties
MERGE (source)-[rel:BOUGHT]->(target)
SET rel += event.rel.properties
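The difference between creating and merging the relationship shows up when the same write runs twice. The following in-memory sketch imitates that behavior under simplifying assumptions: `TinyGraph` and `write` are hypothetical names, nodes are always merged on their key, and a merged relationship is identified only by its two endpoints, as in the `MERGE (source)-[rel:BOUGHT]->(target)` pattern.

```python
from typing import Any, Dict, List, Tuple

NodeKey = Tuple[str, Any]  # (label, key value)


class TinyGraph:
    """Toy stand-in for a graph database, for illustration only."""

    def __init__(self) -> None:
        self.nodes: Dict[NodeKey, Dict[str, Any]] = {}
        self.rels: List[Tuple[NodeKey, NodeKey, Dict[str, Any]]] = []

    def merge_node(self, label: str, key: Any, props: Dict[str, Any]) -> NodeKey:
        node = (label, key)
        self.nodes.setdefault(node, {}).update(props)  # create or update
        return node

    def create_rel(self, source: NodeKey, target: NodeKey, props: Dict[str, Any]) -> None:
        self.rels.append((source, target, dict(props)))  # always adds a new one

    def merge_rel(self, source: NodeKey, target: NodeKey, props: Dict[str, Any]) -> None:
        for s, t, existing in self.rels:
            if s == source and t == target:
                existing.update(props)  # reuse the existing relationship
                return
        self.create_rel(source, target, props)


def write(graph: TinyGraph, events: List[Dict[str, Any]], overwrite_rels: bool) -> None:
    for e in events:
        source = graph.merge_node("Customer", e["id"], {"id": e["id"], "name": e["name"]})
        target = graph.merge_node("Product", e["product"], {"name": e["product"]})
        if overwrite_rels:
            graph.merge_rel(source, target, {"quantity": e["quantity"]})
        else:
            graph.create_rel(source, target, {"quantity": e["quantity"]})


EVENTS = [
    {"id": 1, "name": "John", "product": "Product 1", "quantity": 200},
    {"id": 2, "name": "Jane", "product": "Product 2", "quantity": 100},
]

appended, overwritten = TinyGraph(), TinyGraph()
for _ in range(2):  # run the same write twice against each graph
    write(appended, EVENTS, overwrite_rels=False)
    write(overwritten, EVENTS, overwrite_rels=True)
```

After both runs, each graph holds the same four nodes, but `appended` contains every relationship twice while `overwritten` contains each one once.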
Due to the concurrency of Spark jobs, when using |