加载 CSV 文件

Colab Google Colab 中跟随笔记本

可以使用 LOAD CSV Cypher 子句将 CSV 文件加载到 AuraDS 实例中。出于安全原因,无法加载本地 CSV 文件,这些文件必须改为在 HTTP 或 HTTPS 服务器(例如 GitHub、Google Drive 和 Dropbox)上公开访问。另一种使 CSV 文件可用的方法是将它们上传到云存储桶存储(例如 Google Cloud Storage 或 Amazon S3)并将存储桶配置为静态网站。

在此示例中,我们将加载三个 CSV 文件

  • movies.csv:包含电影列表,包括标题、发行年份和简短描述

  • people.csv:包含演员列表,包括出生年份

  • actors.csv:包含演员角色列表,其中演员与他们出演的电影匹配

LOAD CSV 命令用于处理中小型数据集,例如最多 1000 万个节点和关系。对于超过此限制的任何数据集,应避免使用此命令。

设置

有关如何开始使用 Python 的更多信息,请参阅 使用 Python 连接 教程。

pip install graphdatascience
# Import the client
from graphdatascience import GraphDataScience

# Replace with the actual URI, username, and password
AURA_CONNECTION_URI = "neo4j+s://xxxxxxxx.databases.neo4j.io"
AURA_USERNAME = "neo4j"
AURA_PASSWORD = ""

# Configure the client with AuraDS-recommended settings
gds = GraphDataScience(
    AURA_CONNECTION_URI,
    auth=(AURA_USERNAME, AURA_PASSWORD),
    aura_ds=True
)

在以下代码示例中,我们使用 print 函数打印 Pandas DataFrameSeries 对象。您可以尝试使用不同的方法打印 Pandas 对象,例如通过 to_stringto_json 方法;如果您使用 JSON 表示形式,在某些情况下,您可能需要包括一个 默认处理程序 来处理 Neo4j DateTime 对象。查看 Python 连接 部分以获取一些示例。

有关如何开始使用 Cypher Shell 的更多信息,请参阅 Neo4j Cypher Shell 教程。

从 Cypher Shell 安装目录运行以下命令。
export AURA_CONNECTION_URI="neo4j+s://xxxxxxxx.databases.neo4j.io"
export AURA_USERNAME="neo4j"
export AURA_PASSWORD=""

./cypher-shell -a $AURA_CONNECTION_URI -u $AURA_USERNAME -p $AURA_PASSWORD

有关如何开始使用 Python 的更多信息,请参阅 使用 Python 连接 教程。

pip install neo4j
# Import the driver
from neo4j import GraphDatabase

# Replace with the actual URI, username, and password
AURA_CONNECTION_URI = "neo4j+s://xxxxxxxx.databases.neo4j.io"
AURA_USERNAME = "neo4j"
AURA_PASSWORD = ""

# Instantiate the driver
driver = GraphDatabase.driver(
    AURA_CONNECTION_URI,
    auth=(AURA_USERNAME, AURA_PASSWORD)
)
# Import to prettify results
import json

# Import for the JSON helper function
from neo4j.time import DateTime

# Helper function for serializing Neo4j DateTime in JSON dumps
def default(o):
    if isinstance(o, (DateTime)):
        return o.isoformat()

创建约束

在加载任何数据之前添加约束通常可以提高数据加载性能。事实上,除了添加完整性检查之外,唯一约束还会同时在属性上添加索引,以便在加载期间 MATCHMERGE 操作更快。

为了在使用 MERGEMATCHLOAD CSV 时获得最佳性能,请确保在用于合并的属性上创建了索引或唯一约束。阅读 Cypher 文档 以获取有关约束的更多信息。

在此示例中,我们在电影标题和演员姓名上都添加了唯一约束。

# Make movie titles unique
gds.run_cypher("""
    CREATE CONSTRAINT FOR (movie:Movie) REQUIRE movie.title IS UNIQUE
""")

# Make person names unique
gds.run_cypher("""
    CREATE CONSTRAINT FOR (person:Person) REQUIRE person.name IS UNIQUE
""")
CREATE CONSTRAINT FOR (movie:Movie) REQUIRE movie.title IS UNIQUE;
CREATE CONSTRAINT FOR (person:Person) REQUIRE person.name IS UNIQUE;
movie_title_constraint = """
    CREATE CONSTRAINT FOR (movie:Movie) REQUIRE movie.title IS UNIQUE
"""

person_name_constraint = """
    CREATE CONSTRAINT FOR (person:Person) REQUIRE person.name IS UNIQUE
"""

# Create the driver session
with driver.session() as session:
    # Make movie titles unique
    session.run(movie_title_constraint).data()
    # Make person names unique
    session.run(person_name_constraint).data()

从 CSV 文件添加节点

我们现在已准备好从其 URI 加载 CSV 文件并从其包含的数据创建节点。在以下示例中,LOAD CSVWITH HEADERS 一起使用,以便按其相应的列名称访问 row 字段。此外

  • MERGE 与索引属性一起使用,以利用在 创建约束 部分创建的约束。

  • ON CREATE SET 用于在创建新节点时设置节点属性的值。

  • RETURN count(*) 用于显示处理的行数。

请注意,此示例中的 CSV 文件已整理,因此出于简单起见,做了一些假设。例如,在现实世界中,CSV 文件可能包含多行,这些行会尝试为同一个节点分配不同的属性值;在这种情况下,必须添加 ON MATCH SET 子句以确保正确处理这种情况。

