创建作业规范文件
作业配置文件指示 Dataflow 如何运行导入(从哪里获取数据,如何将其映射到 Neo4j 等)。它由包含四个部分的 JSON 对象组成。
{
"config": { ... }, (1)
"sources": [ (2)
{ ... }
],
"targets": [ (3)
{ ... }
],
"actions": [ (4)
{ ... }
]
}
1 | config — 影响导入执行方式的全局标志(可选) |
2 | sources — 数据源定义(关系型) |
3 | targets — 数据目标定义(图:节点/关系/Cypher 查询) |
4 | actions — 一次性操作(可选) |
从高层次来看,作业从 sources
中获取数据,并将它们转换为/导入到 targets
中。
完整示例
下面是一个开箱即用的作业规范文件示例,用于导入公开可用的 movies
数据集。
该数据集包含实体 Person
和 Movie
,它们通过 DIRECTED
和 ACTED_IN
关系链接在一起。换句话说,每个 Person
可以 DIRECTED
和/或 ACTED_IN
一个 Movie
。实体和关系都附加了额外的详细信息。数据来自以下文件:persons.csv、movies.csv、acted_in.csv、directed.csv。

接下来的部分会将其分解,并为每个部分提供上下文信息。我们建议您在阅读作业规范示例的同时阅读本指南。
{
"version": "1",
"config": {
"reset_db": true
},
"sources": [
{
"type": "bigquery",
"name": "persons",
"query": "SELECT person_tmdbId, name, bornIn, born, died FROM team-connectors-dev.movies.persons"
},
{
"type": "bigquery",
"name": "movies",
"query": "SELECT movieId, title, imdbRating, year FROM team-connectors-dev.movies.movies"
},
{
"type": "bigquery",
"name": "directed",
"query": "SELECT movieId, person_tmdbId FROM team-connectors-dev.movies.directed"
},
{
"type": "bigquery",
"name": "acted_in",
"query": "SELECT movieId, person_tmdbId, role FROM team-connectors-dev.movies.acted_in"
}
],
"targets": {
"nodes": [
{
"source": "persons",
"name": "Persons",
"write_mode": "merge",
"labels": [ "Person" ],
"properties": [
{
"source_field": "person_tmdbId",
"target_property": "id",
"target_property_type": "string"
},
{
"source_field": "name",
"target_property": "name",
"target_property_type": "string"
},
{
"source_field": "bornIn",
"target_property": "bornLocation",
"target_property_type": "string"
},
{
"source_field": "born",
"target_property": "bornDate",
"target_property_type": "date"
},
{
"source_field": "died",
"target_property": "diedDate",
"target_property_type": "date"
}
],
"schema": {
"key_constraints": [
{
"name": "personIdKey",
"label": "Person",
"properties": ["id"]
}
],
"unique_constraints": [
{
"name": "personNameUnique",
"label": "Person",
"properties": ["name"]
}
]
}
},
{
"source": "movies",
"name": "Movies",
"write_mode": "merge",
"labels": [ "Movie" ],
"properties": [
{
"source_field": "movieId",
"target_property": "id",
"target_property_type": "string"
},
{
"source_field": "title",
"target_property": "title",
"target_property_type": "string"
},
{
"source_field": "year",
"target_property": "releaseYear",
"target_property_type": "string"
},
{
"source_field": "imdbRating",
"target_property": "imdbRating",
"target_property_type": "float"
}
],
"schema": {
"key_constraints": [
{
"name": "movieIdKey",
"label": "Movie",
"properties": ["id"]
}
],
"unique_constraints": [
{
"name": "movieTitleUnique",
"label": "Movie",
"properties": ["title"]
}
]
}
}
],
"relationships": [
{
"source": "directed",
"name": "Directed",
"write_mode": "merge",
"node_match_mode": "match",
"type": "DIRECTED",
"start_node_reference": "Persons",
"end_node_reference": "Movies"
},
{
"source": "acted_in",
"name": "Acted_in",
"write_mode": "merge",
"node_match_mode": "match",
"type": "ACTED_IN",
"start_node_reference": "Persons",
"end_node_reference": "Movies",
"properties": [
{
"source_field": "role",
"target_property": "role",
"target_property_type": "string"
}
]
}
]
}
}
配置
config
对象包含导入作业的全局配置。所有设置都有默认值,因此除非您希望更改它们,否则无需指定它们。
"config": {
"reset_db": false,
"index_all_properties": false,
"node_target_batch_size": 5000,
"relationship_target_batch_size": 1000,
"query_target_batch_size": 1000,
"node_target_parallelism": 10,
"relationship_target_parallelism": 1,
"query_target_parallelism": 1
}
-
reset_db
(bool) — 是否在导入前清除目标数据库。删除数据、索引和约束。 -
index_all_properties
(bool) — 是否为所有属性创建索引。请参阅Cypher® → 搜索性能索引。 -
node_target_batch_size
(int) — 每个事务要处理的节点数量。 -
relationship_target_batch_size
(int) — 每个事务要处理的关系数量。 -
query_target_batch_size
(int) — 每个自定义查询要处理的行数。 -
node_target_parallelism
(int) — 每个工作程序节点目标的最大并发事务数。 -
relationship_target_parallelism
(int) — 每个工作程序关系目标的最大并发事务数。应谨慎设置大于1
的值,因为它们可能导致死锁。 -
query_target_parallelism
(int) — 每个工作程序 Cypher 查询目标的最大并发事务数。应谨慎设置大于1
的值,因为它们可能导致死锁。
源
sources
部分包含数据源的定义,作为一个列表。作为一个粗略的指南,您可以将 一张表 <=> 一个源
理解为等价关系。导入程序将获取源公开的数据,并将其提供给目标,最终将其映射到 Neo4j 中。
{
"type": "bigquery",
"name": "<sourceName>",
"query": "<bigQuerySqlQuery>"
}
-
type
(string) —bigquery
。 -
name
(string) — 源的易于理解的标签(在所有名称中唯一;可能不包含空格)。您将使用它从规范文件的其他部分引用源。 -
query
(string) — 要从 BigQuery 中提取的数据集,作为 SQL 查询。请注意-
源表可以包含比查询中选择的列更多的列;
-
多个目标可以使用相同的源,可能针对不同的列子集进行过滤。
-
不支持类型为 BIGNUMERIC 、GEOGRAPHY 、JSON 、INTERVAL 和 STRUCT 的列。 |
目标
targets
部分包含导入后将产生的图实体的定义。
Neo4j 使用节点(例如 movies
、people
)表示对象,并使用关系(例如 ACTED_IN
、DIRECTED
)将它们连接起来。targets
部分中的每个对象都将生成一个相应的实体(节点或关系),该实体从源中提取数据。也可以运行自定义 Cypher 查询。
"targets": {
"nodes": [ ... ],
"relationships": [ ... ],
"queries": [ ... ]
}
默认情况下,**您无需考虑节点和关系之间的依赖关系**。关系目标始终在对应于其起始和结束节点的目标之后处理。但是,可以将其他目标添加为依赖项。
节点对象
节点实体必须在 targets
对象内部的 nodes
键控列表中分组。
"targets": {
"nodes": [
{ <nodeSpec1> },
{ <nodeSpec2> },
...
]
}
必填字段
每个节点对象至少必须具有属性 source
、name
、labels
和 properties
。
{
"source": "<sourceName>",
"name": "<targetName>",
"labels": ["<label1>", "<label2>", ...],
"properties": [
{
"source_field": "<bigQueryColumnName>",
"target_field": "<neo4jPropertyName>",
"target_property_type": "<neo4jPropertyType>"
},
{ <propertyObj2> },
...
],
"write_mode": "merge"
}
-
source
(string) — 此目标应从中提取数据的源的名称。应与sources
对象中的名称之一匹配。 -
name
(string) — 目标的易于理解的名称(在所有名称中唯一)。 -
labels
(字符串列表) — 标签,用于标记节点。 -
properties
(对象列表) — 源列和节点属性之间的映射。target_property_type
的有效值为:boolean
、byte_array
(假设为 base64 编码)、date
、duration
、float
、integer
、local_date
、local_datetime
、local_time
、point
、string
、zoned_datetime
、zoned_time
。每个属性类型(除了 byte_array)也以其“_array”形式(即 date_array、string_array 等)提供,用于 BigQuery 的“重复”列类型。 -
write_mode
(string) — Neo4j 中的创建模式。可以是create
或merge
。有关 Cypher 子句行为的信息,请参阅CREATE
和MERGE
。
模式定义
如果全局配置 index_all_properties 设置为 true ,则所有属性都将使用范围索引进行索引。 |
{
...
"schema": {
"enable_type_constraints": true,
"key_constraints": [
{
"name": "<constraintName>",
"label": "<label>",
"properties": ["<neo4jPropertyName1>", "<neo4jPropertyName2>", ...],
"options": {}
}
],
"unique_constraints": [
{
"name": "<constraintName>",
"label": "<label>",
"properties": ["<neo4jPropertyName1>", "<neo4jPropertyName2>", ...],
"options": {}
}
],
"existence_constraints": [
{
"name": "<constraintName>",
"label": "<label>",
"property": "<neo4jPropertyName>"
}
],
"range_indexes": [
{
"name": "<indexName>",
"label": "<label>",
"properties": ["<neo4jPropertyName1>", "<neo4jPropertyName2>", ...],
}
],
"text_indexes": [
{
"name": "<indexName>",
"label": "<label>",
"property": "<neo4jPropertyName>",
"options": {}
}
],
"point_indexes": [
{
"name": "<indexName>",
"label": "<label>",
"property": "<neo4jPropertyName>",
"options": {}
}
],
"fulltext_indexes": [
{
"name": "<indexName>",
"labels": ["label1", "label2", ...],
"properties": ["<neo4jPropertyName1>", "<neo4jPropertyName2>", ...],
"options": {}
}
],
"vector_indexes": [
{
"name": "<indexName>",
"label": "<label>",
"property": "<neo4jPropertyName>",
"options": {}
}
]
}
}
其中每个对象的属性为
源数据对于 key_constraints 列不能有空值,否则它们将与节点键约束冲突。如果源在这方面不干净,请考虑在相关的 source.query 字段中提前清理它,方法是排除所有不满足约束的行(例如 WHERE person_tmbdId IS NOT NULL )。或者,在源转换中使用 where 属性。 |
选项 key_constraints 和 unique_constraints 需要 Neo4j/Aura 企业版,在针对 Neo4j 社区版安装运行时没有任何效果。 |
配置
{
...
"active": true,
"source_transformations": {
"enable_grouping": true
},
"depends_on": ["<dependencyTargetName1>", "<dependencyTargetName2>", ...]
}
-
active
(bool) — 是否应将目标包含在导入中(默认值:true
)。 -
source_transformations
(object) — 如果enable_grouping
设置为true
,则导入将在key_constraints
和properties
中指定的所有字段上附加 SQL 子句GROUP BY
。如果设置为false
,则源中的任何重复数据都将推送到 Neo4j 中,这可能会引发约束错误或降低插入效率。该对象还可以包含聚合函数和其他字段,请参阅源转换。 -
depends_on
(字符串列表) — 应在当前目标之前执行的目标的name
。
示例
Person
节点的节点对象示例{
"source": "persons",
"name": "Persons",
"labels": [ "Person" ],
"properties": [
{
"source_field": "person_tmdbId",
"target_field": "id",
"target_property_type": "string"
},
{
"source_field": "name",
"target_field": "name",
"target_property_type": "string"
},
{
"source_field": "bornIn",
"target_field": "bornLocation",
"target_property_type": "string"
},
{
"source_field": "born",
"target_field": "bornDate",
"target_property_type": "local_date"
},
{
"source_field": "died",
"target_field": "diedDate",
"target_property_type": "local_date"
}
],
"schema": {
"key_constraints": [
{
"name": "personIdKey",
"label": "Person",
"properties": ["id"]
}
],
"unique_constraints": [
{
"name": "personNameUnique",
"label": "Person",
"properties": ["name"]
}
]
}
}
关系对象
关系实体必须分组到一个以relationships
为键的列表中,该列表位于targets
对象内。
"targets": {
...
"relationships": [
{ <relationshipSpec1> },
{ <relationshipSpec2> },
...
]
}
必填字段
每个关系对象至少必须具有属性source
、name
和type
。
它还必须包含有关哪个节点目标将关系链接在一起的信息。您可以通过start_node_reference
和end_node_reference
提供此信息。
{
"source": "<sourceName>",
"name": "<targetName>",
"type": "<relationshipType>",
"start_node_reference": "<nodeTargetName>",
"end_node_reference": "<nodeTargetName>",
"node_match_mode": "<match/merge>",
"write_mode": "<create/merge>"
}
-
source
(string) — 此目标应从中提取数据的源的名称。应与sources
对象中的名称之一匹配。 -
name
(string) — 目标的易于理解的名称(在所有名称中唯一)。 -
type
(字符串) — 类型,分配给关系。 -
start_node_reference
(字符串) — 充当关系开始的节点目标的名称。 -
end_node_reference
(字符串) — 充当关系结束的节点目标的名称。 -
node_match_mode
(字符串) — 在创建它们之间的关系之前,用于获取源/端节点的 Cypher 子句。有效值为match
或merge
,分别导致 Cypher 子句MATCH
和MERGE
。 -
write_mode
(string) — Neo4j 中的创建模式。可以是create
或merge
。有关 Cypher 子句行为的信息,请参阅CREATE
和MERGE
。
|
属性
关系还可以将源列映射为属性。
{
...
"properties": [
{
"source_field": "<bigQueryColumnName>",
"target_field": "<neo4jPropertyName>",
"target_property_type": "<neo4jPropertyType>"
},
{ <propertyObj2> },
...
]
}
-
properties
(对象列表) — 源列和关系属性之间的映射。target_property_type
的有效值为:boolean
、byte_array
(假设为 base64 编码)、date
、duration
、float
、integer
、local_date
、local_datetime
、local_time
、point
、string
、zoned_datetime
、zoned_time
。每个属性类型(除了 byte_array)也以其“_array”形式(即 date_array、string_array 等)提供,用于 BigQuery 的“重复”列类型。
模式定义
如果全局配置 index_all_properties 设置为 true ,则所有属性都将使用范围索引进行索引。 |
{
...
"schema": {
"enable_type_constraints": true,
"key_constraints": [
{
"name": "<constraintName>",
"type": "<relationshipType>",
"properties": ["<neo4jPropertyName1>", "<neo4jPropertyName2>", ...],
"options": {}
}
],
"unique_constraints": [
{
"name": "<constraintName>",
"type": "<relationshipType>",
"properties": ["<neo4jPropertyName1>", "<neo4jPropertyName2>", ...],
"options": {}
}
],
"existence_constraints": [
{
"name": "<constraintName>",
"type": "<relationshipType>",
"property": "<neo4jPropertyName>"
}
],
"range_indexes": [
{
"name": "<indexName>",
"type": "<relationshipType>",
"properties": ["<neo4jPropertyName1>", "<neo4jPropertyName2>", ...],
}
],
"text_indexes": [
{
"name": "<indexName>",
"type": "<relationshipType>",
"property": "<neo4jPropertyName>",
"options": {}
}
],
"point_indexes": [
{
"name": "<indexName>",
"type": "<relationshipType>",
"property": "<neo4jPropertyName>",
"options": {}
}
],
"fulltext_indexes": [
{
"name": "<indexName>",
"types": ["<relationshipType1>", "<relationshipType2>", ...],
"properties": ["<neo4jPropertyName1>", "<neo4jPropertyName2>", ...],
"options": {}
}
],
"vector_indexes": [
{
"name": "<indexName>",
"type": "<relationshipType>",
"property": "<neo4jPropertyName>",
"options": {}
}
]
}
}
其中每个对象的属性为
源数据对于key_constraints 列不得有空值,否则它们将与关系键约束冲突。如果源在这方面不干净,请考虑在相关的source.query 字段中预先清理它,方法是排除所有不满足约束的行(例如WHERE person_tmbdId IS NOT NULL )。或者,在源转换中使用where 属性。 |
选项 key_constraints 和 unique_constraints 需要 Neo4j/Aura 企业版,在针对 Neo4j 社区版安装运行时没有任何效果。 |
配置
{
...
"active": true,
"source_transformations": {
"enable_grouping": true
},
"depends_on": ["<dependencyTargetName1>", "<dependencyTargetName2>", ...]
}
-
active
(布尔值) — 目标是否应包含在导入中。 -
source_transformations
(对象) — 如果enable_grouping
设置为true
,则导入将对key_constraints
和properties
中指定的所有字段进行 SQLGROUP BY
。如果设置为false
,则源中的任何重复数据都将推送到 Neo4j,这可能会导致约束错误或使插入效率降低。该对象还可以包含聚合函数和其他字段,请参见源转换。 -
depends_on
(字符串列表) — 应在当前目标之前执行的目标的name
。
示例
ACTED_IN
关系的关系对象示例{
"source": "acted_in",
"name": "Acted_in",
"type": "ACTED_IN",
"write_mode": "merge",
"node_match_mode": "match",
"start_node_reference": "Persons",
"end_node_reference": "Movies",
"properties": [
{
"source_field": "role",
"target_field": "role",
"target_property_type": "string"
}
]
}
自定义查询目标
当导入需要一个不适合节点/关系目标格式的复杂查询时,自定义查询目标非常有用。查询目标通过变量$rows
接收批处理的行。
自定义查询必须分组到一个以queries
为键的列表中,该列表位于targets
对象内。
"targets": {
...
"queries": [
{ <querySpec1> },
{ <querySpec2> },
...
]
}
不要使用自定义查询来运行不直接依赖于源的 Cypher;而是使用操作。一次性查询,尤其是如果非幂等的,不适合在自定义查询目标中使用。这样做的原因是,来自目标的查询是分批运行的,因此自定义查询可能会根据从源提取的$rows 批次的数目运行多次。 |
必填字段
每个查询目标至少必须具有属性source
、name
和query
。
{
"source": "<sourceName>",
"name": "<targetName>",
"query": "<cypherQuery>"
}
-
source
(string) — 此目标应从中提取数据的源的名称。应与sources
对象中的名称之一匹配。 -
name
(string) — 目标的易于理解的名称(在所有名称中唯一)。 -
query
(字符串) — 一个 Cypher 查询。来自源的数据在参数$rows
中可用,作为一个列表。
配置
{
...
"active": true,
"depends_on": ["<dependencyTargetName1>", "<dependencyTargetName2>", ...]
}
-
active
(布尔值) — 目标是否应包含在导入中。 -
depends_on
(字符串列表) — 应在当前目标之前执行的目标的name
。
示例
Person
节点并在创建时设置日期的查询对象示例{
"custom_query": {
"name": "Person nodes",
"source": "persons",
"query": "UNWIND $rows AS row WHERE row.person_tmdbId IS NOT NULL MERGE (p:Person {id: row.person_tmdbId, name: row.name, born_in: row.bornIn, born: date(row.born), died: date(row.died)}) ON CREATE SET p.created_time=datetime()"
}
}
源转换
每个节点和关系目标可以选择具有一个source_transformation
属性,该属性包含聚合函数。这对于从更细粒度的源中提取更高级别的维度很有用。聚合会产生额外的字段,这些字段可用于属性映射。
"source_transformations": {
"enable_grouping": true,
"aggregations": [ {
"expression": "",
"field_name": ""
},
{ aggregationObj2 }, ...
],
"limit": -1,
"where": "",
"order_by": [
{
"expression": "column_name",
"order": "<asc/desc>"
},
{ orderObj2 }, ...
],
}
-
enable_grouping
(布尔值) — 必须为true
才能使aggregations
/where
工作。 -
aggregations
(对象列表) — 聚合在expression
属性中指定为 SQL 查询,结果在field_name
中指定的名称下作为源列可用。 -
limit
(整数) — 限制被视为要导入的源行的数量(默认为无限制,编码为-1
)。 -
where
(字符串) — 在导入之前过滤掉源数据(使用 SQLWHERE
子句格式)。 -
order_by
(对象列表) — 对源强制排序。
操作
actions
部分包含可以在导入过程的特定步骤之前或之后运行的命令。每个步骤称为一个stage
。例如,您可以在步骤完成后提交 HTTP 请求,在源上执行 SQL 查询,或在 Neo4j 目标实例上运行 Cypher 语句。
...
"actions": [
{ <actionSpec1> },
{ <actionSpec2> },
...
]
每个操作对象至少必须具有属性name
、type
和stage
。其他属性取决于操作类型。
{
"type": "http",
"name": "<actionName>",
"stage": "<stageName>",
"method": "<get/post>",
"url": "<targetUrl>",
"headers": {}
}
-
type
(字符串) — 操作类型。 -
name
(字符串) — 操作的易于理解的名称(在所有名称中唯一)。 -
stage
(字符串) — 操作应在导入的哪个时间点运行。有效值为:start
、post_sources
、pre_nodes
、post_nodes
、pre_relationships
、post_relationships
、pre_queries
、post_queries
、end
。 -
method
(字符串) — HTTP 方法;get
或post
。 -
url
(字符串) — HTTP 请求应针对的 URL。 -
headers
(对象,可选) — 请求头。
GET
请求的操作示例{
"type": "http",
"name": "Post load ping",
"stage": "end",
"method": "get",
"url": "https://neo4j.ac.cn/success",
"headers": {
"secret": "314159",
"moreSecret": "17320"
}
}
{
"type": "cypher",
"name": "<actionName>",
"stage": "<stageName>",
"query": "<cypherQuery>",
"execution_mode": "<transaction/autocommit>"
}
-
type
(字符串) — 操作类型。 -
name
(字符串) — 操作的易于理解的名称(在所有名称中唯一)。 -
stage
(字符串) — 操作应在导入的哪个时间点运行。有效值为:start
、post_sources
、pre_nodes
、post_nodes
、pre_relationships
、post_relationships
、pre_queries
、post_queries
、end
。 -
query
(字符串) — 要运行的 Cypher 查询。 -
execution_mode
(字符串,可选) — 应在什么模式下执行查询。有效值为transaction
、autocommit
(默认:transaction
)。
importJob
节点的操作示例{
"type": "cypher",
"name": "Post load log",
"stage": "end",
"query": "MERGE (:importJob {date: datetime()})"
}
{
"type": "bigquery",
"name": "<actionName>",
"stage": "<stageName>",
"sql": "<sqlQuery>"
}
-
type
(字符串) — 操作类型。 -
name
(字符串) — 操作的易于理解的名称(在所有名称中唯一)。 -
stage
(字符串) — 操作应在导入的哪个时间点运行。有效值为:start
、post_sources
、pre_nodes
、post_nodes
、pre_relationships
、post_relationships
、pre_queries
、post_queries
、end
。 -
sql
(字符串) — 要运行的 SQL 查询。
GET
请求的操作示例{
"type": "bigquery",
"name": "Post load log",
"stage": "end",
"sql": "INSERT INTO logs.imports (time) VALUES (NOW())"
}
变量
可以在 Dataflow 中提供键值对以替换$
分隔的标记。您可以在创建 Dataflow 作业时在“选项 JSON”字段中提供参数,作为 JSON 对象。变量插值在以下位置有效:
-
BigQuery 源查询 (SQL)
-
文本源 URL
-
自定义 Cypher 目标查询
-
BigQuery 操作 SQL
-
Cypher 操作查询
-
HTTP GET/POST URL 和标头值。
变量必须以$
符号为前缀(例如$limit
),并且可以在作业规范文件以及readQuery
或inputFilePattern
(源 URI)命令行参数中使用。