快速入门
开始之前
-
安装 Neo4j 或获取 Neo4j Aura 实例。记下 连接 URI 和访问凭据。
在企业环境中,请考虑为 Spark 访问创建单独的用户名/密码,而不是使用默认的
neo4j
帐户。 -
确保已安装 Java(版本 8 或更高版本)。
-
根据 Spark 网站 上的文档下载 Spark。
-
如果您正在开发非 Python 自包含应用程序,请确保已安装 构建工具。
连接 URI
- 本地实例
-
使用
neo4j://
协议,例如neo4j://127.0.0.1:7687
。 - Neo4j Aura
-
使用
neo4j+s://
协议。Aura 连接 URI 具有以下形式:neo4j+s://xxxxxxxx.databases.neo4j.io
。 - Neo4j 集群
-
使用
neo4j+s://
协议 适当地路由事务(将写入事务路由到领导者,将读取事务路由到跟随者和读取副本)。
在 Spark shell 中使用
您可以直接在 交互式 Spark shell(Scala 的 spark-shell
,Python 的 pyspark
)中复制粘贴并运行以下示例。
import org.apache.spark.sql.{SaveMode, SparkSession}
// Replace with the actual connection URI and credentials
val url = "neo4j://127.0.0.1:7687"
val username = "neo4j"
val password = "password"
val dbname = "neo4j"
val spark = SparkSession.builder
.config("neo4j.url", url)
.config("neo4j.authentication.basic.username", username)
.config("neo4j.authentication.basic.password", password)
.config("neo4j.database", dbname)
.getOrCreate()
// Create example DataFrame
val df = List(
("John", "Doe", 42),
("Jane", "Doe", 40)
).toDF("name", "surname", "age")
// Write to Neo4j
df.write
.format("org.neo4j.spark.DataSource")
.mode(SaveMode.Overwrite)
.option("labels", "Person")
.option("node.keys", "name,surname")
.save()
// Read from Neo4j
spark.read
.format("org.neo4j.spark.DataSource")
.option("labels", "Person")
.load()
.show()
将链式方法包装在括号中以避免语法错误。 |
一些常见的 API 常量在 PySpark API 中指定为字符串。例如,Python API 中的保存模式使用 |
from pyspark.sql import SparkSession
# Replace with the actual connection URI and credentials
url = "neo4j://127.0.0.1:7687"
username = "neo4j"
password = "password"
dbname = "neo4j"
spark = (
SparkSession.builder.config("neo4j.url", url)
.config("neo4j.authentication.basic.username", username)
.config("neo4j.authentication.basic.password", password)
.config("neo4j.database", dbname)
.getOrCreate()
)
# Create example DataFrame
df = spark.createDataFrame(
[
{"name": "John", "surname": "Doe", "age": 42},
{"name": "Jane", "surname": "Doe", "age": 40},
]
)
# Write to Neo4j
(
df.write.format("org.neo4j.spark.DataSource")
.mode("Overwrite")
.option("labels", "Person")
.option("node.keys", "name,surname")
.save()
)
# Read from Neo4j
(
spark.read.format("org.neo4j.spark.DataSource")
.option("labels", "Person")
.load()
.show()
)
自包含应用程序
非 Python 应用程序需要一些额外的设置。
-
创建一个
scala-example
目录。 -
将 安装部分 中的
build.sbt
和下面的example.jsonl
复制到新目录中。example.jsonl{"name": "John", "surname": "Doe", "age": 42} {"name": "Jane", "surname": "Doe", "age": 40}
-
创建一个
src/main/scala
目录,并将下面的SparkApp.scala
文件复制到其中。SparkApp.scalaimport org.apache.spark.sql.{SaveMode, SparkSession} object SparkApp { def main(args: Array[String]): Unit = { // Replace with the actual connection URI and credentials val url = "neo4j://127.0.0.1:7687" val username = "neo4j" val password = "password" val dbname = "neo4j" val spark = SparkSession.builder .config("neo4j.url", url) .config("neo4j.authentication.basic.username", username) .config("neo4j.authentication.basic.password", password) .config("neo4j.database", dbname) .appName("Spark App") .getOrCreate() val data = spark.read.json("example.jsonl") // Write to Neo4j data.write .format("org.neo4j.spark.DataSource") .mode(SaveMode.Overwrite) .option("labels", "Person") .option("node.keys", "name,surname") .save() // Read from Neo4j val ds = spark.read .format("org.neo4j.spark.DataSource") .option("labels", "Person") .load() ds.show() } }
-
运行
sbt package
。 -
运行
spark-submit
$SPARK_HOME/bin/spark-submit \ --packages org.neo4j:neo4j-connector-apache-spark_2.12:5.3.2_for_spark_3 \ --class SparkApp \ target/scala-2.12/spark-app_2.12-1.0.jar
-
创建一个
python-example
目录。 -
将下面的
example.jsonl
和spark_app.py
文件复制到新目录中。example.jsonl{"name": "John", "surname": "Doe", "age": 42} {"name": "Jane", "surname": "Doe", "age": 40}
spark_app.pyfrom pyspark.sql import SparkSession # Replace with the actual connection URI and credentials url = "neo4j://127.0.0.1:7687" username = "neo4j" password = "password" dbname = "neo4j" spark = ( SparkSession.builder.config("neo4j.url", url) .config("neo4j.authentication.basic.username", username) .config("neo4j.authentication.basic.password", password) .config("neo4j.database", dbname) .getOrCreate() ) data = spark.read.json("example.jsonl") ( data.write.format("org.neo4j.spark.DataSource") .mode("Overwrite") .option("labels", "Person") .option("node.keys", "name,surname") .save() ) ds = ( spark.read.format("org.neo4j.spark.DataSource") .option("labels", "Person") .load() ) ds.show()
-
运行
spark-submit
$SPARK_HOME/bin/spark-submit \ --packages org.neo4j:neo4j-connector-apache-spark_2.12:5.3.2_for_spark_3 \ spark_app.py
-
创建一个
java-example
目录。 -
将 安装部分 中的
pom.xml
和下面的example.jsonl
复制到新目录中。example.jsonl{"name": "John", "surname": "Doe", "age": 42} {"name": "Jane", "surname": "Doe", "age": 40}
-
创建一个
src/main/java
目录,并将下面的SparkApp.java
文件复制到其中。import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; public class SparkApp { public static void main(String[] args) { // Replace with the actual connection URI and credentials String url = "neo4j://127.0.0.1:7687"; String username = "neo4j"; String password = "password"; String dbname = "neo4j"; SparkSession spark = SparkSession .builder() .appName("Spark App") .config("neo4j.url", url) .config("neo4j.authentication.basic.username", username) .config("neo4j.authentication.basic.password", password) .config("neo4j.database", dbname) .getOrCreate(); Dataset<Row> data = spark.read().json("example.jsonl"); data.write().format("org.neo4j.spark.DataSource") .mode(SaveMode.Overwrite) .option("labels", "Person") .option("node.keys", "name,surname") .save(); Dataset<Row> ds = spark.read().format("org.neo4j.spark.DataSource") .option("labels", "Person") .load(); ds.show(); } }
-
运行
mvn package
。 -
运行
spark-submit
$SPARK_HOME/bin/spark-submit \ --packages org.neo4j:neo4j-connector-apache-spark_2.12:5.3.2_for_spark_3 \ --class SparkApp \ target/spark-app-1.0.jar
Jupyter 笔记本
代码存储库包含两个 Jupyter 笔记本,它们展示了如何在数据驱动的工作流程中使用连接器。
-
neo4j_data_engineering.ipynb
演示了如何创建 Spark 作业以从 Neo4j 读取数据并向其中写入数据。 -
neo4j_data_science.ipynb
演示了如何将 Pandas(在 PySpark 中)与 Neo4j 图数据科学库结合使用,以突出显示银行场景中的欺诈行为。