加载 JDBC (RDBMS)

如果您想使用 DuckDB,应从Maven Repository下载并导入驱动程序

数据集成是一个重要主题。从关系数据库读取数据以创建和增强数据模型是一项非常有益的工作。

通过 apoc.load.jdbc,您可以访问任何提供 JDBC 驱动程序的数据库,并执行将结果转换为行流的查询。然后可以使用这些行来更新或创建图结构。

apoc jdbc northwind load

为了简化 JDBC URL 语法并保护凭据,您可以在 conf/apoc.conf 中配置别名

apoc.jdbc.myDB.url=jdbc:derby:derbyDB
CALL apoc.load.jdbc('jdbc:derby:derbyDB','PERSON')

变为

CALL apoc.load.jdbc('myDB','PERSON')

apoc.jdbc.<alias>.url= 中的第三个值实际上定义了将在 apoc.load.jdbc('<alias>',…​. 中使用的别名。

MySQL 示例

Northwind 是关系数据库的常见示例集,在我们的导入指南中也有介绍,例如在 Neo4j 浏览器中运行 :play northwind graph

MySQL Northwind 数据

select count(*) from products;
表 1. 结果
count(*)

77

describe products;
表 2. 结果
字段 类型 可空 默认值 额外信息

ProductID

int(11)

NO

PRI

NULL

auto_increment

ProductName

varchar(40)

NO

MUL

NULL

SupplierID

int(11)

MUL

NULL

CategoryID

int(11)

MUL

NULL

QuantityPerUnit

varchar(20)

NULL

UnitPrice

decimal(10,4)

0.0000

UnitsInStock

smallint(2)

0

UnitsOnOrder

smallint(2)

0

ReorderLevel

smallint(2)

0

Discontinued

bit(1)

NO

b'0'

加载 JDBC 示例

加载 JDBC 驱动程序
CALL apoc.load.driver("com.mysql.jdbc.Driver");
计算 products 表中的行数
WITH "jdbc:mysql://localhost:3306/northwind?user=root" as url
CALL apoc.load.jdbc(url,"products") YIELD row
RETURN count(*);
表 3. 结果
count(*)

77

返回 products 表中的行
WITH "jdbc:mysql://localhost:3306/northwind?user=root" as url
CALL apoc.load.jdbc(url,"products") YIELD row
RETURN row limit 1;
表 4. 结果

{UnitPrice → 18.0000, UnitsOnOrder → 0, CategoryID → 1, UnitsInStock → 39}

apoc load jdbc

以事务批量方式加载数据

您可以从 JDBC 加载数据,并使用批量(和并行)查询结果创建/更新图。

CALL apoc.periodic.iterate(
  'CALL apoc.load.jdbc("jdbc:mysql://localhost:3306/northwind?user=root","company")',
  'CREATE (p:Person) SET p += value',
  { batchSize:10000, parallel:true})
YIELD batches, total

Cassandra 示例

将 Song 数据库设置为初始数据集

curl -OL https://raw.githubusercontent.com/neo4j-contrib/neo4j-cassandra-connector/master/db_gen/playlist.cql
curl -OL https://raw.githubusercontent.com/neo4j-contrib/neo4j-cassandra-connector/master/db_gen/artists.csv
curl -OL https://raw.githubusercontent.com/neo4j-contrib/neo4j-cassandra-connector/master/db_gen/songs.csv
$CASSANDRA_HOME/bin/cassandra
$CASSANDRA_HOME/bin/cqlsh -f playlist.cql

下载 Cassandra JDBC Wrapper,并将其放入 $NEO4J_HOME/plugins 目录。将此配置选项添加到 $NEO4J_HOME/conf/apoc.conf,以便更轻松地与 Cassandra 实例交互。

添加到 conf/apoc.conf
apoc.jdbc.cassandra_songs.url=jdbc:cassandra://localhost:9042/playlist

重启服务器。

现在您可以使用以下命令检查 Cassandra 中的数据。

CALL apoc.load.jdbc('cassandra_songs','artists_by_first_letter')
YIELD row
RETURN count(*);
表 5. 结果
count(*)

3605

CALL apoc.load.jdbc('cassandra_songs','artists_by_first_letter')
YIELD row
RETURN row LIMIT 5;
CALL apoc.load.jdbc('cassandra_songs','artists_by_first_letter')
YIELD row
RETURN row.first_letter, row.artist
LIMIT 5;
表 6. 结果
row.first_letter row.artist

C

C.W. Stoneking

C

CH2K

C

CHARLIE HUNTER WITH LEON PARKER

C

Calvin Harris

C

Camané

让我们创建一些图数据,查看一下 track_by_artist 表,它包含大约 60k 条记录。

CALL apoc.load.jdbc('cassandra_songs','track_by_artist')
YIELD row
RETURN count(*);
CALL apoc.load.jdbc('cassandra_songs','track_by_artist')
YIELD row
RETURN row
LIMIT 5;
CALL apoc.load.jdbc('cassandra_songs','track_by_artist')
YIELD row
RETURN row.track_id, row.track_length_in_seconds, row.track, row.music_file, row.genre, row.artist, row.starred
LIMIT 2;
表 7. 结果
row.track_id length row.track row.music_file row.genre row.artist row.starred

c0693b1e-0eaa-4e81-b23f-b083db303842

219

1913 Massacre

TRYKHMD128F934154C

folk

Woody Guthrie & Jack Elliott

false

7d114937-0bc7-41c7-8e0c-94b5654ac77f

178

Alabammy Bound

TRMQLPV128F934152B

folk

Woody Guthrie & Jack Elliott

false

让我们创建一些索引和约束,请注意,这会删除其他索引和约束。

CALL apoc.schema.assert(
  {Track:['title','length']},
  {Artist:['name'],Track:['id'],Genre:['name']});
表 8. 结果
标签 唯一 操作

Track

title

false

CREATED

Track

length

false

CREATED

Artist

name

true

CREATED

Genre

name

true

CREATED

Track

id

true

CREATED

CALL apoc.load.jdbc('cassandra_songs','track_by_artist')
YIELD row
MERGE (a:Artist {name:row.artist})
MERGE (g:Genre {name:row.genre})
CREATE (t:Track {id:toString(row.track_id), title:row.track, length:row.track_length_in_seconds})
CREATE (a)-[:PERFORMED]->(t)
CREATE (t)-[:GENRE]->(g);
Added 63213 labels, created 63213 nodes, set 182413 properties, created 119200 relationships, statement executed in 40076 ms.

支持使用 Kerberos 认证的 Hive

对 Hive 的支持,特别是带有 Kerberos 的 Hive,涉及更多细节。

首先,所需的配置更详细,请确保获取以下信息

  • Kerberos 用户 / 密码

  • Kerberos 域 / kdc

  • Hive 主机名 + 端口 (10000)

在已知位置创建此 login.conf 文件

login.conf
KerberosClient {
  com.sun.security.auth.module.Krb5LoginModule required
  debug=true debugNative=true;
};

将这些选项添加到您的 conf/apoc.conf

apoc.conf
dbms.jvm.additional=-Djava.security.auth.login.config=/path/to/login.conf
dbms.jvm.additional=-Djava.security.auth.login.config.client=KerberosClient
dbms.jvm.additional=-Djava.security.krb5.realm=KRB.REALM.COM
dbms.jvm.additional=-Djava.security.krb5.kdc=krb-kdc.host.com

与其他 JDBC 驱动程序不同,Hive 附带了一堆依赖项,您可以从 Hadoop 提供商处下载这些依赖项

或者从 maven central 获取它们。

版本可能有所不同,请使用您的 Hive 驱动程序自带的版本。

  • hadoop-common-2.7.3.2.6.1.0-129.jar

  • hive-exec-1.2.1000.2.6.1.0-129.jar

  • hive-jdbc-1.2.1000.2.6.1.0-129.jar

  • hive-metastore-1.2.1000.2.6.1.0-129.jar

  • hive-service-1.2.1000.2.6.1.0-129.jar

  • httpclient-4.4.jar

  • httpcore-4.4.jar

  • libfb303-0.9.2.jar

  • libthrift-0.9.3.jar

现在您可以从 APOC 使用这样的 JDBC URL。

这没有换行符,只是因为太长而进行了换行。

jdbc:hive2://username%40krb-realm:password@hive-hostname:10000/default;principal=hive/hostname@krb-realm;auth=kerberos;kerberosAuthType=fromSubject

然后调用

WITH 'jdbc:hive2://username%40krb-realm:password@hive-hostname:10000/default;principal=hive/hostname@krb-realm;auth=kerberos;kerberosAuthType=fromSubject' AS url
CALL apoc.load.jdbc(url,'PRODUCTS')
YIELD row
RETURN row.name, row.price;

您也可以将其作为键设置在您的 conf/apoc.conf

apoc.conf
apoc.jdbc.my-hive.url=jdbc:hive2://username%40krb-realm:password@hive-hostname:10000/default;principal=hive/hostname@krb-realm;auth=kerberos;kerberosAuthType=fromSubject

然后使用更紧凑的调用

CALL apoc.load.jdbc('my-hive','SELECT * PRODUCTS');

LOAD JDBC - 资源

要使用其他 JDBC 驱动程序,请使用这些下载链接和 JDBC URL。将 JDBC 驱动程序放入 $NEO4J_HOME/plugins 目录,并在 $NEO4J_HOME/conf/apoc.conf 中使用 apoc.jdbc.<alias>.url=<jdbc-url> 配置 JDBC-URL。

凭据可以通过两种方式传递

  • 在 URL 中

CALL apoc.load.jdbc('jdbc:derby:derbyDB;user=apoc;password=Ap0c!#Db;create=true', 'PERSON')
  • 通过配置参数。

CALL apoc.load.jdbc('jdbc:derby:derbyDB', 'PERSON',[],{credentials:{user:'apoc',password:'Ap0c!#Db'}})

使用 Simba 驱动程序的 Google BigQuery 需要使用额外的参数 'autoCommit',例如

CALL apoc.load.jdbc('BigQuery', 'SELECT action_type FROM `patents-public-data.ebi_chembl.action_type` LIMIT 10', [], {autoCommit:true})
数据库 JDBC-URL 驱动程序来源

MySQL

jdbc:mysql://<hostname>:<port/3306>/<database>?user=<user>&password=<pass>

Postgres

jdbc:postgresql://<hostname>/<database>?user=<user>&password=<pass>

Oracle

jdbc:oracle:thin:<user>/<pass>@<host>:<port>/<service_name>

MS SQLServer

jdbc:sqlserver://;servername=<servername>;databaseName=<database>;user=<user>;password=<pass>

IBM DB2

jdbc:db2://<host>:<port/5021>/<database>:user=<user>;password=<pass>;

Derby

jdbc:derby:derbyDB

包含在 JDK6-8 中

Cassandra

jdbc:cassandra://<host>:<port/9042>/<database>

SAP Hana

jdbc:sap://<host>:<port/39015>/?user=<user>&password=<pass>

Apache Hive (带 Kerberos)

jdbc:hive2://username%40krb-realm:password@hostname:10000/default;principal=hive/hostname@krb-realm;auth=kerberos;kerberosAuthType=fromSubject

Apache Hive 驱动程序 (Cloudera) (Hortonworks) 有几个 jar 包 (hadoop-common-xxx.jar hive-exec-xxx.jar hive-jdbc-xxx.jar hive-metastore-xxx.jar hive-service-xxx.jar httpclient-4.4.jar httpcore-4.4.jar libfb303-0.9.2.jar libthrift-0.9.3.jar)

Google BigQuery

jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443;ProjectId=<Project ID>;OAuthType=0;OAuthServiceAcctEmail=<Service Account ID>;OAuthPvtKeyPath=/path/to/<Private Key>.json

Simba Drivers for BigQuery 有几个 jar 包

有一些博客文章/示例详细介绍了 apoc.load.jdbc 的用法

LOAD JDBC - UPDATE

jdbcUpdate 用于更新关系数据库,从带有可选参数的 SQL 语句进行更新

CALL apoc.load.jdbcUpdate(jdbc-url,statement, params, config)

使用此数据集,您可以以两种不同模式调用此过程

MATCH (u:User)-[:BOUGHT]->(p:Product)<-[:BOUGHT]-(o:User)-[:BOUGHT]->(reco)
WHERE u <> o AND NOT (u)-[:BOUGHT]->(reco)
WITH u, reco, count(*) as score
WHERE score > 1000

您可以带参数调用此过程

CALL apoc.load.jdbcUpdate('jdbc:mysql:....','INSERT INTO RECOMMENDATIONS values(?,?,?)',[user.id, reco.id, score]);

您可以不带参数调用此过程

CALL apoc.load.jdbcUpdate('jdbc:mysql:....','INSERT INTO RECOMMENDATIONS values(user.id, reco.id, score)');

加载 JDBC 格式化日期

从 Neo4j 3.4 开始,支持 时间值 (Temporal Values)

如果加载操作返回的 JdbcType 是 TIMESTAMP 或 TIMESTAMP_WITH_TIMEZONE,您可以提供配置参数 timezone,其类型为 java.time.ZoneId

CALL apoc.load.jdbc('key or url','table or statement', config);

配置

Config 参数是可选的,默认值是一个空 map。

timezone

默认值: null

credentials

默认值: {}

示例

带时区
CALL apoc.load.jdbc(
  'jdbc:derby:derbyDB',
  'SELECT * FROM PERSON WHERE NAME = ?',['John'],
  {timezone: "Asia/Tokyo"})
2018-10-31T01:32:25.012+09:00[Asia/Tokyo]
带凭据
CALL apoc.load.jdbcUpdate('jdbc:derby:derbyDB','UPDATE PERSON SET NAME = ? WHERE NAME = ?',['John','John'],{credentials:{user:'apoc',password:'Ap0c!#Db'}})
CALL apoc.load.jdbc('jdbc:derby:derbyDB', 'PERSON',[],{credentials:{user:'apoc',password:'Ap0c!#Db'}})

JDBC 分析

您可以使用 apoc.jdbc.analytics(<cypherQuery>, <jdbcUrl>, <sqlQueryOverTemporaryTable>, <paramsList>, $config) 从 Cypher 查询创建一个临时表,并将复杂的分析委托给由 JDBC URL 定义的数据库。

请注意,返回的 SQL 列名必须与 Cypher 查询提供的列名一致。

除了 apoc.load.jdbc 过程的配置外,apoc.jdbc.analytics 还提供以下配置

name 描述 默认值

tableName

临时表名称

neo4j_tmp_table

provider

SQL 提供商,根据其处理数据类型,可能的值为 "POSTGRES"、"MYSQL" 和 "DEFAULT"

"DEFAULT"

batchSize

将数据插入 SQL 表的批量大小

10000

writeMode

'CREATE'

如果为 'CREATE',则创建新的临时表。如果为 'APPEND',则重用现有表。

可以在 config 参数中指定提供商。

您可以使用一些节点来重现以下查询

CREATE (:City {country: 'NL', name: 'Amsterdam', year: 2000, population: 1005})
CREATE (:City {country: 'NL', name: 'Amsterdam', year: 2010, population: 1065})
CREATE (:City {country: 'NL', name: 'Amsterdam', year: 2020, population: 1158})
CREATE (:City {country: 'US', name: 'Seattle', year: 2000, population: 564})
CREATE (:City {country: 'US', name: 'Seattle', year:2010, population: 608})
CREATE (:City {country: 'US', name: 'Seattle', year: 2020, population: 738})
CREATE (:City {country: 'US', name: 'New York City', year: 2000, population: 8015})
CREATE (:City {country: 'US', name: 'New York City', year: 2010, population: 8175})
CREATE (:City {country: 'US', name: 'New York City', year: 2020, population: 8772})

DuckDB

获取带有空隙的当前行排名的示例。SQL 查询的字段应与 Cypher 查询一致。有关详细信息,请访问 https://duckdb.org/docs/sql/functions/window_functions.html#rank

CALL apoc.jdbc.analytics(
  "MATCH (n:City) RETURN n.country AS country, n.name AS name, n.year AS year, n.population AS population",
  $url,
  "SELECT
     country,
     name,
     year,
     population,
     RANK() OVER (PARTITION BY country ORDER BY year DESC) AS rank
   FROM 'neo4j_tmp_table'
   ORDER BY rank, country, name;"
)

使用窗口函数获取数据透视表的另一个示例

CALL apoc.jdbc.analytics(
  "MATCH (n:City) RETURN n.country AS country, n.name AS name, n.year AS year, n.population AS population",
  $url,
  "WITH ranked_data AS (
        SELECT
            country,
            name,
            year,
            population,
            ROW_NUMBER() OVER (PARTITION BY country ORDER BY year DESC) AS rank
        FROM 'neo4j_tmp_table'
        ORDER BY rank, country, name
    )
    SELECT *
    FROM ranked_data
    PIVOT (
        sum(population)
        FOR country IN ('NL', 'US')
        GROUP BY year
    )"
)

