创建作业规范文件

作业配置文件指示 Dataflow 如何运行导入(从哪里获取数据,如何将其映射到 Neo4j 等)。它由包含四个部分的 JSON 对象组成。

作业规范 JSON 骨架
{
  "config": { ... },  (1)
  "sources": [  (2)
    { ... }
  ],
  "targets": [  (3)
    { ... }
  ],
  "actions": [  (4)
    { ... }
  ]
}
1 config - 影响导入执行方式的全局标志(可选)
2 sources - 数据源定义(关系型)
3 targets - 数据目标定义(图:节点/关系/Cypher 查询)
4 actions - 一次性操作(可选)

在高级别上,作业从 sources 中获取数据,并将它们转换/导入到 targets 中。

完整示例

下面是一个开箱即用的作业规范文件示例,用于导入公开可用的 movies 数据集。

该数据集包含实体 PersonMovie,通过 DIRECTEDACTED_IN 关系链接在一起。换句话说,每个 Person 可能会 DIRECTED 和/或 ACTED_IN 一个 Movie。实体和关系都附加了额外的详细信息。数据来自以下文件:persons.csvmovies.csvacted_in.csvdirected.csv

image$movies model

接下来的部分将对其进行分解,并为每个部分提供上下文信息。我们建议您在阅读作业规范示例的同时阅读本指南。

{
  "version": "1",
  "config": {
    "reset_db": true
  },
  "sources": [
    {
      "type": "text",
      "name": "persons",
      "urls": ["gs://neo4j-examples/persons.csv"],
      "format": "excel",
      "header": ["person_tmdbId","bio","born","bornIn","died","person_imdbId","name","person_poster","person_url"]
    },
    {
      "type": "text",
      "name": "movies",
      "urls": ["gs://neo4j-examples/movies.csv"],
      "format": "excel",
      "header": ["movieId","title","budget","countries","movie_imdbId","imdbRating","imdbVotes","languages","plot","movie_poster","released","revenue","runtime","movie_tmdbId","movie_url","year","genres"]
    },
    {
      "type": "text",
      "name": "directed",
      "urls": ["gs://neo4j-examples/directed.csv"],
      "format": "excel",
      "header": ["movieId","person_tmdbId"]
    },
    {
      "type": "text",
      "name": "acted_in",
      "urls": ["gs://neo4j-examples/acted_in.csv"],
      "format": "excel",
      "header": ["movieId","person_tmdbId","role"]
    }
  ],
  "targets": {
    "nodes": [
      {
        "source": "persons",
        "name": "Persons",
        "write_mode": "merge",
        "labels": [ "Person" ],
        "properties": [
          {
            "source_field": "person_tmdbId",
            "target_property": "id",
            "target_property_type": "string"
          },
          {
            "source_field": "name",
            "target_property": "name",
            "target_property_type": "string"
          },
          {
            "source_field": "bornIn",
            "target_property": "bornLocation",
            "target_property_type": "string"
          },
          {
            "source_field": "born",
            "target_property": "bornDate",
            "target_property_type": "date"
          },
          {
            "source_field": "died",
            "target_property": "diedDate",
            "target_property_type": "date"
          }
        ],
        "schema": {
          "key_constraints": [
            {
              "name": "personIdKey",
              "label": "Person",
              "properties": ["id"]
            }
          ],
          "unique_constraints": [
            {
              "name": "personNameUnique",
              "label": "Person",
              "properties": ["name"]
            }
          ]
        }
      },
      {
        "source": "movies",
        "name": "Movies",
        "write_mode": "merge",
        "labels": [ "Movie" ],
        "properties": [
          {
            "source_field": "movieId",
            "target_property": "id",
            "target_property_type": "string"
          },
          {
            "source_field": "title",
            "target_property": "title",
            "target_property_type": "string"
          },
          {
            "source_field": "year",
            "target_property": "releaseYear",
            "target_property_type": "string"
          },
          {
            "source_field": "imdbRating",
            "target_property": "imdbRating",
            "target_property_type": "float"
          }
        ],
        "schema": {
          "key_constraints": [
            {
              "name": "movieIdKey",
              "label": "Movie",
              "properties": ["id"]
            }
          ],
          "unique_constraints": [
            {
              "name": "movieTitleUnique",
              "label": "Movie",
              "properties": ["title"]
            }
          ]
        }
      }
    ],
    "relationships": [
      {
        "source": "directed",
        "name": "Directed",
        "type": "DIRECTED",
        "write_mode": "merge",
        "node_match_mode": "match",
        "start_node_reference": "Persons",
        "end_node_reference": "Movies"
      },
      {
        "source": "acted_in",
        "name": "Acted_in",
        "type": "ACTED_IN",
        "write_mode": "merge",
        "node_match_mode": "match",
        "start_node_reference": "Persons",
        "end_node_reference": "Movies",
        "properties": [
          {
            "source_field": "role",
            "target_property": "role",
            "target_property_type": "string"
          }
        ]
      }
    ]
  }
}

