Databricks 快速入门

此页面包含有关使用第三方平台的说明,这些说明可能因我们无法控制的变化而发生更改。如有疑问,请参考第三方平台文档。

先决条件

  • Databricks 工作区必须在类似于 https://dbc-xxxxxxxx-yyyy.cloud.databricks.com 的 URL 上可用。

设置计算集群

  1. 创建一个具有 单用户 访问模式、无限制 策略和您首选 Scala 运行时的计算集群。

    目前不支持共享访问模式。

  2. 集群可用后,打开其页面并选择选项卡。

  3. 选择安装新库,然后选择Maven 作为库源。

  4. 选择搜索包,在 Spark 包中搜索 neo4j-spark-connector,然后选择它。

    确保通过将 Scala 版本与集群的运行时匹配来选择连接器的正确版本。

  5. 选择安装

Unity Catalog

Neo4j 仅在 单用户 访问模式下支持 Unity Catalog。有关更多信息,请参阅 Databricks 文档

会话配置

您可以通过以下步骤在运行笔记本的集群上设置 Spark 配置

  1. 打开集群配置页面。

  2. 配置下选择高级选项切换。

  3. 选择Spark 选项卡。

例如,您可以在文本区域中添加 Neo4j Bearer 身份验证配置,如下所示

Bearer 身份验证示例
neo4j.url neo4j://<host>:<port>
neo4j.authentication.type bearer
neo4j.authentication.bearer.token <token>

Databricks 建议不要在纯文本中存储密码和令牌等秘密信息。一个安全的替代方案是使用 秘密

身份验证方法

支持 Neo4j Java 驱动程序(版本 4.4 及更高版本)支持的所有身份验证方法。

有关身份验证配置的更多详细信息,请参阅 Neo4j 驱动程序选项

设置秘密

您可以使用 Databricks CLI 通过 Secrets API 添加秘密 到您的环境。如果您使用 Databricks 运行时版本 15.0 或更高版本,则可以直接 从笔记本终端 添加秘密。

设置秘密后,您可以使用 Databricks 实用程序 (dbutils) 从 Databricks 笔记本中访问它们。例如,给定 neo4j 范围和用于基本身份验证的 usernamepassword 秘密,您可以在 Python 笔记本中执行以下操作

from pyspark.sql import SparkSession

url = "neo4j+s://xxxxxxxx.databases.neo4j.io"
username = dbutils.secrets.get(scope="neo4j", key="username")
password = dbutils.secrets.get(scope="neo4j", key="password")
dbname = "neo4j"

spark = (
    SparkSession.builder.config("neo4j.url", url)
    .config("neo4j.authentication.basic.username", username)
    .config("neo4j.authentication.basic.password", password)
    .config("neo4j.database", dbname)
    .getOrCreate()
)

Delta 表

您可以使用 Spark 连接器从 Databricks 笔记本中读取和写入 Delta 表。这不需要任何额外设置。

基本往返

以下示例演示如何读取 Delta 表,将其作为节点和节点属性写入 Neo4j,从 Neo4j 读取相应的节点和节点属性,并将它们写入新的 Delta 表。

Delta 表的内容

该示例假设存在一个名为 users_example 的 Delta 表,它包含以下数据

表 1. users_example
name surname age

John

Doe

42

Jane

Doe

40

# Read the Delta table
tableDF = spark.read.table("users_example")

# Write the DataFrame to Neo4j as nodes
(
    tableDF
    .write.format("org.neo4j.spark.DataSource")
    .mode("Append")
    .option("labels", ":User")
    .save()
)

# Read the nodes with `:User` label from Neo4j
neoDF = (
    spark.read.format("org.neo4j.spark.DataSource")
    .option("labels", ":User")
    .load()
)

# Write the DataFrame to another Delta table,
# which will contain the additional columns
# `<id>` and `<labels>`
neoDF.write.saveAsTable("users_new_example")

Delta 表到 Neo4j 节点和关系

为了避免死锁,在将关系写入 Neo4j 之前,始终使用单个分区(使用 coalesce(1)repartition(1))。

以下示例演示如何读取 Delta 表并将它的数据作为节点和关系写入 Neo4j。有关使用 Overwrite 模式和仅写入节点的详细信息,请参阅 写入 页面。

Delta 表的内容

该示例假设存在一个名为 customers_products_example 的 Delta 表,它包含以下数据

表 2. customers_products_example
name surname customerID product quantity order

John

Doe

1

Product 1

200

ABC100

Jane

Doe

2

Product 2

100

ABC200

# Read the Delta table into a DataFrame
relDF = spark.read.table("customers_products_example")

# Write the table to Neo4j using the
# `relationship` write option
(
    relDF
    # Use a single partition
    .coalesce(1)
    .write
    # Create new relationships
    .mode("Append")
    .format("org.neo4j.spark.DataSource")
    # Assign a type to the relationships
    .option("relationship", "BOUGHT")
    # Use `keys` strategy
    .option("relationship.save.strategy", "keys")
    # Create source nodes and assign them a label
    .option("relationship.source.save.mode", "Append")
    .option("relationship.source.labels", ":Customer")
    # Map DataFrame columns to source node properties
    .option("relationship.source.node.properties", "name,surname,customerID:id")
    # Create target nodes and assign them a label
    .option("relationship.target.save.mode", "Append")
    .option("relationship.target.labels", ":Product")
    # Map DataFrame columns to target node properties
    .option("relationship.target.node.properties", "product:name")
    # Map DataFrame columns to relationship properties
    .option("relationship.properties", "quantity,order")
    .save()
)

Neo4j 节点到 Delta 表

以下示例演示如何从 Neo4j 读取节点并将其写入 Delta 表。有关读取关系的详细信息,请参阅 读取 页面。

# Read the nodes with `:Customer` label from Neo4j
df = (
    spark.read.format("org.neo4j.spark.DataSource")
    .option("labels", ":Customer")
    .load()
)

# Write the DataFrame to another Delta table
df.write.saveAsTable("customers_status_example")