使用 Cypher 查询写入
此页面中的所有示例都假设 |
如果您需要更多灵活性,可以使用 query
选项使用 CREATE
和 MERGE
子句运行自定义 Cypher® 查询。
连接器使用提供的查询持久化整个 DataFrame。节点以 batch.size
属性定义的行批次发送到 Neo4j,并且查询包装在 UNWIND $events AS event
语句中。
示例
case class Person(name: String, surname: String, age: Int)
// Create an example DataFrame
val df = Seq(
Person("John", "Doe", 42),
Person("Jane", "Doe", 40)
).toDF()
// Define the Cypher query to use in the write
val query = "CREATE (n:Person {fullName: event.name + ' ' + event.surname})"
df.write
.format("org.neo4j.spark.DataSource")
.option("query", query)
.mode(SaveMode.Overwrite)
.save()
等效的 Cypher 查询
UNWIND $events AS event
CREATE (n:Person {fullName: event.name + ' ' + event.surname})
events
是从数据集创建的一个批次。
注意事项
-
您必须始终指定保存模式。
-
您也可以在
WITH
语句中使用events
列表。例如,您可以将上一个示例中的查询替换为以下内容WITH event.name + ' ' + toUpper(event.surname) AS fullName CREATE (n:Person {fullName: fullName})
-
支持引用
events
列表中CALL
的子查询CALL { WITH event RETURN event.name + ' ' + toUpper(event.surname) AS fullName } CREATE (n:Person {fullName: fullName})
-
如果安装了 APOC,则可以使用 APOC 过程和函数
CALL { WITH event RETURN event.name + ' ' + apoc.text.toUpperCase(event.surname) AS fullName } CREATE (n:Person {fullName: fullName})
-
虽然不禁止
RETURN
子句,但添加它对查询结果没有任何影响。
script
选项
script
选项允许在执行读取操作之前运行一系列 Cypher 查询。
脚本的结果可以在后续查询中使用,例如注入查询参数。
示例
val df = Seq(Person("John", "Doe", 42)).toDF()
df.write
.format("org.neo4j.spark.DataSource")
.mode(SaveMode.Append)
.option("query", "CREATE (n:Person{fullName: event.name + ' ' + event.surname, age: scriptResult[0].age})")
.option("script",
"""CREATE INDEX person_surname FOR (p:Person) ON (p.surname);
|CREATE CONSTRAINT product_name_sku FOR (p:Product)
| REQUIRE (p.name, p.sku)
| IS NODE KEY;
|RETURN 36 AS age;
|""".stripMargin)
.save()
等效的 Cypher 查询
WITH $scriptResult AS scriptResult
UNWIND $events AS event
CREATE (n:Person {fullName: event.name + ' ' + event.surname, age: scriptResult[0].age})
scriptResult
是 script
中最后一个 Cypher 查询的结果,在本例中为 RETURN 36 AS age
。
events
是从数据集创建的一个批次。