将 CSV 数据导入 Neo4j
本文介绍了将 CSV 数据导入 Neo4j 的不同方法,并提供了解决过程中可能出现的潜在问题的方案。
CSV 是一个逗号分隔值文件,通常在 Excel 或其他电子表格工具中查看。分隔符可以是其他类型的字符,但最标准的是逗号。如今,许多系统和流程已经将其数据转换为 CSV 格式,以便将其输出到其他系统、人类友好的报告和其他需求中。它是一种标准文件格式,人类和系统已经熟悉使用和处理它。
导入 CSV 文件的方法
有几种不同的方法可以将 CSV 数据导入 Neo4j,每种方法都有不同的标准和功能。您选择的方法取决于数据集的大小,以及您对各种工具的熟悉程度。
让我们看看 Neo4j 读取和导入 CSV 文件的一些方法。
-
LOAD CSV
Cypher® 命令:此命令是一个很好的起点,可以处理小型到中型数据集(最多 1000 万条记录)。适用于任何设置,包括 AuraDB。 -
neo4j-admin database import
命令:命令行工具,适用于直接加载大型数据集。适用于 Neo4j 桌面版、Neo4j EE Docker 镜像和本地安装。 -
Neo4j ETL 工具:Neo4j 实验室项目。有关更多详细信息和文档,请访问 Neo4j ETL 工具页面。
-
Kettle 导入工具:映射和执行数据流程步骤,非常适合处理非常大的数据集,尤其是在您已经熟悉使用此工具的情况下。适用于任何设置,包括 AuraDB。
在下面一节中,您可以找到关于 LOAD CSV
Cypher 命令和 neo4j-admin database import
命令的简要概述,它们是如何工作的,以及如何开始使用一般用例。数据质量也可能成为任何系统任何类型数据导入的问题,因此本节将介绍一些这些潜在的困难以及克服它们的方法。
LOAD CSV 命令与 Cypher
LOAD CSV
子句是 Cypher 查询语言的一部分。有关 LOAD CSV
子句的更多信息,请参见 Cypher 手册 → LOAD CSV。它具有广泛的适用性。LOAD CSV
不仅仅是一个基本的 数据摄取机制。它在一个操作中执行多个操作
-
支持从 URI 加载/摄取 CSV 数据。
-
将输入数据直接映射到复杂的图/域结构中。
-
处理数据转换。
-
支持复杂的计算。
-
创建或合并实体、关系和结构。
为了获得更好的控制,您可以在 Cypher Shell 中运行 |
读取 CSV 文件
LOAD CSV
可以处理本地和远程文件,并且每种文件都有相关的语法。这很容易被忽视,最终会导致访问错误,因此这里将阐明这些规则。
出于安全原因,默认情况下,本地文件只能从 Neo4j 导入目录读取,该目录根据您的操作系统而不同。每个操作系统的文件位置列在我们 的 Neo4j 操作手册 → 文件位置 中。建议将文件放在 Neo4j 的 _import_ 目录中,因为它可以保持环境安全。但是,如果您需要访问其他位置的文件,可以在我们的 Cypher 手册 → LOAD CSV 简介 中找到要修改的设置。
//Example 1 - file directly placed in import directory (import/data.csv)
LOAD CSV FROM "file:///data.csv"
//Example 2 - file placed in subdirectory within import directory (import/northwind/customers.csv)
LOAD CSV FROM "file:///northwind/customers.csv"
Web 托管文件可以直接使用其 URL 进行引用,例如 https://host/path/data.csv
。但是,必须设置权限,以便外部源可以读取该文件。要从本地文件系统读取文件,您需要检查配置设置 dbms.security.allow_csv_import_from_file_urls
是否设置为 true
。有关访问与在线文件导入相关的更多信息,请参见此 知识库文章。但请记住,在 Neo4j v5 中,配置设置已重命名,dbms.directories.import
已更改为 server.directories.import
。
//Example 1 - website
LOAD CSV FROM 'https://data.neo4j.com/northwind/customers.csv'
//Example 2 - Google
LOAD CSV WITH HEADERS FROM 'https://docs.google.com/spreadsheets/d/<yourFilePath>/export?format=csv'
LOAD CSV 的重要提示
使用 LOAD CSV
时,需要注意一些事项,以及一些处理您可能遇到的各种数据场景的有用提示。
-
CSV 文件中的所有数据都以字符串形式读取,因此您需要使用
toInteger()
、toFloat()
、split()
或类似函数来转换值。 -
检查您的 Cypher 导入语句中是否有错别字。标签、属性名称、关系类型和变量区分大小写。
-
数据越干净,加载越容易。尝试在加载之前处理复杂的清理/操作。
使用 LOAD CSV 转换数据值
Cypher 有一些清理和转换功能,可以帮助您清理数据。这些功能对于处理缺失数据或将字段拆分为图中的多个值非常有用。
首先,请记住 Neo4j 不存储空值。CSV 文件中的空值或空字段可以在 LOAD CSV
中跳过或替换为默认值。
假设您有以下 CSV 文件
Id,Name,Location,Email,BusinessType
1,Neo4j,San Mateo,contact@neo4j.com,P
2,AAA,,info@aaa.com,
3,BBB,Chicago,,G
导入 CSV 文件的默认位置是您的 Neo4j 实例的 import 目录。 |
以下是导入此数据的一些示例。
//skip null values
LOAD CSV WITH HEADERS FROM 'file:///companies.csv' AS row
WITH row WHERE row.Id IS NOT NULL
MERGE (c:Company {companyId: row.Id});
// clear data
MATCH (n:Company) DELETE n;
//set default for null values
LOAD CSV WITH HEADERS FROM 'file:///companies.csv' AS row
MERGE (c:Company {companyId: row.Id, hqLocation: coalesce(row.Location, "Unknown")})
// clear data
MATCH (n:Company) DELETE n;
//change empty strings to null values (not stored)
LOAD CSV WITH HEADERS FROM 'file:///companies.csv' AS row
MERGE (c:Company {companyId: row.Id})
SET c.emailAddress = CASE trim(row.Email) WHEN "" THEN null ELSE row.Email END
接下来,如果您在 CSV 中有一个包含要拆分的项目列表的字段,可以使用 Cypher 的 split()
函数来分隔单元格中的数组。
假设您有以下 CSV 文件
Id,Name,Skills,Email
1,Joe Smith,Cypher:Java:JavaScript,joe@neo4j.com
2,Mary Jones,Java,mary@neo4j.com
3,Trevor Scott,Java:JavaScript,trevor@neo4j.com
LOAD CSV WITH HEADERS FROM 'file:///employees.csv' AS row
MERGE (e:Employee {employeeId: row.Id, email: row.Email})
WITH e, row
UNWIND split(row.Skills, ':') AS skill
MERGE (s:Skill {name: skill})
MERGE (e)-[r:HAS_EXPERIENCE]->(s)
可以使用 CASE
实现条件转换。您已经看到一个例子,我们在检查空值或空字符串时使用了它,但让我们再看一个例子。
// clear data
MATCH (n:Company) DELETE n;
//set businessType property based on shortened value in CSV
LOAD CSV WITH HEADERS FROM 'file:///companies.csv' AS row
WITH row WHERE row.Id IS NOT NULL
WITH row,
(CASE row.BusinessType
WHEN 'P' THEN 'Public'
WHEN 'R' THEN 'Private'
WHEN 'G' THEN 'Government'
ELSE 'Other' END) AS type
MERGE (c:Company {companyId: row.Id, hqLocation: coalesce(row.Location, "Unknown")})
SET c.emailAddress = CASE trim(row.Email) WHEN "" THEN null ELSE row.Email END
SET c.businessType = type
RETURN *
优化 LOAD CSV 的性能
通常,有一些方法可以提高数据加载期间的性能,这在处理大量数据或复杂的加载时特别有用。
为了提高将唯一实体插入图或更新图的性能(使用 MERGE
或 MATCH
进行更新),您可以为要合并或匹配的每个标签和属性创建索引和约束。
为了获得最佳性能,请始终在具有索引主密钥属性的单个标签上 |
假设您使用前面提到的 companies.csv 文件,现在您有一个包含人员及其工作公司信息的文件
employeeId,Name,Company
1,Bob Smith,1
2,Joe Jones,3
3,Susan Scott,2
4,Karen White,1
您还应该将节点和关系创建分成单独的处理。例如,而不是使用以下方法
MERGE (e:Employee {employeeId: row.employeeId})
MERGE (c:Company {companyId: row.companyId})
MERGE (e)-[r:WORKS_FOR]->(c)
您可以这样编写
// clear data
MATCH (n)
DETACH DELETE n;
// load Employee nodes
LOAD CSV WITH HEADERS FROM 'file:///people.csv' AS row
MERGE (e:Employee {employeeId: row.employeeId, name: row.Name})
RETURN count(e);
// load Company nodes
LOAD CSV WITH HEADERS FROM 'file:///companies.csv' AS row
WITH row WHERE row.Id IS NOT NULL
WITH row,
(CASE row.BusinessType
WHEN 'P' THEN 'Public'
WHEN 'R' THEN 'Private'
WHEN 'G' THEN 'Government'
ELSE 'Other' END) AS type
MERGE (c:Company {companyId: row.Id, hqLocation: coalesce(row.Location, "Unknown")})
SET c.emailAddress = CASE trim(row.Email) WHEN "" THEN null ELSE row.Email END
SET c.businessType = type
RETURN count(c);
// create relationships
LOAD CSV WITH HEADERS FROM 'file:///people.csv' AS row
MATCH (e:Employee {employeeId: row.employeeId})
MATCH (c:Company {companyId: row.Company})
MERGE (e)-[:WORKS_FOR]->(c)
RETURN *;
这样,加载操作就只在一次执行一个部分的导入操作,并且可以快速高效地处理大量数据,从而减少繁重的处理。
当加载的数据量太大而无法放入内存时,可以使用几种不同的方法来解决数据加载期间内存不足的问题。
-
将导入分成几部分,使用
CALL { … } IN TRANSACTIONS
。此子查询可以在
LOAD CSV
子句之后添加,告诉 Cypher 在清除内存和事务状态之前只处理文件的这么多行。有关更多信息,请参见 Cypher 手册 → 子查询。示例LOAD CSV FROM 'file:///people.csv' AS line CALL { WITH line MATCH (e:Employee {id: line[0]}) CREATE (e)-[:REL {prop: line[1]}]->(e) } IN TRANSACTIONS OF 100000 ROWS;
-
避免使用
Eager
运算符。一些语句会拉取比必要更多的行,从而增加了预先处理的负担。为了避免这种情况,您可以对查询运行
PROFILE
以查看它们是否使用Eager
加载,并修改查询或对同一文件运行多次传递,以防止这种情况发生。有关Eager
运算符的更多信息,请参见 Cypher 手册 → 执行计划运算符详解。 -
调整数据库的堆和内存配置,以避免页面错误。
为了帮助处理大量事务,您可以增加数据库的一些配置设置,并重新启动实例以使它们生效。通常,您可以在每个 2 GB 堆中在一个事务中创建或更新 100 万条记录。在
neo4j.conf
中-
server.memory.heap.initial_size
和server.memory.heap.max_size
:至少设置为 4G。 -
server.memory.pagecache.size
:理想情况下,值应足够大,以将整个数据库保留在内存中。
-
neo4j-admin database import
命令
LOAD CSV
非常适合导入小型或中型数据集(最多 1000 万条记录)。对于比这更大的数据集,您可以使用 neo4j-admin database import
命令。这使您可以通过指定节点文件和关系文件将 CSV 数据导入到未使用的数据库中。
neo4j-admin database import
命令只能用于初始图形填充。
假设您想通过 neo4j-admin database import
将订单数据导入到 Neo4j 实例中。请注意,以下某些 CSV 文件包含标题,而有些则包含单独的标题文件。如果要执行导入,请将它们放在 Neo4j 实例的 import 文件夹中。
customerId:ID(Customer), name
23, Delicatessen Inc
42, Delicious Bakery
productId:ID(Product), name, price, :LABEL
11,Chocolate,10,Product;Food
orderId:ID(Order),date,total,customerId:IGNORE
:END_ID(Order),date:IGNORE,total:IGNORE,:START_ID(Customer)
1041,2020-05-10,130,23
1042,2020-05-12,20,42
:START_ID(Order),amount,price,:END_ID(Product)
1041,13,130,11
1042,2,20,11
neo4j-admin database import
命令有两种模式
-
full — 用于最初将数据导入到不存在的空数据库中。
-
incremental — 用于将数据增量导入到现有数据库中。
该工具位于 <neo4j-instance-location>/bin/neo4j-admin
中,您可以在终端窗口中运行该命令,该窗口已导航到 Neo4j 实例的 import 目录。
以下是如何在 Neo4j 5.x 中导入上述 CSV 文件的示例。您必须指定数据库的名称。在本例中,我们指定了 orders。
bin/neo4j-admin database import full
--nodes=Customer=import/customers.csv
--nodes=import/products.csv
--nodes=Order=import/orders_header.csv, import/orders1.csv, import/orders2.csv
--relationships=CONTAINS=import/order_details.csv
--relationships=ORDERED=import/customer_orders_header.csv, import/orders1.csv, import/orders2.csv
--trim-strings=true orders
您必须在 单行 上指定此脚本的参数。此处显示换行符以提高可读性。 |
运行此命令时,它会导入数据并使其可供数据库使用。neo4j-admin database import
命令不会创建新的数据库。
重复的 --nodes
和 --relationships
参数是多个(可能被拆分的)相同实体的 CSV 文件组,即具有相同列结构的文件组。
每组的所有文件都被视为可以连接为一个大型文件。组中第一个文件或单独的单行文件中需要 标题行。将标题放在单独的文件中可以比将其放在多千兆字节的文本文件中更容易处理和编辑。压缩文件也受支持。
-
--id-type=string
表示所有:ID
列都包含字母数字值(对仅包含数字的 ID 进行了优化)。 -
customers.csv
直接作为具有:Customer
标签的节点导入,属性直接取自文件。 -
Product
节点遵循相同的模式,节点标签取自:LABEL
列。 -
Order
节点取自三个文件——一个标题文件和两个内容文件。 -
order_details.csv
中创建的类型为:CONTAINS
的行项关系,通过其 ID 将订单与其包含的产品相关联。 -
订单使用订单 CSV 文件再次连接到客户,但这次使用不同的标题,该标题 :IGNORE’s 不相关的列。
列名用于节点和关系的属性名。某些列上有特定的标记
-
name:ID
- 用于查找节点以供将来重新连接的全局 ID 列。-
如果属性名被省略,则不会存储(临时),这就是
--id-type
所指的。 -
如果您的实体中重复了 ID,则必须提供实体(ID 组),例如
:ID(Order)
。 -
如果您的 ID 在全局范围内是唯一的,则可以省略它。
-
-
:LABEL
- 节点的标签列。多个标签可以用分隔符分隔。 -
:START_ID
,:END_ID
- 引用节点 ID 的关系文件列。对于 ID 组,使用:END_ID(Order)
。 -
:TYPE
- 用于指定关系类型的列。 -
所有其他列都将被视为属性,但如果为空或用
:IGNORE
进行注释,则会跳过。 -
可以通过在名称后面添加指示符(如
:INT
,:BOOLEAN
等)来进行类型转换。
有关此标题格式和工具的更多详细信息,请参见 Neo4j 操作手册 → Neo4j 管理员导入 和附带的 教程 中的相关部分。
如果您使用的是 Neo4j 4.4,请访问 操作手册 → 教程:Neo4j 管理员导入 获取说明。 |
CSV 数据质量
现实世界中的数据很混乱。在处理数据时,您会看到一些需要清理或转换的值,然后再将其移动到其他系统。小的语法错误、格式描述、一致性、正确的引用,甚至对数据要求或标准的不同假设,都可能很容易导致您在以后花费数小时进行清理。
我们将重点介绍将数据从其他系统加载到 Neo4j 时容易被忽略的一些数据质量问题,并尝试帮助避免数据导入和清理问题。
常见陷阱
-
标题与数据不一致(缺少、太多列、标题中使用不同的分隔符).
验证标题与文件中的数据是否匹配。在此阶段调整格式、分隔符、列等将节省大量时间。
-
文件中存在多余或缺少的引号.
非引号文本中间的独立双引号或单引号,或引号文本中未转义的引号,可能会导致文件读取加载出现问题。最好转义或删除多余的引号。有关正确转义的文档,请参见 Cypher 样式指南。
-
文件中的特殊字符或换行符.
处理文件中的任何特殊字符时,请确保它们被引用或删除它们。对于引号或未引号字段中的换行符,请为它们添加引号或删除它们。
-
换行符不一致.
计算机难以处理的不一致数据之一就是换行符。确保整个文件的换行符一致。我们建议选择 Unix 样式以与 Linux 系统兼容(导入工具的常见格式)。
-
二进制零、BOM 字节顺序标记(文件开头的 2 个 UTF-8 字节)或其他非文本字符.
任何不寻常的字符或工具特定的格式有时会隐藏在应用程序工具中,但在基本的编辑器中很容易显现。如果在文件中遇到此类字符,最好将它们全部删除。
工具
如上所述,某些应用程序具有特殊的格式,使文档看起来很好,但这种隐藏的额外代码不被常规文件阅读器和脚本处理。有时,很难找到小的语法更改,或者对数据量很大的文件进行广泛的调整。
为了处理这些情况或进行一般数据清理,有很多工具可以帮助您检查和验证 CSV 数据文件。
基本的工具,如 hexdump、vi、emacs、UltraEdit 和 Notepad++ 非常适合处理用于编辑和操作文件的快捷键命令。但是,还有其他更高效或更人性化的选项可用,它们可以帮助进行数据清理和格式化。
-
Cypher - Cypher 所看到的就是要导入的内容,因此您可以利用这一点。使用
LOAD CSV
而不创建图形结构,只需输出样本、计数或分布,以便能够检测错误的标题列计数、分隔符、引号、转义字符或标题名称拼写。 -
CSVKit - 一组 Python 工具,为您的 CSV 文件提供统计数据 (csvstat)、搜索 (csvgrep) 等功能。
-
CSVLint - 一项在线服务,用于验证 CSV 文件。您可以上传文件或提供 URL 加载文件。
-
Papa Parse - 一个全面的 Javascript 库,用于 CSV 解析,允许您流式传输 CSV 数据,并在出现问题时提供良好的、可读的人类错误报告。
// assert correct line count
LOAD CSV FROM "file-url" AS line
RETURN count(*);
// check first 5 line-sample with header-mapping
LOAD CSV WITH HEADERS FROM "file-url" AS line
RETURN line
LIMIT 5;