Databricks 快速入门

本页面包含有关第三方平台使用方法的说明,这些说明可能会发生超出我们控制的更改。如有疑问,请参阅第三方平台文档。

先决条件

  • Databricks 工作区必须通过类似 https://dbc-xxxxxxxx-yyyy.cloud.databricks.com 的 URL 可用。

设置计算集群

  1. 创建一个计算集群,其访问模式为 Single user,策略为 Unrestricted,并选择您偏好的 Scala 运行时。

    目前不支持共享访问模式。

  2. 集群可用后,打开其页面并选择 Libraries 选项卡。

  3. 选择 Install new,并选择 Maven 作为库源。

  4. 选择 Search Packages,然后从 neo4j 组织(Spark Packages)搜索 neo4j-spark-connector,或者从 org.neo4j 组 ID 搜索 neo4j-connector-apache-spark (Maven Central),然后选择最新版本。

    请确保通过将 Scala 版本与集群运行时匹配来选择正确版本的连接器。

  5. 选择 Install

Unity Catalog

Neo4j 仅在 Single user 访问模式下支持 Unity Catalog。有关更多信息,请参阅 Databricks 文档

会话配置

您可以通过以下方式在运行笔记本的集群上设置 Spark 配置:

  1. 打开集群配置页面。

  2. Configuration 下选择 Advanced Options 开关。

  3. 选择 Spark 选项卡。

例如,您可以在文本区域中添加 Neo4j 持有者认证配置,如下所示:

持有者认证示例
neo4j.url neo4j://<host>:<port>
neo4j.authentication.type bearer
neo4j.authentication.bearer.token <token>

Databricks 建议不要以纯文本形式存储密码和令牌等机密。一种安全的替代方法是使用密钥

认证方法

支持 Neo4j Java 驱动(4.4 及更高版本)支持的所有认证方法。

有关认证配置的更多详细信息,请参阅Neo4j 驱动选项

设置密钥

您可以使用 Databricks CLI 通过 Secrets API 向您的环境添加密钥。如果您使用 Databricks 运行时版本 15.0 或更高版本,您可以直接从笔记本终端添加密钥。

设置密钥后,您可以使用 Databricks 实用程序 (dbutils) 从 Databricks 笔记本访问它们。例如,给定一个 neo4j 范围以及用于基本认证的 usernamepassword 密钥,您可以在 Python 笔记本中执行以下操作:

from pyspark.sql import SparkSession

url = "neo4j+s://xxxxxxxx.databases.neo4j.io"
username = dbutils.secrets.get(scope="neo4j", key="username")
password = dbutils.secrets.get(scope="neo4j", key="password")
dbname = "neo4j"

spark = (
    SparkSession.builder.config("neo4j.url", url)
    .config("neo4j.authentication.basic.username", username)
    .config("neo4j.authentication.basic.password", password)
    .config("neo4j.database", dbname)
    .getOrCreate()
)

Delta 表

您可以使用 Spark 连接器从 Databricks 笔记本读取和写入 Delta 表。这不需要任何额外设置。

基本往返

以下示例展示了如何读取 Delta 表,将其作为节点和节点属性写入 Neo4j,从 Neo4j 读取相应的节点和节点属性,然后将它们写入新的 Delta 表。

Delta 表内容

本示例假设存在一个名为 users_example 的 Delta 表,并包含以下数据:

表 1. users_example
姓名 姓氏 年龄

John

Doe

42

Jane

Doe

40

# Read the Delta table
tableDF = spark.read.table("users_example")

# Write the DataFrame to Neo4j as nodes
(
    tableDF
    .write.format("org.neo4j.spark.DataSource")
    .mode("Append")
    .option("labels", ":User")
    .save()
)

# Read the nodes with `:User` label from Neo4j
neoDF = (
    spark.read.format("org.neo4j.spark.DataSource")
    .option("labels", ":User")
    .load()
)

# Write the DataFrame to another Delta table,
# which will contain the additional columns
# `<id>` and `<labels>`
neoDF.write.saveAsTable("users_new_example")

Delta 表到 Neo4j 节点和关系

为避免死锁,在将关系写入 Neo4j 之前,请始终使用单个分区(使用 coalesce(1)repartition(1))。

以下示例展示了如何读取 Delta 表并将其数据作为节点和关系写入 Neo4j。有关使用 Overwrite 模式和仅写入节点的详细信息,请参阅写入页面。

Delta 表内容

本示例假设存在一个名为 customers_products_example 的 Delta 表,并包含以下数据:

表 2. customers_products_example
姓名 姓氏 客户 ID 产品 数量 订单

John

Doe

1

产品 1

200

ABC100

Jane

Doe

2

产品 2

100

ABC200

# Read the Delta table into a DataFrame
relDF = spark.read.table("customers_products_example")

# Write the table to Neo4j using the
# `relationship` write option
(
    relDF
    # Use a single partition
    .coalesce(1)
    .write
    # Create new relationships
    .mode("Append")
    .format("org.neo4j.spark.DataSource")
    # Assign a type to the relationships
    .option("relationship", "BOUGHT")
    # Use `keys` strategy
    .option("relationship.save.strategy", "keys")
    # Create source nodes and assign them a label
    .option("relationship.source.save.mode", "Append")
    .option("relationship.source.labels", ":Customer")
    # Map DataFrame columns to source node properties
    .option("relationship.source.node.properties", "name,surname,customerID:id")
    # Create target nodes and assign them a label
    .option("relationship.target.save.mode", "Append")
    .option("relationship.target.labels", ":Product")
    # Map DataFrame columns to target node properties
    .option("relationship.target.node.properties", "product:name")
    # Map DataFrame columns to relationship properties
    .option("relationship.properties", "quantity,order")
    .save()
)

Neo4j 节点到 Delta 表

以下示例展示了如何从 Neo4j 读取节点并将其写入 Delta 表。有关读取关系的详细信息,请参阅读取页面。

# Read the nodes with `:Customer` label from Neo4j
df = (
    spark.read.format("org.neo4j.spark.DataSource")
    .option("labels", ":Customer")
    .load()
)

# Write the DataFrame to another Delta table
df.write.saveAsTable("customers_status_example")
© . All rights reserved.