快速入门
开始前
-
安装 Neo4j 或获取 Neo4j Aura 实例。记下连接 URI 和访问凭证。
在企业环境中,考虑为 Spark 访问创建单独的用户名/密码,而不是使用默认的
neo4j
账户。 -
确保已安装 Java(版本 8 或更高)。
-
按照 Spark 网站上的说明下载 Spark。
-
如果您正在开发非 Python 的独立应用程序,请确保已安装构建工具。
连接 URI
- 本地实例
-
使用
neo4j://
协议,例如neo4j://localhost:7687
。 - Neo4j Aura
-
使用
neo4j+s://
协议。Aura 连接 URI 的形式为neo4j+s://xxxxxxxx.databases.neo4j.io
。 - Neo4j 集群
-
使用
neo4j+s://
协议来适当路由事务(写事务到主节点,读事务到从节点和只读副本)。
在 Spark shell 中使用
您可以直接在交互式 Spark shell(Scala 使用 spark-shell
,Python 使用 pyspark
)中复制粘贴并运行以下示例。
import org.apache.spark.sql.{SaveMode, SparkSession}
// Replace with the actual connection URI and credentials
val url = "neo4j://localhost:7687"
val username = "neo4j"
val password = "password"
val dbname = "neo4j"
val spark = SparkSession.builder
.config("neo4j.url", url)
.config("neo4j.authentication.basic.username", username)
.config("neo4j.authentication.basic.password", password)
.config("neo4j.database", dbname)
.getOrCreate()
// Create example DataFrame
val df = List(
("John", "Doe", 42),
("Jane", "Doe", 40)
).toDF("name", "surname", "age")
// Write to Neo4j
df.write
.format("org.neo4j.spark.DataSource")
.mode(SaveMode.Overwrite)
.option("labels", "Person")
.option("node.keys", "name,surname")
.save()
// Read from Neo4j
spark.read
.format("org.neo4j.spark.DataSource")
.option("labels", "Person")
.load()
.show()
将链式方法用括号括起来以避免语法错误。 |
PySpark API 中的一些常见 API 常量被指定为字符串。例如,Python API 中的保存模式通过 |
from pyspark.sql import SparkSession
# Replace with the actual connection URI and credentials
url = "neo4j://localhost:7687"
username = "neo4j"
password = "password"
dbname = "neo4j"
spark = (
SparkSession.builder.config("neo4j.url", url)
.config("neo4j.authentication.basic.username", username)
.config("neo4j.authentication.basic.password", password)
.config("neo4j.database", dbname)
.getOrCreate()
)
# Create example DataFrame
df = spark.createDataFrame(
[
{"name": "John", "surname": "Doe", "age": 42},
{"name": "Jane", "surname": "Doe", "age": 40},
]
)
# Write to Neo4j
(
df.write.format("org.neo4j.spark.DataSource")
.mode("Overwrite")
.option("labels", "Person")
.option("node.keys", "name,surname")
.save()
)
# Read from Neo4j
(
spark.read.format("org.neo4j.spark.DataSource")
.option("labels", "Person")
.load()
.show()
)
独立应用程序
非 Python 应用程序需要一些额外的设置。
-
创建一个
scala-example
目录。 -
将安装部分中的
build.sbt
和下面的example.jsonl
复制到新目录中。example.jsonl{"name": "John", "surname": "Doe", "age": 42} {"name": "Jane", "surname": "Doe", "age": 40}
-
创建一个
src/main/scala
目录,并复制下面的SparkApp.scala
文件。SparkApp.scalaimport org.apache.spark.sql.{SaveMode, SparkSession} object SparkApp { def main(args: Array[String]): Unit = { // Replace with the actual connection URI and credentials val url = "neo4j://localhost:7687" val username = "neo4j" val password = "password" val dbname = "neo4j" val spark = SparkSession.builder .config("neo4j.url", url) .config("neo4j.authentication.basic.username", username) .config("neo4j.authentication.basic.password", password) .config("neo4j.database", dbname) .appName("Spark App") .getOrCreate() val data = spark.read.json("example.jsonl") // Write to Neo4j data.write .format("org.neo4j.spark.DataSource") .mode(SaveMode.Overwrite) .option("labels", "Person") .option("node.keys", "name,surname") .save() // Read from Neo4j val ds = spark.read .format("org.neo4j.spark.DataSource") .option("labels", "Person") .load() ds.show() } }
-
运行
sbt package
。 -
运行
spark-submit
$SPARK_HOME/bin/spark-submit \ --packages org.neo4j:neo4j-connector-apache-spark_2.12:5.3.8_for_spark_3 \ --class SparkApp \ target/scala-2.12/spark-app_2.12-1.0.jar
-
创建一个
python-example
目录。 -
将下面的
example.jsonl
和spark_app.py
文件复制到新目录中。example.jsonl{"name": "John", "surname": "Doe", "age": 42} {"name": "Jane", "surname": "Doe", "age": 40}
spark_app.pyfrom pyspark.sql import SparkSession # Replace with the actual connection URI and credentials url = "neo4j://localhost:7687" username = "neo4j" password = "password" dbname = "neo4j" spark = ( SparkSession.builder.config("neo4j.url", url) .config("neo4j.authentication.basic.username", username) .config("neo4j.authentication.basic.password", password) .config("neo4j.database", dbname) .getOrCreate() ) data = spark.read.json("example.jsonl") ( data.write.format("org.neo4j.spark.DataSource") .mode("Overwrite") .option("labels", "Person") .option("node.keys", "name,surname") .save() ) ds = ( spark.read.format("org.neo4j.spark.DataSource") .option("labels", "Person") .load() ) ds.show()
-
运行
spark-submit
$SPARK_HOME/bin/spark-submit \ --packages org.neo4j:neo4j-connector-apache-spark_2.12:5.3.8_for_spark_3 \ spark_app.py
-
创建一个
java-example
目录。 -
将安装部分中的
pom.xml
和下面的example.jsonl
复制到新目录中。example.jsonl{"name": "John", "surname": "Doe", "age": 42} {"name": "Jane", "surname": "Doe", "age": 40}
-
创建一个
src/main/java
目录,并复制下面的SparkApp.java
文件。import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; public class SparkApp { public static void main(String[] args) { // Replace with the actual connection URI and credentials String url = "neo4j://localhost:7687"; String username = "neo4j"; String password = "password"; String dbname = "neo4j"; SparkSession spark = SparkSession .builder() .appName("Spark App") .config("neo4j.url", url) .config("neo4j.authentication.basic.username", username) .config("neo4j.authentication.basic.password", password) .config("neo4j.database", dbname) .getOrCreate(); Dataset<Row> data = spark.read().json("example.jsonl"); data.write().format("org.neo4j.spark.DataSource") .mode(SaveMode.Overwrite) .option("labels", "Person") .option("node.keys", "name,surname") .save(); Dataset<Row> ds = spark.read().format("org.neo4j.spark.DataSource") .option("labels", "Person") .load(); ds.show(); } }
-
运行
mvn package
。 -
运行
spark-submit
$SPARK_HOME/bin/spark-submit \ --packages org.neo4j:neo4j-connector-apache-spark_2.12:5.3.8_for_spark_3 \ --class SparkApp \ target/spark-app-1.0.jar
Jupyter notebooks
代码仓库包含两个 Jupyter notebooks,它们展示了如何在数据驱动的工作流程中使用连接器。
-
neo4j_data_engineering.ipynb
展示了如何创建 Spark 作业来从 Neo4j 读取数据并向 Neo4j 写入数据。 -
neo4j_data_science.ipynb
展示了如何将 Pandas(在 PySpark 中)与 Neo4j 图数据科学库结合,以突出银行场景中的欺诈行为。