Load JDBC (RDBMS)
If you want to use DuckDB, you should download the driver from the Maven Repository and import it.
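Once the jar is in your plugins directory, drivers that are not loaded automatically can be registered at runtime via apoc.load.driver; a minimal sketch, assuming the standard DuckDB driver class name:

CALL apoc.load.driver("org.duckdb.DuckDBDriver");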
Data integration is an important topic. Reading data from relational databases to create and augment data models is a very helpful exercise.
With apoc.load.jdbc you can access any database that provides a JDBC driver and execute queries whose results are turned into streams of rows. Those rows can then be used to update or create graph structures.

To simplify the JDBC URL syntax and protect credentials, you can configure aliases in conf/apoc.conf:
apoc.jdbc.myDB.url=jdbc:derby:derbyDB
CALL apoc.load.jdbc('jdbc:derby:derbyDB','PERSON')
becomes
CALL apoc.load.jdbc('myDB','PERSON')
The third value in the apoc.jdbc.<alias>.url= key actually defines the alias that is then used in apoc.load.jdbc('<alias>', …).
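The second argument can be either a table name or a full SQL statement; a minimal sketch using the alias with an ad-hoc query (the AGE column is hypothetical):

CALL apoc.load.jdbc('myDB','SELECT NAME, AGE FROM PERSON WHERE AGE > 18')
YIELD row
RETURN row.NAME, row.AGE;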
MySQL Example
Northwind is a common example dataset for relational databases. It is also covered in our import guides, e.g. by running :play northwind graph in the Neo4j Browser.
MySQL Northwind Data
select count(*) from products;
count(*) |
---|
77 |
describe products;
Field | Type | Null | Key | Default | Extra |
---|---|---|---|---|---|
ProductID | int(11) | NO | PRI | NULL | auto_increment |
ProductName | varchar(40) | NO | MUL | NULL | |
SupplierID | int(11) | YES | MUL | NULL | |
CategoryID | int(11) | YES | MUL | NULL | |
QuantityPerUnit | varchar(20) | YES | | NULL | |
UnitPrice | decimal(10,4) | YES | | 0.0000 | |
UnitsInStock | smallint(2) | YES | | 0 | |
UnitsOnOrder | smallint(2) | YES | | 0 | |
ReorderLevel | smallint(2) | YES | | 0 | |
Discontinued | bit(1) | NO | | b'0' | |
Load JDBC Example
CALL apoc.load.driver("com.mysql.jdbc.Driver");
WITH "jdbc:mysql://localhost:3306/northwind?user=root" as url
CALL apoc.load.jdbc(url,"products") YIELD row
RETURN count(*);
count(*) |
---|
77 |
WITH "jdbc:mysql://localhost:3306/northwind?user=root" as url
CALL apoc.load.jdbc(url,"products") YIELD row
RETURN row LIMIT 1;
row |
---|
{UnitPrice → 18.0000, UnitsOnOrder → 0, CategoryID → 1, UnitsInStock → 39} |
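The row stream can feed graph updates directly; a minimal sketch that upserts Product nodes from the columns shown above (the graph property names are illustrative):

WITH "jdbc:mysql://localhost:3306/northwind?user=root" AS url
CALL apoc.load.jdbc(url,"products") YIELD row
// one Product node per primary key, properties copied from the row
MERGE (p:Product {id: row.ProductID})
SET p.name = row.ProductName, p.unitPrice = row.UnitPrice
RETURN count(p);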

Load data in transactional batches
You can load data from JDBC and create/update the graph from the query results in batches (and in parallel).
CALL apoc.periodic.iterate(
  'CALL apoc.load.jdbc("jdbc:mysql://localhost:3306/northwind?user=root","company")',
  'CREATE (p:Person) SET p += row',
  { batchSize:10000, parallel:true})
YIELD batches, total
RETURN batches, total;
Cassandra Example
Setup the Song database as the initial dataset:
curl -OL https://raw.githubusercontent.com/neo4j-contrib/neo4j-cassandra-connector/master/db_gen/playlist.cql
curl -OL https://raw.githubusercontent.com/neo4j-contrib/neo4j-cassandra-connector/master/db_gen/artists.csv
curl -OL https://raw.githubusercontent.com/neo4j-contrib/neo4j-cassandra-connector/master/db_gen/songs.csv
$CASSANDRA_HOME/bin/cassandra
$CASSANDRA_HOME/bin/cqlsh -f playlist.cql
Download the Cassandra JDBC Wrapper and put it into your $NEO4J_HOME/plugins directory. Add this config option to $NEO4J_HOME/conf/apoc.conf to make interacting with the Cassandra instance easier:
apoc.jdbc.cassandra_songs.url=jdbc:cassandra://localhost:9042/playlist
Restart the server.
Now you can inspect the data in Cassandra with:
CALL apoc.load.jdbc('cassandra_songs','artists_by_first_letter')
YIELD row
RETURN count(*);
count(*) |
---|
3605 |
CALL apoc.load.jdbc('cassandra_songs','artists_by_first_letter')
YIELD row
RETURN row LIMIT 5;
CALL apoc.load.jdbc('cassandra_songs','artists_by_first_letter')
YIELD row
RETURN row.first_letter, row.artist
LIMIT 5;
row.first_letter | row.artist |
---|---|
C | C.W. Stoneking |
C | CH2K |
C | CHARLIE HUNTER WITH LEON PARKER |
C | Calvin Harris |
C | Camané |
Let's create some graph data. Looking at the track_by_artist table, it contains about 60k records.
CALL apoc.load.jdbc('cassandra_songs','track_by_artist')
YIELD row
RETURN count(*);
CALL apoc.load.jdbc('cassandra_songs','track_by_artist')
YIELD row
RETURN row
LIMIT 5;
CALL apoc.load.jdbc('cassandra_songs','track_by_artist')
YIELD row
RETURN row.track_id, row.track_length_in_seconds, row.track, row.music_file, row.genre, row.artist, row.starred
LIMIT 2;
row.track_id | length | row.track | row.music_file | row.genre | row.artist | row.starred |
---|---|---|---|---|---|---|
c0693b1e-0eaa-4e81-b23f-b083db303842 | 219 | 1913 Massacre | TRYKHMD128F934154C | folk | Woody Guthrie & Jack Elliott | false |
7d114937-0bc7-41c7-8e0c-94b5654ac77f | 178 | Alabammy Bound | TRMQLPV128F934152B | folk | Woody Guthrie & Jack Elliott | false |
Let's create some indexes and constraints. Note that this also drops all other indexes and constraints.
CALL apoc.schema.assert(
{Track:['title','length']},
{Artist:['name'],Track:['id'],Genre:['name']});
label | key | unique | action |
---|---|---|---|
Track | title | false | CREATED |
Track | length | false | CREATED |
Artist | name | true | CREATED |
Genre | name | true | CREATED |
Track | id | true | CREATED |
CALL apoc.load.jdbc('cassandra_songs','track_by_artist')
YIELD row
MERGE (a:Artist {name:row.artist})
MERGE (g:Genre {name:row.genre})
CREATE (t:Track {id:toString(row.track_id), title:row.track, length:row.track_length_in_seconds})
CREATE (a)-[:PERFORMED]->(t)
CREATE (t)-[:GENRE]->(g);
Added 63213 labels, created 63213 nodes, set 182413 properties, created 119200 relationships, statement executed in 40076 ms.
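To sanity-check the import, you can aggregate over the newly created graph structure; a minimal sketch:

// top genres by number of imported tracks
MATCH (a:Artist)-[:PERFORMED]->(t:Track)-[:GENRE]->(g:Genre)
RETURN g.name AS genre, count(t) AS tracks
ORDER BY tracks DESC
LIMIT 5;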
Support for Hive with Kerberos Auth
Support for Hive, especially with Kerberos, is more involved.
First of all, the required configuration is more detailed. Make sure to gather this information:

- Kerberos user / password
- Kerberos realm / kdc
- Hive hostname + port (10000)
Create this login.conf file at a known location:
KerberosClient {
  com.sun.security.auth.module.Krb5LoginModule required
  debug=true debugNative=true;
};
Add these options to your conf/neo4j.conf:
dbms.jvm.additional=-Djava.security.auth.login.config=/path/to/login.conf
dbms.jvm.additional=-Djava.security.auth.login.config.client=KerberosClient
dbms.jvm.additional=-Djava.security.krb5.realm=KRB.REALM.COM
dbms.jvm.additional=-Djava.security.krb5.kdc=krb-kdc.host.com
Unlike other JDBC drivers, Hive comes with a bunch of dependencies. You can download these from your Hadoop provider or get them from Maven Central. The versions may vary; use whatever ships with your Hive driver.
- hadoop-common-2.7.3.2.6.1.0-129.jar
- hive-exec-1.2.1000.2.6.1.0-129.jar
- hive-jdbc-1.2.1000.2.6.1.0-129.jar
- hive-metastore-1.2.1000.2.6.1.0-129.jar
- hive-service-1.2.1000.2.6.1.0-129.jar
- httpclient-4.4.jar
- httpcore-4.4.jar
- libfb303-0.9.2.jar
- libthrift-0.9.3.jar
Now you can use a JDBC URL like the one below from APOC. (It contains no line break; it is only wrapped here because of its length.)
jdbc:hive2://username%40krb-realm:password@hive-hostname:10000/default;principal=hive/hostname@krb-realm;auth=kerberos;kerberosAuthType=fromSubject
And then call:
WITH 'jdbc:hive2://username%40krb-realm:password@hive-hostname:10000/default;principal=hive/hostname@krb-realm;auth=kerberos;kerberosAuthType=fromSubject' AS url
CALL apoc.load.jdbc(url,'PRODUCTS')
YIELD row
RETURN row.name, row.price;
You can also set it as a key in your conf/apoc.conf:
apoc.jdbc.my-hive.url=jdbc:hive2://username%40krb-realm:password@hive-hostname:10000/default;principal=hive/hostname@krb-realm;auth=kerberos;kerberosAuthType=fromSubject
And then use the more compact call:
CALL apoc.load.jdbc('my-hive','SELECT * FROM PRODUCTS');
LOAD JDBC - Resources
To use other JDBC drivers, use these download links and JDBC URLs. Put the JDBC driver into the $NEO4J_HOME/plugins directory and configure the JDBC URL in $NEO4J_HOME/conf/apoc.conf with apoc.jdbc.<alias>.url=<jdbc-url>.
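A hypothetical alias entry for, say, a PostgreSQL instance might look like this (database name and credentials are placeholders):

apoc.jdbc.mypg.url=jdbc:postgresql://localhost:5432/northwind?user=postgres&password=secret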
Credentials can be passed in two ways:

- in the URL:

CALL apoc.load.jdbc('jdbc:derby:derbyDB;user=apoc;password=Ap0c!#Db;create=true', 'PERSON')

- via config parameters:

CALL apoc.load.jdbc('jdbc:derby:derbyDB', 'PERSON',[],{credentials:{user:'apoc',password:'Ap0c!#Db'}})
Google BigQuery with the Simba driver requires the additional parameter 'autoCommit', e.g.
CALL apoc.load.jdbc('BigQuery', 'SELECT action_type FROM `patents-public-data.ebi_chembl.action_type` LIMIT 10', [], {autoCommit:true})
Database | JDBC-URL | Driver Source |
---|---|---|
MySQL | | |
Postgres | | |
Oracle | | |
MS SQLServer | | |
IBM DB2 | | |
Derby | | Included in JDK6-8 |
Cassandra | | |
SAP Hana | | |
Apache Hive (w/ Kerberos) | | Apache Hive driver (Cloudera) (Hortonworks); several jars (hadoop-common-xxx.jar hive-exec-xxx.jar hive-jdbc-xxx.jar hive-metastore-xxx.jar hive-service-xxx.jar httpclient-4.4.jar httpcore-4.4.jar libfb303-0.9.2.jar libthrift-0.9.3.jar) |
Google BigQuery | | Simba Drivers for BigQuery (several jars) |
There are several blog posts / examples that detail the usage of apoc.load.jdbc.
LOAD JDBC - UPDATE
jdbcUpdate updates a relational database from a SQL statement with optional parameters:
CALL apoc.load.jdbcUpdate(jdbc-url,statement, params, config)
With this dataset, you can call the procedure in two different modes:
MATCH (u:User)-[:BOUGHT]->(p:Product)<-[:BOUGHT]-(o:User)-[:BOUGHT]->(reco)
WHERE u <> o AND NOT (u)-[:BOUGHT]->(reco)
WITH u, reco, count(*) as score
WHERE score > 1000
You can call the procedure with parameters:
CALL apoc.load.jdbcUpdate('jdbc:mysql:....','INSERT INTO RECOMMENDATIONS values(?,?,?)',[u.id, reco.id, score]);
You can call the procedure without parameters:
CALL apoc.load.jdbcUpdate('jdbc:mysql:....','INSERT INTO RECOMMENDATIONS values(user.id, reco.id, score)');
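Putting the pieces together, the full statement might look like the sketch below; it assumes User nodes expose an id property and that the RECOMMENDATIONS table has three matching columns:

MATCH (u:User)-[:BOUGHT]->(p:Product)<-[:BOUGHT]-(o:User)-[:BOUGHT]->(reco)
WHERE u <> o AND NOT (u)-[:BOUGHT]->(reco)
WITH u, reco, count(*) as score
WHERE score > 1000
// write each recommendation back to the relational database
CALL apoc.load.jdbcUpdate('jdbc:mysql:....','INSERT INTO RECOMMENDATIONS values(?,?,?)',[u.id, reco.id, score])
YIELD row
RETURN count(*);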
Load JDBC - formatting dates
Starting with Neo4j 3.4, Temporal Values are supported.
If the JdbcType returned by the load operation is TIMESTAMP or TIMESTAMP_WITH_TIMEZONE, you can provide the config parameter timezone with a java.time.ZoneId value:
CALL apoc.load.jdbc('key or url','table or statement', config);
Config
The config parameter is optional and defaults to an empty map.
name | default |
---|---|
timezone | null |
credentials | {} |
Example
CALL apoc.load.jdbc(
'jdbc:derby:derbyDB',
'SELECT * FROM PERSON WHERE NAME = ?',['John'],
{timezone: "Asia/Tokyo"})
2018-10-31T01:32:25.012+09:00[Asia/Tokyo]
CALL apoc.load.jdbcUpdate('jdbc:derby:derbyDB','UPDATE PERSON SET NAME = ? WHERE NAME = ?',['John','John'],{credentials:{user:'apoc',password:'Ap0c!#Db'}})
CALL apoc.load.jdbc('jdbc:derby:derbyDB', 'PERSON',[],{credentials:{user:'apoc',password:'Ap0c!#Db'}})
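Since the config is a map, the keys can be combined, e.g. credentials together with timezone:

CALL apoc.load.jdbc('jdbc:derby:derbyDB', 'PERSON', [],
  {timezone: "Asia/Tokyo", credentials: {user:'apoc', password:'Ap0c!#Db'}})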
JDBC Analytics
You can use apoc.jdbc.analytics(<cypherQuery>, <jdbcUrl>, <sqlQueryOverTemporaryTable>, <paramsList>, $config) to create a temporary table from a Cypher query and delegate complex analytics to the database defined by the JDBC URL.
Note that the column names returned by the SQL must match the ones provided by the Cypher query.
In addition to the configuration of the apoc.load.jdbc procedure, apoc.jdbc.analytics provides the following config:
| name | description | default |
|---|---|---|
| | the temporary table name | neo4j_tmp_table |
| provider | the SQL provider, based on how it handles data types; possible values are "POSTGRES", "MYSQL" and "DEFAULT" | "DEFAULT" |
| | the batch size for inserting data into the SQL table | 10000 |
| | if 'CREATE', creates a new temporary table; if 'APPEND', reuses an existing table | 'CREATE' |
The provider can be specified in the config parameter.
You can reproduce the following queries with these nodes:
CREATE (:City {country: 'NL', name: 'Amsterdam', year: 2000, population: 1005})
CREATE (:City {country: 'NL', name: 'Amsterdam', year: 2010, population: 1065})
CREATE (:City {country: 'NL', name: 'Amsterdam', year: 2020, population: 1158})
CREATE (:City {country: 'US', name: 'Seattle', year: 2000, population: 564})
CREATE (:City {country: 'US', name: 'Seattle', year: 2010, population: 608})
CREATE (:City {country: 'US', name: 'Seattle', year: 2020, population: 738})
CREATE (:City {country: 'US', name: 'New York City', year: 2000, population: 8015})
CREATE (:City {country: 'US', name: 'New York City', year: 2010, population: 8175})
CREATE (:City {country: 'US', name: 'New York City', year: 2020, population: 8772})
DuckDB
An example that gets the rank of the current row, with gaps. The fields of the SQL query should match the ones in the Cypher query. For details, see https://duckdb.org/docs/sql/functions/window_functions.html#rank
CALL apoc.jdbc.analytics(
"MATCH (n:City) RETURN n.country AS country, n.name AS name, n.year AS year, n.population AS population",
$url,
"SELECT
country,
name,
year,
population,
RANK() OVER (PARTITION BY country ORDER BY year DESC) AS rank
FROM 'neo4j_tmp_table'
ORDER BY rank, country, name;"
)
Another example, using a window function to get a pivot table:
CALL apoc.jdbc.analytics(
"MATCH (n:City) RETURN n.country AS country, n.name AS name, n.year AS year, n.population AS population",
$url,
"WITH ranked_data AS (
SELECT
country,
name,
year,
population,
ROW_NUMBER() OVER (PARTITION BY country ORDER BY year DESC) AS rank
FROM 'neo4j_tmp_table'
ORDER BY rank, country, name
)
SELECT *
FROM ranked_data
PIVOT (
sum(population)
FOR country IN ('NL', 'US')
GROUP BY year
)"
)
Or directly with the PIVOT <table> ON <column> clause:
CALL apoc.jdbc.analytics(
"MATCH (n:City) RETURN n.country AS country, n.name AS name, n.year AS year, n.population AS population",
$url,
"PIVOT 'neo4j_tmp_table'
ON year
USING sum(population)
ORDER by name"
)
With DuckDB we can also use an in-memory instance via the jdbc:duckdb: URL:
CALL apoc.jdbc.analytics(
"MATCH (n:City) RETURN n.country AS country, n.name AS name, n.year AS year, n.population AS population",
'jdbc:duckdb:',
"PIVOT 'neo4j_tmp_table'
ON year
USING sum(population)
ORDER by name"
)
MySQL
Returns the rank of the current row within its partition, with gaps. For more information, see https://dev.mysql.com/doc/refman/8.4/en/window-function-descriptions.html#function_rank
CALL apoc.jdbc.analytics(
"MATCH (n:City) RETURN n.country AS country, n.name AS name, n.year AS year, n.population AS population",
$url,
"SELECT
country,
name,
year,
population,
RANK() OVER (PARTITION BY country ORDER BY year DESC) AS 'rank'
FROM 'neo4j_tmp_table'
ORDER BY country, name;",
$params,
{ provider: "MYSQL" })
Here is an example with the ROW_NUMBER window function in MySQL:
CALL apoc.jdbc.analytics(
"MATCH (n:City) RETURN n.country AS country, n.name AS name, n.year AS year, n.population AS population",
$url,
"SELECT
country,
name,
year,
population,
ROW_NUMBER() OVER (PARTITION BY country ORDER BY year DESC) AS 'rank'
FROM 'neo4j_tmp_table'
ORDER BY country, name;",
$params,
{ provider: "MYSQL" })
PostgreSQL
Here is an example with a window function.
CALL apoc.jdbc.analytics(
"MATCH (n:City) RETURN n.country AS country, n.name AS name, n.year AS year, n.population AS population",
$url,
"SELECT
country,
name,
year,
population,
RANK() OVER (PARTITION BY country ORDER BY year DESC) rank
FROM 'neo4j_tmp_table'
ORDER BY rank, country, name;",
$params,
{ provider: "POSTGRES" })