快速入门

开始之前

  1. 安装 Neo4j 或获取 Neo4j Aura 实例。记下 连接 URI 和访问凭据。

    在企业环境中,请考虑为 Spark 访问创建单独的用户名/密码,而不是使用默认的 neo4j 帐户。

  2. 确保已安装 Java(版本 8 或更高版本)。

  3. 根据 Spark 网站 上的文档下载 Spark。

  4. 如果您正在开发非 Python 自包含应用程序,请确保已安装 构建工具

连接 URI

本地实例

使用 neo4j:// 协议,例如 neo4j://127.0.0.1:7687

Neo4j Aura

使用 neo4j+s:// 协议。Aura 连接 URI 具有以下形式:neo4j+s://xxxxxxxx.databases.neo4j.io

Neo4j 集群

使用 neo4j+s:// 协议 适当地路由事务(将写入事务路由到领导者,将读取事务路由到跟随者和读取副本)。

在 Spark shell 中使用

您可以直接在 交互式 Spark shell(Scala 的 spark-shell,Python 的 pyspark)中复制粘贴并运行以下示例。

import org.apache.spark.sql.{SaveMode, SparkSession}

// Replace with the actual connection URI and credentials
val url = "neo4j://127.0.0.1:7687"
val username = "neo4j"
val password = "password"
val dbname = "neo4j"

val spark = SparkSession.builder
    .config("neo4j.url", url)
    .config("neo4j.authentication.basic.username", username)
    .config("neo4j.authentication.basic.password", password)
    .config("neo4j.database", dbname)
    .getOrCreate()

// Create example DataFrame
val df = List(
    ("John", "Doe", 42),
    ("Jane", "Doe", 40)
).toDF("name", "surname", "age")

// Write to Neo4j
df.write
    .format("org.neo4j.spark.DataSource")
    .mode(SaveMode.Overwrite)
    .option("labels", "Person")
    .option("node.keys", "name,surname")
    .save()

// Read from Neo4j
spark.read
    .format("org.neo4j.spark.DataSource")
    .option("labels", "Person")
    .load()
    .show()

将链式方法包装在括号中以避免语法错误。

一些常见的 API 常量在 PySpark API 中指定为字符串。例如,Python API 中的保存模式使用 df.mode("Append") 设置。

from pyspark.sql import SparkSession

# Replace with the actual connection URI and credentials
url = "neo4j://127.0.0.1:7687"
username = "neo4j"
password = "password"
dbname = "neo4j"

spark = (
    SparkSession.builder.config("neo4j.url", url)
    .config("neo4j.authentication.basic.username", username)
    .config("neo4j.authentication.basic.password", password)
    .config("neo4j.database", dbname)
    .getOrCreate()
)

# Create example DataFrame
df = spark.createDataFrame(
    [
        {"name": "John", "surname": "Doe", "age": 42},
        {"name": "Jane", "surname": "Doe", "age": 40},
    ]
)

# Write to Neo4j
(
    df.write.format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .option("labels", "Person")
    .option("node.keys", "name,surname")
    .save()
)

# Read from Neo4j
(
    spark.read.format("org.neo4j.spark.DataSource")
    .option("labels", "Person")
    .load()
    .show()
)

自包含应用程序