配置

config 对象包含导入作业的全局配置。所有设置都有默认值,因此除非您希望更改它们,否则无需指定它们。

配置设置及其默认值
"config": {
  "reset_db": false,
  "index_all_properties": false,
  "node_target_batch_size": 5000,
  "relationship_target_batch_size": 1000,
  "query_target_batch_size": 1000,
  "node_target_parallelism": 10,
  "relationship_target_parallelism": 1,
  "query_target_parallelism": 1
}
  • reset_db (bool) - 是否在导入前清除目标数据库。删除数据、索引和约束。

  • index_all_properties (bool) - 是否为所有属性创建索引。请参阅Cypher® → 搜索性能索引

  • node_target_batch_size (int) - 每个事务要处理的节点数量。

  • relationship_target_batch_size (int) - 每个事务要处理的关系数量。

  • query_target_batch_size (int) - 每个自定义查询要处理的行数。

  • node_target_parallelism (int) - 每个工作程序节点目标的最大并发事务数。

  • relationship_target_parallelism (int) - 每个工作程序关系目标的最大并发事务数。应谨慎设置大于 1 的值,因为它们可能导致死锁。

  • query_target_parallelism (int) - 每个工作程序 Cypher 查询目标的最大并发事务数。应谨慎设置大于 1 的值,因为它们可能导致死锁。

sources 部分包含数据源的定义,作为一个列表。作为粗略指南,您可以将 一个表 <=> 一个源 视为等价。导入程序将利用源提供的数据,并将其提供给目标,目标最终将其映射到 Neo4j 中。

源对象至少必须指定属性 typenameurlsheader。根据指定的 format,默认的列分隔符和行分隔符将根据Apache 的 CSVFormat 进行设置。

