Write to Neo4j
The connector provides three data source options for writing data to a Neo4j database.
Option | Description | Value | Default
---|---|---|---
`labels` | Use this option if you only need to create or update nodes and their properties, or as a first step before adding relationships. | Colon-separated list of the node labels to create or update. | (empty)
`relationship` | Use this option if you need to create or update relationships along with their source and target nodes. | Type of the relationship to create or update. | (empty)
`query` | Use this option if you need more flexibility and know how to write a Cypher® query. | Cypher query to use for writing data. | (empty)
Examples

All the examples on this page assume that the `SparkSession` has been initialized with the appropriate connection options. You can run the read examples for each option to check the data after writing.
`labels` option

Write `:Person` nodes.
Example
```scala
case class Person(name: String, surname: String, age: Int)

// Create an example DataFrame
val peopleDF = List(
  Person("John", "Doe", 42),
  Person("Jane", "Doe", 40)
).toDF()

peopleDF.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.Append)
  .option("labels", ":Person")
  .save()
```
Example
```python
# Create example DataFrame
peopleDF = spark.createDataFrame(
    [
        {"name": "John", "surname": "Doe", "age": 42},
        {"name": "Jane", "surname": "Doe", "age": 40},
    ]
)

(
    peopleDF.write.format("org.neo4j.spark.DataSource")
    .mode("Append")
    .option("labels", ":Person")
    .save()
)
```
For more information and examples, see Writing nodes.
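As noted above, you can verify the written data with the connector's read API. Below is a minimal sketch of the corresponding read, assuming a `SparkSession` named `spark` already configured with the connection options; the live call is shown commented out because it needs a running Neo4j instance.

```python
# Read options mirroring the write: load nodes by label.
read_options = {
    "labels": "Person",  # nodes with the :Person label
}

# With a running database, the verification read would be:
# df = (
#     spark.read.format("org.neo4j.spark.DataSource")
#     .options(**read_options)
#     .load()
# )
# df.show()
```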
`relationship` option

Write `:BOUGHT` relationships along with their source nodes, target nodes, and properties.
Example
```scala
// Create an example DataFrame
val relDF = Seq(
  ("John", "Doe", 1, "Product 1", 200, "ABC100"),
  ("Jane", "Doe", 2, "Product 2", 100, "ABC200")
).toDF("name", "surname", "customerID", "product", "quantity", "order")

relDF.write
  // Create new relationships
  .mode("Append")
  .format("org.neo4j.spark.DataSource")
  // Assign a type to the relationships
  .option("relationship", "BOUGHT")
  // Use `keys` strategy
  .option("relationship.save.strategy", "keys")
  // Create source nodes and assign them a label
  .option("relationship.source.save.mode", "Append")
  .option("relationship.source.labels", ":Customer")
  // Map the DataFrame columns to node properties
  .option("relationship.source.node.properties", "name,surname,customerID:id")
  // Create target nodes and assign them a label
  .option("relationship.target.save.mode", "Append")
  .option("relationship.target.labels", ":Product")
  // Map the DataFrame columns to node properties
  .option("relationship.target.node.properties", "product:name")
  // Map the DataFrame columns to relationship properties
  .option("relationship.properties", "quantity,order")
  .save()
```
Example
```python
# Create example DataFrame
relDF = spark.createDataFrame(
    [
        {
            "name": "John",
            "surname": "Doe",
            "customerID": 1,
            "product": "Product 1",
            "quantity": 200,
            "order": "ABC100",
        },
        {
            "name": "Jane",
            "surname": "Doe",
            "customerID": 2,
            "product": "Product 2",
            "quantity": 100,
            "order": "ABC200",
        },
    ]
)

(
    relDF.write
    # Create new relationships
    .mode("Append")
    .format("org.neo4j.spark.DataSource")
    # Assign a type to the relationships
    .option("relationship", "BOUGHT")
    # Use `keys` strategy
    .option("relationship.save.strategy", "keys")
    # Create source nodes and assign them a label
    .option("relationship.source.save.mode", "Append")
    .option("relationship.source.labels", ":Customer")
    # Map the DataFrame columns to node properties
    .option("relationship.source.node.properties", "name,surname,customerID:id")
    # Create target nodes and assign them a label
    .option("relationship.target.save.mode", "Append")
    .option("relationship.target.labels", ":Product")
    # Map the DataFrame columns to node properties
    .option("relationship.target.node.properties", "product:name")
    # Map the DataFrame columns to relationship properties
    .option("relationship.properties", "quantity,order")
    .save()
)
```
For more information and examples, see Writing relationships.
`query` option

Write data using a Cypher query.
Example
```scala
case class Person(name: String, surname: String, age: Int)

// Create an example DataFrame
val queryDF = List(
  Person("John", "Doe", 42),
  Person("Jane", "Doe", 40)
).toDF()

// Define the Cypher query to use in the write
val writeQuery =
  "CREATE (n:Person {fullName: event.name + ' ' + event.surname})"

queryDF.write
  .format("org.neo4j.spark.DataSource")
  .option("query", writeQuery)
  .mode(SaveMode.Overwrite)
  .save()
```
Example
```python
# Create example DataFrame
queryDF = spark.createDataFrame(
    [
        {"name": "John", "surname": "Doe", "age": 42},
        {"name": "Jane", "surname": "Doe", "age": 40},
    ]
)

# Define the Cypher query to use in the write
write_query = "CREATE (n:Person {fullName: event.name + ' ' + event.surname})"

(
    queryDF.write.format("org.neo4j.spark.DataSource")
    .option("query", write_query)
    .mode("Overwrite")
    .save()
)
```
For more information and examples, see Writing with a Cypher query.
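In the `query` examples above, each DataFrame row is exposed to the Cypher query as a map named `event`, so `event.name` and `event.surname` resolve to the row's columns. The following plain-Python sketch illustrates that per-row binding (illustrative only; the connector batches rows rather than running one query per row):

```python
# Rows as the connector would expose them to the Cypher query: each row
# becomes the `event` map available inside the query text.
rows = [
    {"name": "John", "surname": "Doe", "age": 42},
    {"name": "Jane", "surname": "Doe", "age": 40},
]

# The write query builds `fullName` from `event.name` and `event.surname`;
# the equivalent per-row computation:
full_names = [event["name"] + " " + event["surname"] for event in rows]
print(full_names)  # → ['John Doe', 'Jane Doe']
```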
Save mode

Regardless of the write option, the connector supports two save modes for the data source `mode()` method:

- `Append` mode creates new nodes or relationships by building a `CREATE` Cypher query.
- `Overwrite` mode creates or updates nodes or relationships by building a `MERGE` Cypher query.
  - It requires the `node.keys` option when used with the `labels` option.
  - It requires the `relationship.source.node.keys` and `relationship.target.node.keys` options when used with the `relationship` option.
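As a sketch of the `Overwrite` requirement described above (option names taken from this section; the live call is commented out because it needs a running Neo4j instance and a configured `SparkSession`):

```python
# Overwrite builds a MERGE query, so the connector needs `node.keys` to
# know which columns identify an existing node.
overwrite_options = {
    "labels": ":Person",
    # Match :Person nodes on (name, surname): update them when found,
    # create them otherwise.
    "node.keys": "name,surname",
}

# With a running database, the upsert would be:
# (
#     peopleDF.write.format("org.neo4j.spark.DataSource")
#     .mode("Overwrite")
#     .options(**overwrite_options)
#     .save()
# )
```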
Type mapping

For the complete type mapping between Spark DataFrames and Neo4j, see Data type mapping.