从 REST API 导入 JSON 数据到 Neo4j

本文演示了将基于 JSON 的 REST API 数据加载到 Neo4j 中的一些技术。

将 JSON 数据导入 Neo4j

有大量的基于 JSON 的 Web API 可以导入到 Neo4j 中,我们可以使用其中一个 加载 JSON 过程从这些 API 检索数据,并将其转换为可供 Cypher® 使用的映射值。

APOC 用户指南提供了一个工作示例,展示了如何 将 StackOverflow 数据导入 Neo4j

Strava API

Strava 是一款跑步者和骑行者用来记录活动并与朋友分享的应用程序。用户可以通过 基于 JSON 的 REST API 访问这些数据。

在开始调用 API 之前,我们需要 创建一个应用程序。然后我们将获得一个访问令牌,在所有 API 请求中都需要使用它。

我们可以通过运行以下命令在 Neo4j Browser 或 Cypher Shell 中创建一个参数

:params {stravaToken: "Bearer <insert-strava-token>"}

不要忘记将 <insert-strava-token> 替换为您 Strava 应用程序的令牌。

处理分页端点

我们有兴趣导入 已登录运动员 的活动。该端点接受以下参数:

我们感兴趣的是 per_page(在这里我们可以定义每次调用端点返回的活动数量)和 after(在这里我们可以告诉 API 只返回给定纪元时间戳之后的活动)。

假设我们在一次 API 请求中可以返回的活动数量超过了限制。我们将需要进行分页以检索所有活动并将它们导入到 Neo4j 中。

在对 API 进行分页之前,我们首先学习如何将一页活动导入 Neo4j。以下查询将从最早的时间戳开始返回活动:

WITH 0 AS after

WITH 'https://www.strava.com/api/v3/athlete/activities?after=' + after AS uri
CALL apoc.load.jsonParams(uri, {Authorization: $stravaToken}, null)
YIELD value

CREATE (run:Run {id: value.id})
SET run.distance = toFloat(value.distance),
    run.startDate = datetime(value.start_date_local),
    run.elapsedTime = duration({seconds: value.elapsed_time})

我们为每个活动创建一个带有 Run 标签的节点,并设置一些属性。对于此示例,最有趣的是 startDate,我们稍后将把它传递给 after 参数。

此查询将加载前 30 个活动,但如果我们想获取接下来的 30 个怎么办?我们可以更改查询的第一行,以查找我们任何 Run 节点的最新的时间戳,然后将其传递给 API。如果没有 Run 节点,我们可以像下面的查询一样使用值 0。

OPTIONAL MATCH (run:Run)
WITH run ORDER BY run.startDate DESC LIMIT 1
WITH coalesce(run.startDate.epochSeconds, 0) AS after

WITH 'https://www.strava.com/api/v3/athlete/activities?after=' + after AS uri
CALL apoc.load.jsonParams(uri, {Authorization: $stravaToken}, null)
YIELD value

CREATE (run:Run {id: value.id})
SET run.distance = toFloat(value.distance),
    run.startDate = datetime(value.start_date_local),
    run.elapsedTime = duration({seconds: value.elapsed_time})

我们可以继续手动运行此查询,但现在是时候将其自动化了。

自动化 API 分页

一种方法是使用脚本语言,并在其中创建一个循环,不断调用该端点,直到没有活动可检索为止。如果我们有点创意,我们可以使用 apoc.periodic.commit 过程实现相同的结果。

根据 APOC 文档,这是周期性迭代过程的描述:

它对于在单独的事务中重复运行查询直到不再处理并生成任何结果非常有用。因此,您可以分批迭代不满足条件的元素并更新它们,以便它们之后满足条件。

在我们的案例中,退出条件是当我们从 API 接收到少于 30 个活动时。让我们首先更新查询,如果返回的活动少于 30 个,则返回值为 0,如果正好是 30 个,则返回实际计数。

OPTIONAL MATCH (run:Run)
WITH run ORDER BY run.startDate DESC LIMIT 1
WITH coalesce(run.startDate.epochSeconds, 0) AS after

WITH 'https://www.strava.com/api/v3/athlete/activities?after=' + after AS uri
CALL apoc.load.jsonParams(uri, {Authorization: $stravaToken}, null)
YIELD value

CREATE (run:Run {id: value.id})
SET run.distance = toFloat(value.distance),
    run.startDate = datetime(value.start_date_local),
    run.elapsedTime = duration({seconds: value.elapsed_time})

RETURN CASE WHEN count(*) < 30 THEN 0 ELSE count(*) END AS count

现在剩下的就是将整个过程包装在周期性提交中。我们调用 apoc.periodic.commit 方法并带有两个参数:

  • 第一个是 Cypher 语句,将一直运行直到 RETURN 子句返回 0,

  • 第二个是传递给 Cypher 语句的参数。

call apoc.periodic.commit("
  OPTIONAL MATCH (run:Run)
  WITH run ORDER BY run.startDate DESC LIMIT 1
  WITH coalesce(run.startDate.epochSeconds, 0) AS after

  WITH 'https://www.strava.com/api/v3/athlete/activities?after=' + after AS uri
  CALL apoc.load.jsonParams(uri, {Authorization: $stravaToken}, null)
  YIELD value

  CREATE (run:Run {id: value.id})
  SET run.distance = toFloat(value.distance),
      run.startDate = datetime(value.start_date_local),
      run.elapsedTime = duration({seconds: value.elapsed_time})

  RETURN CASE WHEN count(*) < 30 THEN 0 ELSE count(*) END AS count
", {stravaToken: $stravaToken})

此查询将向 API 发送多个提交,直到我们加载了所有活动。

© . All rights reserved.