Spark 结构化流
让我们看看如何利用 Spark 结构化流 API 和 Neo4j Apache Spark 连接器。
虽然连接器相同,但 Spark 流处理与 Spark 批处理的工作方式不同。以下是一些了解 Spark 流处理方法的链接
-
深入了解 Apache Spark Streaming 的执行模型,由 Databricks 提供。
Neo4j 流处理选项
设置名称 | 描述 | 默认值 | 是否必填 |
---|---|---|---|
接收器 |
|||
|
检查点文件位置 (了解更多). |
(无) |
是 |
源 |
|||
|
用于批处理读取的时间戳属性。阅读更多 信息。 |
(无) |
是 |
|
此选项用于告诉连接器从哪里将数据发送到流。阅读更多 信息。
|
|
是 |
|
返回长整型值的有效 Cypher® READ_ONLY 查询。 (例如, 这用于获取给定查询的数据库中的最后一个时间戳。更多信息请参考 此处。 |
(无) |
是,仅适用于 |
接收器
将流写入 Neo4j 实例非常简单,可以使用任何 三种写入策略。
from pyspark.sql import SparkSession
spark = SparkSession \
.builder() \
.master('local[*]') \
.getOrCreate()
df = spark.readStream \
.format("kafka") \
.option("subscribe", "PeopleTopic") \
.load()
query = df.writeStream \
.format("org.neo4j.spark.DataSource") \
.option("url", "neo4j://127.0.0.1:7687") \
.option("save.mode", "ErrorIfExists") \
.option("checkpointLocation", "/tmp/checkpoint/myCheckPoint") \
.option("labels", "Person") \
.option("node.keys", "value") \
.start()
唯一的区别是您必须设置 checkpointLocation
和 save.mode
选项。
使用 save.mode
,您可以控制数据写入的方式。更多信息请参考 此处。
检查点
检查点是一个文件,它允许 Spark 结构化流从故障中恢复。Spark 使用此文件更新进度信息,并在发生故障或查询重新启动时从该点恢复。此检查点位置必须是 HDFS 兼容文件系统中的路径。
由于主题广泛且复杂,您可以阅读 Spark 官方文档。
源
从 Neo4j 读取流需要一些额外的配置。
让我们先看看代码,然后再分析所有选项。
from pyspark.sql import SparkSession
spark = SparkSession \
.builder() \
.master('local[*]') \
.getOrCreate()
df = spark.readStream \
.format("org.neo4j.spark.DataSource") \
.option("url", "neo4j://127.0.0.1:7687") \
.option("labels", "Person") \
.option("streaming.property.name", "timestamp") \
.option("streaming.from", "NOW") \
.load()
# Memory streaming format writes the streamed data to a SparkSQL table
# NOTE: make sure this code is executed in another block,
# or at least seconds later the previous one to allow the full initialization of the stream.
# The risk is that the query will return an empty result.
query = stream.writeStream \
.format("memory") \
.queryName("testReadStream") \
.start()
spark \
.sql("select * from testReadStream order by timestamp") \
.show()
流处理属性名称
为了使流处理工作,您需要每个记录都具有 timestamp
类型的属性,以便在从 Neo4j 读取新数据以发送到流时加以利用。
在幕后,连接器正在构建一个包含 WHERE
子句的查询,该子句检查其 [timestampProperty]
在从检查点数据计算出的时间戳范围和数据库中可用的最新偏移量之间记录。
因此,每个节点都必须具有 Neo4j 类型(Long)的时间戳属性,并且必须不为 null
。
像“2021-08-11”这样的字符串类型属性不起作用。它需要是 Neo4j 类型的长整型。 |
属性名称可以是任何名称,只需记住相应地设置 streaming.property.name
即可。
流处理来源选项
您可以选择流处理数据库中的所有数据,或仅流处理新数据。为此,您可以将 streaming.from
选项设置为以下两个值之一
-
NOW
:从当前时间戳开始读取。这是streaming.from
选项的默认值。 -
ALL
:首先读取数据库中的所有数据,然后仅读取新数据。
读取模式
关于 query
模式的说明
在使用查询模式时,处理 streaming.from
和 streaming.property.name
的自动化程度较低。
让我们看看示例,然后解释正在发生的事情。
from pyspark.sql import SparkSession
spark = SparkSession \
.builder() \
.master('local[*]') \
.getOrCreate()
df = spark.readStream \
.format("org.neo4j.spark.DataSource") \
.option("url", "neo4j://127.0.0.1:7687") \
.option("streaming.from", "NOW") \
.option("streaming.property.name", "timestamp") \
.option("query", \
"""MATCH (p:Test3_Person)
WHERE p.timestamp > $stream.from AND p.timestamp <= $stream.to
RETURN p.age AS age, p.timestamp AS timestamp""") \
.option("streaming.query.offset", \
"MATCH (p:Test3_Person) RETURN max(p.timestamp)") \
.load()
如您所见,无论如何都必须指定 streaming.from
和 streaming.property.name
,但您需要自己处理 WHERE
子句。您将向查询提供两个参数,$stream.to
和 $stream.from
,它们描述了我们需要读取的更改范围。
尽管查询参数 |
在这种情况下,streaming.query.offset
选项是必需的;连接器使用此选项读取数据库中的最后一个时间戳,结果用于计算要选择的范围。
其他示例
您可以在 此存储库 中找到流处理代码片段和许多其他示例,其中包含 Zeppelin 笔记本。
本文档描述了一个使用 Spark、Neo4j 和 AWS Kinesis 的完整示例:从 Kinesis 通过 Spark 到 Neo4j。