或直接使用 PIVOT <table> ON <column> 子句

CALL apoc.jdbc.analytics(
  "MATCH (n:City) RETURN n.country AS country, n.name AS name, n.year AS year, n.population AS population",
  $url,
  "PIVOT 'neo4j_tmp_table'
    ON year
    USING sum(population)
    ORDER by name"
)

在 DuckDB 中,我们还可以使用 jdbc:duckdb: URL 使用内存实例

CALL apoc.jdbc.analytics(
  "MATCH (n:City) RETURN n.country AS country, n.name AS name, n.year AS year, n.population AS population",
  'jdbc:duckdb:',
  "PIVOT 'neo4j_tmp_table'
    ON year
    USING sum(population)
    ORDER by name"
)

MySQL

返回当前行在其分区内的排名,带有空隙。有关更多信息,请访问 https://dev.mysqlserver.cn/doc/refman/8.4/en/window-function-descriptions.html#function_rank

CALL apoc.jdbc.analytics(
 "MATCH (n:City) RETURN n.country AS country, n.name AS name, n.year AS year, n.population AS population",
 $url,
 "SELECT
     country,
     name,
     year,
     population,
     RANK() OVER (PARTITION BY country ORDER BY year DESC) AS 'rank'
  FROM 'neo4j_tmp_table'
  ORDER BY country, name;",
 $params,
 { provider: "MYSQL" })

这里是 MySQL 中 ROW_NUMBER 窗口函数的一个示例

CALL apoc.jdbc.analytics(
 "MATCH (n:City) RETURN n.country AS country, n.name AS name, n.year AS year, n.population AS population",
 $url,
 "SELECT
      country,
      name,
      year,
      population,
      ROW_NUMBER() OVER (PARTITION BY country ORDER BY year DESC) AS 'rank'
  FROM 'neo4j_tmp_table'
  ORDER BY country, name;",
 $params,
 { provider: "MYSQL" })

PostgreSQL

这里是窗口函数的一个示例。

CALL apoc.jdbc.analytics(
 "MATCH (n:City) RETURN n.country AS country, n.name AS name, n.year AS year, n.population AS population",
 $url,
 "SELECT
      country,
      name,
      year,
      population,
      RANK() OVER (PARTITION BY country ORDER BY year DESC) rank
  FROM 'neo4j_tmp_table'
  ORDER BY rank, country, name;",
 $params,
 { provider: "POSTGRES" })
© . All rights reserved.