Spark 结构化流

让我们看看如何将 Spark 结构化流 API 与 Neo4j Apache Spark 连接器结合使用。

尽管连接器相同,但 Spark 流式处理与 Spark 批处理的工作方式不同。以下是有关 Spark 流式处理方法的一些链接:

Neo4j 流式传输选项

表 1. 可用的流式传输选项列表
设置名称 描述 默认值 必需

Sink

checkpointLocation

检查点文件位置(查看更多)。

(无)

Source

streaming.property.name

用于批量读取的时间戳属性。在此阅读更多。

(无)

streaming.from

此选项用于告诉连接器从何处向流发送数据。在此阅读更多。

NOW 从流启动时开始流式传输。

ALL 在读取新数据之前,将数据库中的所有数据发送到流。

NOW

streaming.query.offset

一个有效的 Cypher® READ_ONLY 查询,返回一个长整型值。

(例如,MATCH (p:MyLabel) RETURN MAX(p.timestamp)

这用于获取数据库中给定查询的最新时间戳。更多信息请参阅此处

(无)

是,仅适用于 query 模式

Sink

将流写入 Neo4j 实例非常简单,可以使用三种写入策略中的任何一种来完成。

相同的模式概念也适用于此处。如果使用空结果集启动流式读取,则需要使用用户定义的模式来指定模式,否则批处理读取将失败。
从 Kafka 主题读取并写入 Neo4j 的代码示例。
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder() \
    .master('local[*]') \
    .getOrCreate()

df = spark.readStream \
    .format("kafka") \
    .option("subscribe", "PeopleTopic") \
    .load()

query = df.writeStream \
    .format("org.neo4j.spark.DataSource") \
    .option("url", "neo4j://localhost:7687") \
    .option("save.mode", "ErrorIfExists") \
    .option("checkpointLocation", "/tmp/checkpoint/myCheckPoint") \
    .option("labels", "Person") \
    .option("node.keys", "value") \
    .start()

如前所述,您可以使用任何写入策略:节点关系查询

唯一的区别是您必须设置 checkpointLocationsave.mode 选项。

使用 save.mode,您可以控制数据的写入方式。更多信息请参阅此处

检查点

检查点是一个文件,它允许 Spark 结构化流从故障中恢复。Spark 使用进度信息更新此文件,并在发生故障或查询重新启动时从该点恢复。此检查点位置必须是 HDFS 兼容文件系统中的路径。

由于该主题广泛且复杂,您可以阅读Spark 官方文档

Source

从 Neo4j 读取流需要一些额外的配置。

让我们先看看代码,然后分析所有选项。

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder() \
    .master('local[*]') \
    .getOrCreate()

df = spark.readStream \
    .format("org.neo4j.spark.DataSource") \
    .option("url", "neo4j://localhost:7687") \
    .option("labels", "Person") \
    .option("streaming.property.name", "timestamp") \
    .option("streaming.from", "NOW") \
    .load()
# Memory streaming format writes the streamed data to a SparkSQL table
# NOTE: make sure this code is executed in another block,
# or at least seconds later the previous one to allow the full initialization of the stream.
# The risk is that the query will return an empty result.
query = stream.writeStream \
    .format("memory") \
    .queryName("testReadStream") \
    .start()

spark \
  .sql("select * from testReadStream order by timestamp") \
  .show()

流式传输属性名称

为了使流式传输正常工作,每个记录需要有一个 timestamp 类型的属性,以便在从 Neo4j 读取新数据并发送到流时利用该属性。

在幕后,连接器正在构建一个带 WHERE 子句的查询,该子句检查具有此 [timestampProperty] 的记录,该属性位于根据检查点数据和数据库中可用最新偏移量计算出的时间戳范围内。

因此,每个节点都必须具有 Neo4j 类型(Long)的时间戳属性,并且它必须不为 null

像 "2021-08-11" 这样的字符串类型属性不起作用。它需要是 Neo4j 类型的 Long 值。

属性名称可以是任何内容,但请记住相应地设置 streaming.property.name

流式传输起点选项

您可以决定流式传输数据库中的所有数据,或者只传输新数据。为此,您可以将 streaming.from 选项设置为以下两个值之一:

  • NOW: 从当前时间戳开始读取。这是 streaming.from 选项的默认值

  • ALL: 首先读取数据库中的所有数据,然后只读取新数据。

读取模式

与 Sink 模式一样,您可以使用任何读取策略:节点关系查询

关于 query 模式的注意事项

在使用查询模式时,处理 streaming.fromstreaming.property.name 会稍微不那么自动化。

让我们看看示例,然后解释发生了什么。

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder() \
    .master('local[*]') \
    .getOrCreate()

df = spark.readStream \
    .format("org.neo4j.spark.DataSource") \
    .option("url", "neo4j://localhost:7687") \
    .option("streaming.from", "NOW") \
    .option("streaming.property.name", "timestamp") \
    .option("query", \
        """MATCH (p:Test3_Person)
           WHERE p.timestamp > $stream.from AND p.timestamp <= $stream.to
           RETURN p.age AS age, p.timestamp AS timestamp""") \
    .option("streaming.query.offset", \
        "MATCH (p:Test3_Person) RETURN max(p.timestamp)") \
    .load()

如您所见,无论如何都必须指定 streaming.fromstreaming.property.name,但您需要自行处理 WHERE 子句。查询提供了两个参数:$stream.to$stream.from,它们描述了我们需要读取的更改范围。

尽管查询参数 $stream.offset(现已弃用)仍在查询中受支持,但所有查询都将重写以同时根据 $stream.from$stream.to 参数进行筛选。为了使数据库高效查询更改,请更新您的查询以使用这些新的查询参数。

在这种情况下,streaming.query.offset 选项是强制性的;此选项由连接器用于读取数据库中的最新时间戳,结果用于计算要选择的范围。

其他示例

您可以在包含 Zeppelin 笔记本的此存储库中找到流式代码片段和许多其他示例。

一篇名为从 Kinesis 经 Spark 到 Neo4j 的文章描述了一个使用 Spark、Neo4j 和 AWS Kinesis 的完整示例。

© . All rights reserved.