从 REST API 导入 JSON 数据到 Neo4j
本文演示了将基于 JSON 的 REST API 数据加载到 Neo4j 中的一些技术。
将 JSON 数据导入 Neo4j
有大量的基于 JSON 的 Web API 可以导入到 Neo4j 中,我们可以使用其中一个 加载 JSON 过程从这些 API 检索数据,并将其转换为可供 Cypher® 使用的映射值。
APOC 用户指南提供了一个工作示例,展示了如何 将 StackOverflow 数据导入 Neo4j。
Strava API
Strava 是一款跑步者和骑行者用来记录活动并与朋友分享的应用程序。用户可以通过 基于 JSON 的 REST API 访问这些数据。
在开始调用 API 之前,我们需要 创建一个应用程序。然后我们将获得一个访问令牌,在所有 API 请求中都需要使用它。
我们可以通过运行以下命令在 Neo4j Browser 或 Cypher Shell 中创建一个参数
:params {stravaToken: "Bearer <insert-strava-token>"}
不要忘记将 |
处理分页端点
我们有兴趣导入 已登录运动员 的活动。该端点接受以下参数:

我们感兴趣的是 per_page
(在这里我们可以定义每次调用端点返回的活动数量)和 after
(在这里我们可以告诉 API 只返回给定纪元时间戳之后的活动)。
假设我们在一次 API 请求中可以返回的活动数量超过了限制。我们将需要进行分页以检索所有活动并将它们导入到 Neo4j 中。
在对 API 进行分页之前,我们首先学习如何将一页活动导入 Neo4j。以下查询将从最早的时间戳开始返回活动:
WITH 0 AS after
WITH 'https://www.strava.com/api/v3/athlete/activities?after=' + after AS uri
CALL apoc.load.jsonParams(uri, {Authorization: $stravaToken}, null)
YIELD value
CREATE (run:Run {id: value.id})
SET run.distance = toFloat(value.distance),
run.startDate = datetime(value.start_date_local),
run.elapsedTime = duration({seconds: value.elapsed_time})
我们为每个活动创建一个带有 Run
标签的节点,并设置一些属性。对于此示例,最有趣的是 startDate
,我们稍后将把它传递给 after
参数。
此查询将加载前 30 个活动,但如果我们想获取接下来的 30 个怎么办?我们可以更改查询的第一行,以查找我们任何 Run
节点的最新的时间戳,然后将其传递给 API。如果没有 Run
节点,我们可以像下面的查询一样使用值 0。
OPTIONAL MATCH (run:Run)
WITH run ORDER BY run.startDate DESC LIMIT 1
WITH coalesce(run.startDate.epochSeconds, 0) AS after
WITH 'https://www.strava.com/api/v3/athlete/activities?after=' + after AS uri
CALL apoc.load.jsonParams(uri, {Authorization: $stravaToken}, null)
YIELD value
CREATE (run:Run {id: value.id})
SET run.distance = toFloat(value.distance),
run.startDate = datetime(value.start_date_local),
run.elapsedTime = duration({seconds: value.elapsed_time})
我们可以继续手动运行此查询,但现在是时候将其自动化了。
自动化 API 分页
一种方法是使用脚本语言,并在其中创建一个循环,不断调用该端点,直到没有活动可检索为止。如果我们有点创意,我们可以使用 apoc.periodic.commit
过程实现相同的结果。
根据 APOC 文档,这是周期性迭代过程的描述:
在我们的案例中,退出条件是当我们从 API 接收到少于 30 个活动时。让我们首先更新查询,如果返回的活动少于 30 个,则返回值为 0
,如果正好是 30 个,则返回实际计数。
OPTIONAL MATCH (run:Run)
WITH run ORDER BY run.startDate DESC LIMIT 1
WITH coalesce(run.startDate.epochSeconds, 0) AS after
WITH 'https://www.strava.com/api/v3/athlete/activities?after=' + after AS uri
CALL apoc.load.jsonParams(uri, {Authorization: $stravaToken}, null)
YIELD value
CREATE (run:Run {id: value.id})
SET run.distance = toFloat(value.distance),
run.startDate = datetime(value.start_date_local),
run.elapsedTime = duration({seconds: value.elapsed_time})
RETURN CASE WHEN count(*) < 30 THEN 0 ELSE count(*) END AS count
现在剩下的就是将整个过程包装在周期性提交中。我们调用 apoc.periodic.commit
方法并带有两个参数:
-
第一个是 Cypher 语句,将一直运行直到
RETURN
子句返回 0, -
第二个是传递给 Cypher 语句的参数。
call apoc.periodic.commit("
OPTIONAL MATCH (run:Run)
WITH run ORDER BY run.startDate DESC LIMIT 1
WITH coalesce(run.startDate.epochSeconds, 0) AS after
WITH 'https://www.strava.com/api/v3/athlete/activities?after=' + after AS uri
CALL apoc.load.jsonParams(uri, {Authorization: $stravaToken}, null)
YIELD value
CREATE (run:Run {id: value.id})
SET run.distance = toFloat(value.distance),
run.startDate = datetime(value.start_date_local),
run.elapsedTime = duration({seconds: value.elapsed_time})
RETURN CASE WHEN count(*) < 30 THEN 0 ELSE count(*) END AS count
", {stravaToken: $stravaToken})
此查询将向 API 发送多个提交,直到我们加载了所有活动。