非 Python 应用程序需要一些额外的设置。

  1. 创建一个 scala-example 目录。

  2. 安装部分 中的 build.sbt 和下面的 example.jsonl 复制到新目录中。

    example.jsonl
    {"name": "John", "surname": "Doe", "age": 42}
    {"name": "Jane", "surname": "Doe", "age": 40}
  3. 创建一个 src/main/scala 目录,并将下面的 SparkApp.scala 文件复制到其中。

    SparkApp.scala
    import org.apache.spark.sql.{SaveMode, SparkSession}
    
    object SparkApp {
        def main(args: Array[String]): Unit = {
            // Replace with the actual connection URI and credentials
            val url = "neo4j://127.0.0.1:7687"
            val username = "neo4j"
            val password = "password"
            val dbname = "neo4j"
    
            val spark = SparkSession.builder
                .config("neo4j.url", url)
                .config("neo4j.authentication.basic.username", username)
                .config("neo4j.authentication.basic.password", password)
                .config("neo4j.database", dbname)
                .appName("Spark App")
                .getOrCreate()
    
            val data = spark.read.json("example.jsonl")
    
            // Write to Neo4j
            data.write
                .format("org.neo4j.spark.DataSource")
                .mode(SaveMode.Overwrite)
                .option("labels", "Person")
                .option("node.keys", "name,surname")
                .save()
    
            // Read from Neo4j
            val ds = spark.read
                .format("org.neo4j.spark.DataSource")
                .option("labels", "Person")
                .load()
    
            ds.show()
        }
    }
  4. 运行 sbt package

  5. 运行 spark-submit

    $SPARK_HOME/bin/spark-submit \
      --packages org.neo4j:neo4j-connector-apache-spark_2.12:5.3.2_for_spark_3 \
      --class SparkApp \
      target/scala-2.12/spark-app_2.12-1.0.jar
  1. 创建一个 python-example 目录。

  2. 将下面的 example.jsonlspark_app.py 文件复制到新目录中。

    example.jsonl
    {"name": "John", "surname": "Doe", "age": 42}
    {"name": "Jane", "surname": "Doe", "age": 40}
    spark_app.py
    from pyspark.sql import SparkSession
    
    # Replace with the actual connection URI and credentials
    url = "neo4j://127.0.0.1:7687"
    username = "neo4j"
    password = "password"
    dbname = "neo4j"
    
    spark = (
        SparkSession.builder.config("neo4j.url", url)
        .config("neo4j.authentication.basic.username", username)
        .config("neo4j.authentication.basic.password", password)
        .config("neo4j.database", dbname)
        .getOrCreate()
    )
    
    data = spark.read.json("example.jsonl")
    
    (
        data.write.format("org.neo4j.spark.DataSource")
        .mode("Overwrite")
        .option("labels", "Person")
        .option("node.keys", "name,surname")
        .save()
    )
    
    ds = (
        spark.read.format("org.neo4j.spark.DataSource")
        .option("labels", "Person")
        .load()
    )
    
    ds.show()
  3. 运行 spark-submit

    $SPARK_HOME/bin/spark-submit \
      --packages org.neo4j:neo4j-connector-apache-spark_2.12:5.3.2_for_spark_3 \
      spark_app.py
  1. 创建一个 java-example 目录。

  2. 安装部分 中的 pom.xml 和下面的 example.jsonl 复制到新目录中。

    example.jsonl
    {"name": "John", "surname": "Doe", "age": 42}
    {"name": "Jane", "surname": "Doe", "age": 40}
  3. 创建一个 src/main/java 目录,并将下面的 SparkApp.java 文件复制到其中。

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.SparkSession;
    
    public class SparkApp {
        public static void main(String[] args) {
            // Replace with the actual connection URI and credentials
            String url = "neo4j://127.0.0.1:7687";
            String username = "neo4j";
            String password = "password";
            String dbname = "neo4j";
    
            SparkSession spark = SparkSession
                .builder()
                .appName("Spark App")
                .config("neo4j.url", url)
                .config("neo4j.authentication.basic.username", username)
                .config("neo4j.authentication.basic.password", password)
                .config("neo4j.database", dbname)
                .getOrCreate();
    
            Dataset<Row> data = spark.read().json("example.jsonl");
    
            data.write().format("org.neo4j.spark.DataSource")
                .mode(SaveMode.Overwrite)
                .option("labels", "Person")
                .option("node.keys", "name,surname")
                .save();
    
            Dataset<Row> ds = spark.read().format("org.neo4j.spark.DataSource")
                .option("labels", "Person")
                .load();
    
            ds.show();
        }
    }
  4. 运行 mvn package

  5. 运行 spark-submit

    $SPARK_HOME/bin/spark-submit \
      --packages org.neo4j:neo4j-connector-apache-spark_2.12:5.3.2_for_spark_3 \
      --class SparkApp \
      target/spark-app-1.0.jar

Jupyter 笔记本

代码存储库包含两个 Jupyter 笔记本,它们展示了如何在数据驱动的工作流程中使用连接器。

  • neo4j_data_engineering.ipynb 演示了如何创建 Spark 作业以从 Neo4j 读取数据并向其中写入数据。

  • neo4j_data_science.ipynb 演示了如何将 Pandas(在 PySpark 中)与 Neo4j 图数据科学库结合使用,以突出显示银行场景中的欺诈行为。