从 REST API 导入 JSON 数据到 Neo4j
本文演示了一些将基于 JSON 的 REST API 数据加载到 Neo4j 的技术。
将 JSON 数据导入 Neo4j
我们可以将大量基于 JSON 的 Web API 导入到 Neo4j 中,并且可以使用其中一个 加载 JSON 过程从这些 API 中检索数据并将其转换为 Cypher® 可以使用的映射值。
APOC 用户指南提供了一个示例,展示了如何 将 StackOverflow 数据导入到 Neo4j。
Strava API
Strava 是跑步者和自行车骑行者用来记录他们的活动并与朋友分享的应用程序。这些数据可通过 基于 JSON 的 REST API 供用户使用。
在开始调用 API 之前,我们需要 创建一个应用程序。然后我们将获得一个访问令牌,我们需要在所有对 API 的请求中使用它。
我们可以通过运行以下命令在 Neo4j 浏览器 或 Cypher Shell 中创建一个参数
:params {stravaToken: "Bearer <insert-strava-token>"}
不要忘记将 |
处理分页端点
我们有兴趣导入 已登录的运动员 的活动。该端点采用以下参数

我们感兴趣的是 per_page
(我们可以在其中定义每次调用端点返回的活动数量)和 after
(我们可以在其中告诉 API 仅返回在提供的纪元时间戳之后的结果)。
假设我们有超过一个请求可以返回的活动。我们需要进行分页以检索所有活动并将其导入到 Neo4j 中。
在进行 API 分页之前,让我们先了解如何将一页活动导入到 Neo4j 中。以下查询将从最早的时间戳开始返回活动
WITH 0 AS after
WITH 'https://www.strava.com/api/v3/athlete/activities?after=' + after AS uri
CALL apoc.load.jsonParams(uri, {Authorization: $stravaToken}, null)
YIELD value
CREATE (run:Run {id: value.id})
SET run.distance = toFloat(value.distance),
run.startDate = datetime(value.start_date_local),
run.elapsedTime = duration({seconds: value.elapsed_time})
我们为每个活动创建一个带有标签 Run
的节点,并设置一些属性。对于此示例,最有趣的是 startDate
,稍后我们将将其传递给 after
参数。
此查询将加载前 30 个活动,但如果我们想获取接下来的 30 个呢?我们可以更改查询的第一行以查找任何 Run
节点的最新时间戳,然后将其传递给 API。如果没有 Run
节点,则可以使用 0 作为值,如下面的查询所示。
OPTIONAL MATCH (run:Run)
WITH run ORDER BY run.startDate DESC LIMIT 1
WITH coalesce(run.startDate.epochSeconds, 0) AS after
WITH 'https://www.strava.com/api/v3/athlete/activities?after=' + after AS uri
CALL apoc.load.jsonParams(uri, {Authorization: $stravaToken}, null)
YIELD value
CREATE (run:Run {id: value.id})
SET run.distance = toFloat(value.distance),
run.startDate = datetime(value.start_date_local),
run.elapsedTime = duration({seconds: value.elapsed_time})
我们可以继续手动运行此查询,但现在是时候将其自动化了。
自动 API 分页
一种方法是使用脚本语言并在其中创建一个循环,在循环中我们调用该端点,直到我们用完要检索的活动。如果我们稍微发挥创意,就可以使用 apoc.periodic.commit
过程获得相同的结果。
从 APOC 文档中,这是定期迭代过程的描述
在我们的例子中,退出条件将是我们从 API 收到少于 30 个活动时。让我们首先更新我们的查询,以便在返回少于 30 个活动时返回 0
的值,并在返回 30 个活动时返回实际计数。
OPTIONAL MATCH (run:Run)
WITH run ORDER BY run.startDate DESC LIMIT 1
WITH coalesce(run.startDate.epochSeconds, 0) AS after
WITH 'https://www.strava.com/api/v3/athlete/activities?after=' + after AS uri
CALL apoc.load.jsonParams(uri, {Authorization: $stravaToken}, null)
YIELD value
CREATE (run:Run {id: value.id})
SET run.distance = toFloat(value.distance),
run.startDate = datetime(value.start_date_local),
run.elapsedTime = duration({seconds: value.elapsed_time})
RETURN CASE WHEN count(*) < 30 THEN 0 ELSE count(*) END AS count
现在剩下的就是将整个内容包装在 periodic commit 中。我们使用两个参数调用 apoc.periodic.commit
方法
-
第一个是直到
RETURN
子句返回 0 才运行的 Cypher 语句, -
第二个是传递给 Cypher 语句的参数。
call apoc.periodic.commit("
OPTIONAL MATCH (run:Run)
WITH run ORDER BY run.startDate DESC LIMIT 1
WITH coalesce(run.startDate.epochSeconds, 0) AS after
WITH 'https://www.strava.com/api/v3/athlete/activities?after=' + after AS uri
CALL apoc.load.jsonParams(uri, {Authorization: $stravaToken}, null)
YIELD value
CREATE (run:Run {id: value.id})
SET run.distance = toFloat(value.distance),
run.startDate = datetime(value.start_date_local),
run.elapsedTime = duration({seconds: value.elapsed_time})
RETURN CASE WHEN count(*) < 30 THEN 0 ELSE count(*) END AS count
", {stravaToken: $stravaToken})
此查询向 API 发送多个提交,直到我们加载了所有活动。