使用 Cypher 查询写入

此页面中的所有示例都假设 SparkSession 已使用适当的身份验证选项初始化。有关更多详细信息,请参阅快速入门示例

如果您需要更多灵活性,可以使用 query 选项使用 CREATEMERGE 子句运行自定义 Cypher® 查询。

连接器使用提供的查询持久化整个 DataFrame。节点以 batch.size 属性定义的行批次发送到 Neo4j,并且查询包装在 UNWIND $events AS event 语句中。

示例
case class Person(name: String, surname: String, age: Int)

// Create an example DataFrame
val df = Seq(
    Person("John", "Doe", 42),
    Person("Jane", "Doe", 40)
).toDF()

// Define the Cypher query to use in the write
val query = "CREATE (n:Person {fullName: event.name + ' ' + event.surname})"

df.write
  .format("org.neo4j.spark.DataSource")
  .option("query", query)
  .mode(SaveMode.Overwrite)
  .save()
等效的 Cypher 查询
UNWIND $events AS event
CREATE (n:Person {fullName: event.name + ' ' + event.surname})

events 是从数据集创建的一个批次。

注意事项

  • 您必须始终指定保存模式

  • 您也可以在 WITH 语句中使用 events 列表。例如,您可以将上一个示例中的查询替换为以下内容

    WITH event.name + ' ' + toUpper(event.surname) AS fullName
    CREATE (n:Person {fullName: fullName})
  • 支持引用 events 列表中 CALL 的子查询

    CALL {
      WITH event
      RETURN event.name + ' ' + toUpper(event.surname) AS fullName
    }
    CREATE (n:Person {fullName: fullName})
  • 如果安装了 APOC,则可以使用 APOC 过程和函数

    CALL {
      WITH event
      RETURN event.name + ' ' + apoc.text.toUpperCase(event.surname) AS fullName
    }
    CREATE (n:Person {fullName: fullName})
  • 虽然不禁止 RETURN 子句,但添加它对查询结果没有任何影响。

script 选项

script 选项允许在执行读取操作之前运行一系列 Cypher 查询。

脚本的结果可以在后续查询中使用,例如注入查询参数。

示例
val df = Seq(Person("John", "Doe", 42)).toDF()

df.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.Append)
  .option("query", "CREATE (n:Person{fullName: event.name + ' ' + event.surname, age: scriptResult[0].age})")
  .option("script",
    """CREATE INDEX person_surname FOR (p:Person) ON (p.surname);
      |CREATE CONSTRAINT product_name_sku FOR (p:Product)
      | REQUIRE (p.name, p.sku)
      | IS NODE KEY;
      |RETURN 36 AS age;
      |""".stripMargin)
  .save()
等效的 Cypher 查询
WITH $scriptResult AS scriptResult
UNWIND $events AS event
CREATE (n:Person {fullName: event.name + ' ' + event.surname, age: scriptResult[0].age})

scriptResultscript 中最后一个 Cypher 查询的结果,在本例中为 RETURN 36 AS age

events 是从数据集创建的一个批次。