gds.run_cypher("""
    LOAD CSV
      WITH HEADERS
      FROM 'https://data.neo4j.com/intro/movies/movies.csv' AS row
    MERGE (m:Movie {title: row.title})
      ON CREATE SET m.released = toInteger(row.released), m.tagline = row.tagline
    RETURN count(*)
""")

gds.run_cypher("""
    LOAD CSV
      WITH HEADERS
      FROM 'https://data.neo4j.com/intro/movies/people.csv' AS row
    MERGE (p:Person {name: row.name})
      ON CREATE SET p.born = toInteger(row.born)
    RETURN count(*)
""")
LOAD CSV
  WITH HEADERS
  FROM 'https://data.neo4j.com/intro/movies/movies.csv' AS row
MERGE (m:Movie {title: row.title})
  ON CREATE SET m.released = toInteger(row.released), m.tagline = row.tagline
RETURN count(*);

LOAD CSV
  WITH HEADERS
  FROM 'https://data.neo4j.com/intro/movies/people.csv' AS row
MERGE (p:Person {name: row.name})
  ON CREATE SET p.born = toInteger(row.born)
RETURN count(*);
load_movies_csv = """
    LOAD CSV
      WITH HEADERS
      FROM 'https://data.neo4j.com/intro/movies/movies.csv' AS row
    MERGE (m:Movie {title: row.title})
      ON CREATE SET m.released = toInteger(row.released), m.tagline = row.tagline
    RETURN count(*)
"""

load_people_csv = """
    LOAD CSV
      WITH HEADERS
      FROM 'https://data.neo4j.com/intro/movies/people.csv' AS row
    MERGE (p:Person {name: row.name})
      ON CREATE SET p.born = toInteger(row.born)
    RETURN count(*)
"""

# Create the driver session
with driver.session() as session:
    # Load the CSV files
    session.run(load_movies_csv).data()
    session.run(load_people_csv).data()

从 CSV 文件添加关系

与我们对节点所做的类似,我们现在从 actors.csv 文件创建关系。在以下示例中,LOAD CSVWITH HEADERS 选项一起使用,以便按其相应的列名称访问每行中的字段。

LOAD CSV 的默认字段分隔符是逗号 (,)。使用 FIELDTERMINATOR 选项 设置不同的分隔符。

如果 CSV 文件很大,请使用 CALL IN TRANSACTIONS 子句 提交每个事务中的多行,而不是整个文件。

此外

  • MATCHMERGE 用于查找节点(利用在 创建约束 部分创建的约束)并在它们之间创建关系。

  • ON CREATE SET 用于在创建新关系时设置关系属性的值。

  • RETURN count(*) 用于显示处理的行数。

gds.run_cypher("""
    LOAD CSV
      WITH HEADERS
      FROM 'https://data.neo4j.com/intro/movies/actors.csv' AS row
    MATCH (p:Person {name: row.person})
    MATCH (m:Movie {title: row.movie})
    MERGE (p)-[actedIn:ACTED_IN]->(m)
      ON CREATE SET actedIn.roles = split(row.roles, ';')
    RETURN count(*)
""")
LOAD CSV
  WITH HEADERS
  FROM 'https://data.neo4j.com/intro/movies/actors.csv' AS row
MATCH (p:Person {name: row.person})
MATCH (m:Movie {title: row.movie})
MERGE (p)-[actedIn:ACTED_IN]->(m)
  ON CREATE SET actedIn.roles = split(row.roles, ';')
RETURN count(*)
load_actors_csv = """
    LOAD CSV
      WITH HEADERS
      FROM 'https://data.neo4j.com/intro/movies/actors.csv' AS row
    MATCH (p:Person {name: row.person})
    MATCH (m:Movie {title: row.movie})
    MERGE (p)-[actedIn:ACTED_IN]->(m)
      ON CREATE SET actedIn.roles = split(row.roles, ';')
    RETURN count(*)
"""

# Create the driver session
with driver.session() as session:
    # Load the CSV file
    session.run(load_actors_csv).data()

运行 Cypher 查询

创建完所有节点和关系后,我们可以运行查询以检查数据是否已正确插入。以下查询查找包含 Keanu Reeves 的电影,按发布日期排序并对它们的标题进行分组。

gds.run_cypher("""
    MATCH (person:Person {name: "Keanu Reeves"})-[:ACTED_IN]->(movie)
    RETURN movie.released, COLLECT(movie.title) AS movies
    ORDER BY movie.released
""")
MATCH (person:Person {name: "Keanu Reeves"})-[:ACTED_IN]->(movie)
RETURN movie.released, COLLECT(movie.title) AS movies
ORDER BY movie.released
query = """
    MATCH (person:Person {name: "Keanu Reeves"})-[:ACTED_IN]->(movie)
    RETURN movie.released, COLLECT(movie.title) AS movies
    ORDER BY movie.released
"""

# Create the driver session
with driver.session() as session:
    # Run the Cypher query
    result = session.run(query).data()

    # Print the formatted result
    print(json.dumps(result, indent=2))

清理

当数据不再有用时,可以清理数据库。

# Delete data
gds.run_cypher("""
    MATCH (n)
    DETACH DELETE n
""")
MATCH (n)
DETACH DELETE n
delete_data = """
    MATCH (n)
    DETACH DELETE n
"""

# Create the driver session
with driver.session() as session:
    # Delete the data
    session.run(delete_data).data()

关闭连接

连接应始终在不再需要时关闭。

虽然 GDS 客户端会在对象被删除时自动关闭连接,但最好显式关闭它。

# Close the client connection
gds.close()
# Close the driver connection
driver.close()