源对象规范及其默认值
{
  "type": "text",
  "name": "<sourceName>",
  "urls": [ "<csvPath1>", "<csvPath2>", ... ],
  "format": "default",
  "column_delimiter": "",
  "line_separator": "",
  "header": "<colName1>,<colName2>,..."
}
  • type (string) - text

  • name (string) - 源的易于理解的标签(在所有名称中唯一)。您将使用此标签从规范文件的其他部分引用该源。

  • urls (字符串列表) - CSV 文件的 Google Cloud Storage 位置(例如 gs://neo4j-datasets/movies.csv)。

    如何检索文件的 Google Cloud Storage 位置?

    要检索 Cloud 存储桶中文件的 Google Cloud Storage 位置,请通过右侧的三个点展开文件选项,然后选择 复制 gsutil URI

    image$google cloud file url
  • format (string) - 提供的 CSV 文件的格式。
    有效值为:defaultexcelinformixmongomongo_tsvmysqloraclepostgrespostgresql_csvrfc4180
    格式的行为符合Apache 的 CSVFormat

  • column_delimiter (string) - CSV 字段分隔符。

  • line_separator (string) - CSV 行分隔符。

  • header (string) - CSV 文件包含的所有字段名称的完整列表,按顺序排列。或者,该列表可以仅限于前几列。此处指定的列名控制目标将从中映射的行字段名称。

字段 header 必须指定 CSV 包含的所有列,或从第一列开始的它们的一个连续子集。**无法指定列的任意子集。**

给定一个包含列 ID,name,title,rating 的 CSV 的有效/无效 header 示例

有效

ID,name,title,rating

有效

ID,name

有效

ID,name,title

无效

ID,rating

无效

title,rating

示例

persons.csv 文件导入行的源对象示例
{
  "type": "text",
  "name": "persons",
  "urls": "gs://neo4j-examples/persons.csv",
  "format": "excel",
  "header": "person_tmdbId,bio,born,bornIn,died,person_imdbId,name,person_poster,person_url"
}

目标

targets 部分包含导入后将产生的图实体的定义。

Neo4j 使用节点(例如 moviespeople)表示对象,并使用关系(例如 ACTED_INDIRECTED)将它们连接起来。targets 部分中的每个对象都将生成一个对应的实体(节点或关系)在 Neo4j 中,并从源中提取数据。还可以运行自定义 Cypher 查询。

目标规范骨架
"targets": {
  "nodes": [ ... ],
  "relationships": [ ... ],
  "queries": [ ... ]
}

默认情况下,**您无需考虑节点和关系之间的依赖关系**。关系目标始终在对应于其起始和结束节点的目标之后处理。但是,可以将其他目标添加为依赖项。

节点对象

节点实体必须在 targets 对象内以键 nodes 分组到一个列表中。

节点目标规范骨架
"targets": {
  "nodes": [
    { <nodeSpec1> },
    { <nodeSpec2> },
    ...
  ]
}

必填字段

每个节点对象至少必须具有属性 sourcenamelabelsproperties

{
  "source": "<sourceName>",
  "name": "<targetName>",
  "labels": ["<label1>", "<label2>", ...],
  "properties": [
    {
      "source_field": "<bigQueryColumnName>",
      "target_field": "<neo4jPropertyName>",
      "target_property_type": "<neo4jPropertyType>"
    },
    { <propertyObj2> },
    ...
  ],
  "write_mode": "merge"
}
  • source (string) - 此目标应从中提取数据的源的名称。应与 sources 对象中的名称之一匹配。

  • name (string) - 目标的易于理解的名称(在所有名称中唯一)。

  • labels (字符串列表) - 用于标记节点的标签

  • properties (对象列表) - 源列和节点属性之间的映射。
    target_property_type 的有效值为:booleanbyte_array(假设为 base64 编码)、datedurationfloatintegerlocal_datelocal_datetimelocal_timepointstringzoned_datetimezoned_time

  • write_mode (string) - Neo4j 中的创建模式。createmerge。有关 Cypher 子句行为的信息,请参阅CREATEMERGE

模式定义

您可以通过schema对象在导入的节点上创建索引约束。Schema设置等同于手动运行相关的CREATE INDEX/CONSTRAINT命令,只是它们在每个实体类型导入之前自动运行。

如果全局配置index_all_properties设置为true,则所有属性都将使用范围索引进行索引。
节点目标Schema定义及其默认值
{
  ...
  "schema": {
    "enable_type_constraints": true,
    "key_constraints": [
      {
        "name": "<constraintName>",
        "label": "<label>",
        "properties": ["<neo4jPropertyName1>", "<neo4jPropertyName2>", ...],
        "options": {}
      }
    ],
    "unique_constraints": [
      {
        "name": "<constraintName>",
        "label": "<label>",
        "properties": ["<neo4jPropertyName1>", "<neo4jPropertyName2>", ...],
        "options": {}
      }
    ],
    "existence_constraints": [
      {
        "name": "<constraintName>",
        "label": "<label>",
        "property": "<neo4jPropertyName>"
      }
    ],
    "range_indexes": [
      {
        "name": "<indexName>",
        "label": "<label>",
        "properties": ["<neo4jPropertyName1>", "<neo4jPropertyName2>", ...],
      }
    ],
    "text_indexes": [
      {
        "name": "<indexName>",
        "label": "<label>",
        "property": "<neo4jPropertyName>",
        "options": {}
      }
    ],
    "point_indexes": [
      {
        "name": "<indexName>",
        "label": "<label>",
        "property": "<neo4jPropertyName>",
        "options": {}
      }
    ],
    "fulltext_indexes": [
      {
        "name": "<indexName>",
        "labels": ["label1", "label2", ...],
        "properties": ["<neo4jPropertyName1>", "<neo4jPropertyName2>", ...],
        "options": {}
      }
    ],
    "vector_indexes": [
      {
        "name": "<indexName>",
        "label": "<label>",
        "property": "<neo4jPropertyName>",
        "options": {}
      }
    ]
  }
}

每个对象的属性位于此处

  • name (字符串) — 在Neo4j中创建的索引或约束的名称。

  • label (字符串)或labels (字符串列表) — 应在其上实施索引或约束的标签。

  • property (字符串)或properties (字符串列表) — 应在其上实施索引或约束的属性。

  • options (对象) — 创建索引或约束时使用的选项(请参阅每个索引约束类型的单独页面)。如果存在,则为可选,但向量索引除外,向量索引是必需的。

源数据对于key_constraints列不能有空值,否则它们将与节点键约束冲突。如果源数据在这方面不干净,请考虑在相关的source.query字段中预先对其进行清理,方法是排除所有不满足约束的行(例如WHERE person_tmbdId IS NOT NULL)。或者,在源转换中使用where属性。
选项key_constraintsunique_constraints需要Neo4j/Aura企业版,在针对Neo4j社区版安装运行时没有任何效果。

配置

节点目标配置选项及其默认值
{
  ...
  "active": true,
  "source_transformations": {
    "enable_grouping": true
  },
  "depends_on": ["<dependencyTargetName1>", "<dependencyTargetName2>", ...]
}
  • active (布尔值) — 目标是否应包含在导入中(默认值:true)。

  • source_transformations (对象) — 如果enable_grouping设置为true,则导入将在key_constraintsproperties中指定的所有字段上附加SQL子句GROUP BY。如果设置为false,则源中的任何重复数据都将推送到Neo4j,这可能会导致约束错误或使插入效率降低。该对象还可以包含聚合函数和更多字段,请参阅源转换

  • depends_on (字符串列表) — 应在当前目标之前执行的目标的name

示例

导入Person节点的节点对象示例
{
  "source": "persons",
  "name": "Persons",
  "labels": [ "Person" ],
  "properties": [
    {
      "source_field": "person_tmdbId",
      "target_field": "id",
      "target_property_type": "string"
    },
    {
      "source_field": "name",
      "target_field": "name",
      "target_property_type": "string"
    },
    {
      "source_field": "bornIn",
      "target_field": "bornLocation",
      "target_property_type": "string"
    },
    {
      "source_field": "born",
      "target_field": "bornDate",
      "target_property_type": "local_date"
    },
    {
      "source_field": "died",
      "target_field": "diedDate",
      "target_property_type": "local_date"
    }
  ],
  "schema": {
    "key_constraints": [
      {
        "name": "personIdKey",
        "label": "Person",
        "properties": ["id"]
      }
    ],
    "unique_constraints": [
      {
        "name": "personNameUnique",
        "label": "Person",
        "properties": ["name"]
      }
    ]
  }
}

关系对象

关系实体必须在targets对象内的键为relationships的列表中进行分组。

关系目标规范框架
"targets": {
  ...
  "relationships": [
    { <relationshipSpec1> },
    { <relationshipSpec2> },
    ...
  ]
}

必填字段

每个关系对象至少必须具有属性sourcenametype

它还必须包含有关关系将哪些节点目标连接在一起的信息。您可以通过start_node_referenceend_node_reference提供此信息。

{
  "source": "<sourceName>",
  "name": "<targetName>",
  "type": "<relationshipType>",
  "start_node_reference": "<nodeTargetName>",
  "end_node_reference": "<nodeTargetName>",
  "node_match_mode": "<match/merge>",
  "write_mode": "<create/merge>"
}
  • source (string) - 此目标应从中提取数据的源的名称。应与 sources 对象中的名称之一匹配。

  • name (string) - 目标的易于理解的名称(在所有名称中唯一)。

  • type (字符串) — 分配给关系的类型

  • start_node_reference (字符串) — 充当关系起点的节点目标的名称。

  • end_node_reference (字符串) — 充当关系终点的节点目标的名称。

  • node_match_mode (字符串) — 在它们之间创建关系之前,用于获取源/目标节点的Cypher子句。有效值为matchmerge,分别导致Cypher子句MATCHMERGE

  • write_mode (string) - Neo4j 中的创建模式。createmerge。有关 Cypher 子句行为的信息,请参阅CREATEMERGE

keysuniquemandatory选项需要Aura或Neo4j企业版,在针对Neo4j社区版安装运行时将没有任何效果。

属性

关系还可以将源列映射为属性。

{
  ...
  "properties": [
    {
      "source_field": "<bigQueryColumnName>",
      "target_field": "<neo4jPropertyName>",
      "target_property_type": "<neo4jPropertyType>"
    },
    { <propertyObj2> },
    ...
  ]
}
  • properties (对象列表) — 源列和关系属性之间的映射。
    target_property_type 的有效值为:booleanbyte_array(假设为 base64 编码)、datedurationfloatintegerlocal_datelocal_datetimelocal_timepointstringzoned_datetimezoned_time

Schema定义

您可以通过schema对象在导入的关系上创建索引约束。Schema设置等同于手动运行相关的CREATE INDEX/CONSTRAINT命令,只是它们在每个关系类型导入之前自动运行。

如果全局配置index_all_properties设置为true,则所有属性都将使用范围索引进行索引。
关系目标Schema定义及其默认值
{
  ...
  "schema": {
    "enable_type_constraints": true,
    "key_constraints": [
      {
        "name": "<constraintName>",
        "type": "<relationshipType>",
        "properties": ["<neo4jPropertyName1>", "<neo4jPropertyName2>", ...],
        "options": {}
      }
    ],
    "unique_constraints": [
      {
        "name": "<constraintName>",
        "type": "<relationshipType>",
        "properties": ["<neo4jPropertyName1>", "<neo4jPropertyName2>", ...],
        "options": {}
      }
    ],
    "existence_constraints": [
      {
        "name": "<constraintName>",
        "type": "<relationshipType>",
        "property": "<neo4jPropertyName>"
      }
    ],
    "range_indexes": [
      {
        "name": "<indexName>",
        "type": "<relationshipType>",
        "properties": ["<neo4jPropertyName1>", "<neo4jPropertyName2>", ...],
      }
    ],
    "text_indexes": [
      {
        "name": "<indexName>",
        "type": "<relationshipType>",
        "property": "<neo4jPropertyName>",
        "options": {}
      }
    ],
    "point_indexes": [
      {
        "name": "<indexName>",
        "type": "<relationshipType>",
        "property": "<neo4jPropertyName>",
        "options": {}
      }
    ],
    "fulltext_indexes": [
      {
        "name": "<indexName>",
        "types": ["<relationshipType1>", "<relationshipType2>", ...],
        "properties": ["<neo4jPropertyName1>", "<neo4jPropertyName2>", ...],
        "options": {}
      }
    ],
    "vector_indexes": [
      {
        "name": "<indexName>",
        "type": "<relationshipType>",
        "property": "<neo4jPropertyName>",
        "options": {}
      }
    ]
  }
}

每个对象的属性位于此处

  • name (字符串) — 在Neo4j中创建的索引或约束的名称。

  • type (字符串) — 应在其上实施索引或约束的类型。

  • property (字符串)或properties (字符串列表) — 应在其上实施索引或约束的属性。

  • options (对象) — 创建索引或约束时使用的选项(请参阅每个索引约束类型的单独页面)。如果存在,则为可选,但向量索引除外,向量索引是必需的。

源数据对于key_constraints列不能有空值,否则它们将与关系键约束冲突。如果源数据在这方面不干净,请考虑在相关的source.query字段中预先对其进行清理,方法是排除所有不满足约束的行(例如WHERE person_tmbdId IS NOT NULL)。或者,在源转换中使用where属性。
选项key_constraintsunique_constraints需要Neo4j/Aura企业版,在针对Neo4j社区版安装运行时没有任何效果。

配置

关系目标配置选项及其默认值
{
  ...
  "active": true,
  "source_transformations": {
    "enable_grouping": true
  },
  "depends_on": ["<dependencyTargetName1>", "<dependencyTargetName2>", ...]
}
  • active (布尔值) — 目标是否应包含在导入中。

  • source_transformations (对象) — 如果enable_grouping设置为true,则导入将在key_constraintsproperties中指定的所有字段上使用SQL GROUP BY。如果设置为false,则源中的任何重复数据都将推送到Neo4j,这可能会导致约束错误或使插入效率降低。该对象还可以包含聚合函数和更多字段,请参阅源转换

  • depends_on (字符串列表) — 应在当前目标之前执行的目标的name

示例

导入ACTED_IN关系的关系对象示例
{
  "source": "acted_in",
  "name": "Acted_in",
  "type": "ACTED_IN",
  "write_mode": "merge",
  "node_match_mode": "match",
  "start_node_reference": "Persons",
  "end_node_reference": "Movies",
  "properties": [
    {
      "source_field": "role",
      "target_field": "role",
      "target_property_type": "string"
    }
  ]
}

自定义查询目标

当导入需要一个不适合节点/关系目标格式的复杂查询时,自定义查询目标非常有用。查询目标通过变量$rows接收批处理的行。

自定义查询必须在targets对象内的键为queries的列表中进行分组。

查询目标规范框架
"targets": {
  ...
  "queries": [
    { <querySpec1> },
    { <querySpec2> },
    ...
  ]
}
不要使用自定义查询来运行与源无关的Cypher;请改用操作。一次性查询,尤其是如果是非幂等的,不适合在自定义查询目标中使用。这样做的原因是,来自目标的查询是分批运行的,因此自定义查询可能会根据从源提取的$rows批次的数量运行多次。

必填字段

每个查询目标至少必须具有属性sourcenamequery

{
  "source": "<sourceName>",
  "name": "<targetName>",
  "query": "<cypherQuery>"
}
  • source (string) - 此目标应从中提取数据的源的名称。应与 sources 对象中的名称之一匹配。

  • name (string) - 目标的易于理解的名称(在所有名称中唯一)。

  • query (字符串) — Cypher查询。来自源的数据在参数$rows中可用,表示为列表。

配置

查询目标配置选项及其默认值
{
  ...
  "active": true,
  "depends_on": ["<dependencyTargetName1>", "<dependencyTargetName2>", ...]
}
  • active (布尔值) — 目标是否应包含在导入中。

  • depends_on (字符串列表) — 应在当前目标之前执行的目标的name

示例

导入Person节点并在创建时设置日期的查询对象示例
{
  "custom_query": {
    "name": "Person nodes",
    "source": "persons",
    "query": "UNWIND $rows AS row WHERE row.person_tmdbId IS NOT NULL MERGE (p:Person {id: row.person_tmdbId, name: row.name, born_in: row.bornIn, born: date(row.born), died: date(row.died)}) ON CREATE SET p.created_time=datetime()"
  }
}

源转换

每个节点和关系目标可以选择具有一个source_transformation属性,该属性包含聚合函数。这对于从更细粒度的源中提取更高级别的维度非常有用。聚合会导致额外的字段,这些字段可用于属性映射。

"source_transformations": {
  "enable_grouping": true,
  "aggregations": [ {
    "expression": "",
    "field_name": ""
   },
   { aggregationObj2 }, ...
  ],
  "limit": -1,
  "where": "",
  "order_by": [
    {
      "expression": "column_name",
      "order": "<asc/desc>"
    },
    { orderObj2 }, ...
  ],
}
  • enable_grouping (布尔值) — 必须为true才能使aggregations/where工作。

  • aggregations (对象列表) — 聚合在expression属性中指定为SQL查询,结果在field_name中指定的名称下作为源列可用。

  • limit (整数) — 限制要考虑导入的源行数(默认为无限制,编码为-1)。

  • where (字符串) — 在导入之前过滤掉源数据(使用SQL WHERE子句格式)。

  • order_by (对象列表) — 对源强制排序。

示例

在虚构数据集上进行转换对象的示例
{
  "enable_grouping": true,
  "aggregations": [
    {
      "expression": "SUM(unit_price*quantity)",
      "field_name": "total_amount_sold"
    },
    {
      "expression": "SUM(quantity)",
      "field_name": "total_quantity_sold"
    }
  ],
  "limit": 50,
  "where": "sourceId IS NOT NULL"
}

操作

actions部分包含可在导入过程的特定步骤之前或之后运行的命令。每个步骤称为一个stage。例如,您可以在步骤完成后提交HTTP请求、在源上执行SQL查询或在Neo4j目标实例上运行Cypher语句。

操作规范框架
  ...
  "actions": [
    { <actionSpec1> },
    { <actionSpec2> },
    ...
  ]

每个操作对象至少必须具有属性nametypestage。其他属性取决于操作类型。

{
  "type": "http",
  "name": "<actionName>",
  "stage": "<stageName>",
  "method": "<get/post>",
  "url": "<targetUrl>",
  "headers": {}
}
  • type (字符串) — 操作类型。

  • name (字符串) — 操作的易于理解的名称(在所有名称中唯一)。

  • stage (字符串) — 操作应在导入的哪个时间点运行。有效值为:startpost_sourcespre_nodespost_nodespre_relationshipspost_relationshipspre_queriespost_queriesend

  • method (字符串) — HTTP方法;getpost

  • url (字符串) — HTTP请求的目标URL。

  • headers (对象,可选) — 请求头。

导入完成后发送GET请求的操作示例
{
  "type": "http",
  "name": "Post load ping",
  "stage": "end",
  "method": "get",
  "url": "https://neo4j.ac.cn/success",
  "headers": {
    "secret": "314159",
    "moreSecret": "17320"
  }
}
{
  "type": "cypher",
  "name": "<actionName>",
  "stage": "<stageName>",
  "query": "<cypherQuery>",
  "execution_mode": "<transaction/autocommit>"
}
  • type (字符串) — 操作类型。

  • name (字符串) — 操作的易于理解的名称(在所有名称中唯一)。

  • stage (字符串) — 操作应在导入的哪个时间点运行。有效值为:startpost_sourcespre_nodespost_nodespre_relationshipspost_relationshipspre_queriespost_queriesend

  • query (字符串) — 要运行的Cypher查询。

  • execution_mode (字符串,可选) — 查询应在什么模式下执行。有效值为transactionautocommit(默认值:transaction)。

导入完成后创建importJob节点的操作示例
{
  "type": "cypher",
  "name": "Post load log",
  "stage": "end",
  "query": "MERGE (:importJob {date: datetime()})"
}
{
  "type": "bigquery",
  "name": "<actionName>",
  "stage": "<stageName>",
  "sql": "<sqlQuery>"
}
  • type (字符串) — 操作类型。

  • name (字符串) — 操作的易于理解的名称(在所有名称中唯一)。

  • stage (字符串) — 操作应在导入的哪个时间点运行。有效值为:startpost_sourcespre_nodespost_nodespre_relationshipspost_relationshipspre_queriespost_queriesend

  • sql (字符串) — 要运行的SQL查询。

导入完成后发送GET请求的操作示例
{
  "type": "bigquery",
  "name": "Post load log",
  "stage": "end",
  "sql": "INSERT INTO logs.imports (time) VALUES (NOW())"
}

变量

可以在Dataflow中提供键值对以替换$分隔的标记。在创建Dataflow作业时,您可以将参数作为JSON对象提供在“选项JSON”字段中。变量插值适用于

  • BigQuery源查询(SQL)

  • 文本源URL

  • 自定义Cypher目标查询

  • BigQuery操作SQL

  • Cypher操作查询

  • HTTP GET/POST URL和标头值。

变量必须以$符号为前缀(例如$limit),并且可以在作业规范文件以及readQueryinputFilePattern(源URI)命令行参数中使用。