加载 JDBC (RDBMS)
数据集成是一个重要的主题。从关系数据库读取数据以创建和增强数据模型是一个非常有用的练习。
使用 apoc.load.jdbc
,您可以访问任何提供 JDBC 驱动程序的数据库,并执行查询,这些查询的结果将转换为行流。然后,这些行可用于更新或创建图结构。

为了简化 JDBC URL 语法并保护凭据,您可以在 conf/apoc.conf
中配置别名
apoc.jdbc.myDB.url=jdbc:derby:derbyDB
CALL apoc.load.jdbc('jdbc:derby:derbyDB','PERSON')
变为
CALL apoc.load.jdbc('myDB','PERSON')
apoc.jdbc.<alias>.url=
中的第 3 个值有效地定义了一个别名,将在 apoc.load.jdbc('<alias>',….
中使用。
MySQL 示例
Northwind 是关系数据库的常用示例集,在我们的导入指南中也有介绍,例如 Neo4j 浏览器中的 :play northwind graph。
MySQL Northwind 数据
select count(*) from products;
count(*) |
---|
77 |
describe products;
字段 | 类型 | 空 | 键 | 默认值 | 额外 |
---|---|---|---|---|---|
ProductID |
int(11) |
否 |
PRI |
NULL |
auto_increment |
ProductName |
varchar(40) |
否 |
MUL |
NULL |
|
SupplierID |
int(11) |
是 |
MUL |
NULL |
|
CategoryID |
int(11) |
是 |
MUL |
NULL |
|
QuantityPerUnit |
varchar(20) |
是 |
NULL |
||
UnitPrice |
decimal(10,4) |
是 |
0.0000 |
||
UnitsInStock |
smallint(2) |
是 |
0 |
||
UnitsOnOrder |
smallint(2) |
是 |
0 |
||
ReorderLevel |
smallint(2) |
是 |
0 |
||
Discontinued |
bit(1) |
否 |
b'0' |
加载 JDBC 示例
CALL apoc.load.driver("com.mysql.jdbc.Driver");
WITH "jdbc:mysql://localhost:3306/northwind?user=root" as url
CALL apoc.load.jdbc(url,"products") YIELD row
RETURN count(*);
count(*) |
---|
77 |
WITH "jdbc:mysql://localhost:3306/northwind?user=root" as url
CALL apoc.load.jdbc(url,"products") YIELD row
RETURN row limit 1;
行 |
---|
{UnitPrice → 18.0000, UnitsOnOrder → 0, CategoryID → 1, UnitsInStock → 39} |

在事务批次中加载数据
您可以从 jdbc 加载数据,并使用查询结果以批次(以及并行)方式创建/更新图。
CALL apoc.periodic.iterate(
'CALL apoc.load.jdbc("jdbc:mysql://localhost:3306/northwind?user=root","company")',
'CREATE (p:Person) SET p += value',
{ batchSize:10000, parallel:true})
YIELD batches, total
Cassandra 示例
将 Song 数据库设置为初始数据集
curl -OL https://raw.githubusercontent.com/neo4j-contrib/neo4j-cassandra-connector/master/db_gen/playlist.cql curl -OL https://raw.githubusercontent.com/neo4j-contrib/neo4j-cassandra-connector/master/db_gen/artists.csv curl -OL https://raw.githubusercontent.com/neo4j-contrib/neo4j-cassandra-connector/master/db_gen/songs.csv $CASSANDRA_HOME/bin/cassandra $CASSANDRA_HOME/bin/cqlsh -f playlist.cql
下载 Cassandra JDBC 包装器,并将其放入 $NEO4J_HOME/plugins
目录中。将此配置选项添加到 $NEO4J_HOME/conf/apoc.conf
以便于与 cassandra 实例交互。
apoc.jdbc.cassandra_songs.url=jdbc:cassandra://localhost:9042/playlist
重新启动服务器。
现在您可以使用以下方法检查 Cassandra 中的数据。
CALL apoc.load.jdbc('cassandra_songs','artists_by_first_letter')
YIELD row
RETURN count(*);
count(*) |
---|
3605 |
CALL apoc.load.jdbc('cassandra_songs','artists_by_first_letter')
YIELD row
RETURN row LIMIT 5;
CALL apoc.load.jdbc('cassandra_songs','artists_by_first_letter')
YIELD row
RETURN row.first_letter, row.artist
LIMIT 5;
row.first_letter | row.artist |
---|---|
C |
C.W. Stoneking |
C |
CH2K |
C |
CHARLIE HUNTER WITH LEON PARKER |
C |
Calvin Harris |
C |
Camané |
让我们创建一些图数据,我们来看一下 track_by_artist 表,它包含大约 60k 条记录。
CALL apoc.load.jdbc('cassandra_songs','track_by_artist')
YIELD row
RETURN count(*);
CALL apoc.load.jdbc('cassandra_songs','track_by_artist')
YIELD row
RETURN row
LIMIT 5;
CALL apoc.load.jdbc('cassandra_songs','track_by_artist')
YIELD row
RETURN row.track_id, row.track_length_in_seconds, row.track, row.music_file, row.genre, row.artist, row.starred
LIMIT 2;
row.track_id | length | row.track | row.music_file | row.genre | row.artist | row.starred |
---|---|---|---|---|---|---|
c0693b1e-0eaa-4e81-b23f-b083db303842 |
219 |
1913 Massacre |
TRYKHMD128F934154C |
folk |
Woody Guthrie & Jack Elliott |
false |
7d114937-0bc7-41c7-8e0c-94b5654ac77f |
178 |
Alabammy Bound |
TRMQLPV128F934152B |
folk |
Woody Guthrie & Jack Elliott |
false |
让我们创建一些索引和约束,请注意,其他索引和约束将被此操作删除。
CALL apoc.schema.assert(
{Track:['title','length']},
{Artist:['name'],Track:['id'],Genre:['name']});
label | key | unique | action |
---|---|---|---|
Track |
title |
false |
CREATED |
Track |
length |
false |
CREATED |
Artist |
name |
true |
CREATED |
Genre |
name |
true |
CREATED |
Track |
id |
true |
CREATED |
CALL apoc.load.jdbc('cassandra_songs','track_by_artist')
YIELD row
MERGE (a:Artist {name:row.artist})
MERGE (g:Genre {name:row.genre})
CREATE (t:Track {id:toString(row.track_id), title:row.track, length:row.track_length_in_seconds})
CREATE (a)-[:PERFORMED]->(t)
CREATE (t)-[:GENRE]->(g);
Added 63213 labels, created 63213 nodes, set 182413 properties, created 119200 relationships, statement executed in 40076 ms.
支持使用 Kerberos 身份验证的 Hive
特别是使用 Kerberos 的 Hive 支持更加复杂。
首先,所需的配置更详细,请确保获取以下信息
-
kerberos 用户名/密码
-
kerberos 领域/KDC
-
hive 主机名 + 端口(10000)
在已知位置创建此 login.conf
文件
KerberosClient { com.sun.security.auth.module.Krb5LoginModule required debug=true debugNative=true; };
将这些选项添加到您的 conf/apoc.conf
中
dbms.jvm.additional=-Djava.security.auth.login.config=/path/to/login.conf dbms.jvm.additional=-Djava.security.auth.login.config.client=KerberosClient dbms.jvm.additional=-Djava.security.krb5.realm=KRB.REALM.COM dbms.jvm.additional=-Djava.security.krb5.kdc=krb-kdc.host.com
与其他 JDBC 驱动程序不同,Hive 带有一堆依赖项,您可以从 Hadoop 提供程序处下载这些依赖项
或从 Maven 中央仓库 获取它们。
版本可能会有所不同,请使用与您的 Hive 驱动程序一起提供的版本。
-
hadoop-common-2.7.3.2.6.1.0-129.jar
-
hive-exec-1.2.1000.2.6.1.0-129.jar
-
hive-jdbc-1.2.1000.2.6.1.0-129.jar
-
hive-metastore-1.2.1000.2.6.1.0-129.jar
-
hive-service-1.2.1000.2.6.1.0-129.jar
-
httpclient-4.4.jar
-
httpcore-4.4.jar
-
libfb303-0.9.2.jar
-
libthrift-0.9.3.jar
现在您可以从 APOC 中使用这样的 JDBC URL。
此 URL 没有换行符,只是因为太长而进行了换行。 |
jdbc:hive2://username%40krb-realm:password@hive-hostname:10000/default;principal=hive/hostname@krb-realm;auth=kerberos;kerberosAuthType=fromSubject
然后调用
WITH 'jdbc:hive2://username%40krb-realm:password@hive-hostname:10000/default;principal=hive/hostname@krb-realm;auth=kerberos;kerberosAuthType=fromSubject' AS url
CALL apoc.load.jdbc(url,'PRODUCTS')
YIELD row
RETURN row.name, row.price;
您也可以在 conf/apoc.conf
中将其设置为一个键
apoc.jdbc.my-hive.url=jdbc:hive2://username%40krb-realm:password@hive-hostname:10000/default;principal=hive/hostname@krb-realm;auth=kerberos;kerberosAuthType=fromSubject
然后使用更简洁的调用
CALL apoc.load.jdbc('my-hive','SELECT * PRODUCTS');
加载 JDBC - 资源
要使用其他 JDBC 驱动程序,请使用这些下载链接和 JDBC URL。将 JDBC 驱动程序放入 $NEO4J_HOME/plugins
目录,并在 $NEO4J_HOME/conf/apoc.conf
中使用 apoc.jdbc.<alias>.url=<jdbc-url>
配置 JDBC-URL。
凭据可以通过两种方式传递
-
到 URL 中
CALL apoc.load.jdbc('jdbc:derby:derbyDB;user=apoc;password=Ap0c!#Db;create=true', 'PERSON')
-
通过配置参数。
CALL apoc.load.jdbc('jdbc:derby:derbyDB', 'PERSON',[],{credentials:{user:'apoc',password:'Ap0c!#Db'}})
使用 Simba 驱动程序的 Google BigQuery 需要使用额外的参数“autoCommit”,例如:
CALL apoc.load.jdbc('BigQuery', 'SELECT action_type FROM `patents-public-data.ebi_chembl.action_type` LIMIT 10', [], {autoCommit:true})
数据库 | JDBC-URL | 驱动程序来源 |
---|---|---|
MySQL |
|
|
Postgres |
|
|
Oracle |
|
|
MS SQLServer |
|
|
IBM DB2 |
|
|
Derby |
|
包含在 JDK6-8 中 |
Cassandra |
|
|
SAP Hana |
|
|
Apache Hive(带 Kerberos) |
|
Apache Hive 驱动程序 (Cloudera) (Hortonworks) 有几个 jar 包(hadoop-common-xxx.jar hive-exec-xxx.jar hive-jdbc-xxx.jar hive-metastore-xxx.jar hive-service-xxx.jar httpclient-4.4.jar httpcore-4.4.jar libfb303-0.9.2.jar libthrift-0.9.3.jar) |
Google BigQuery |
|
用于 BigQuery 的 Simba 驱动程序 有几个 jar 包 |
有很多博客文章/示例详细介绍了 apoc.load.jdbc 的用法
加载 JDBC - 更新
jdbcUpdate 用于从具有可选参数的 SQL 语句更新关系数据库
CALL apoc.load.jdbcUpdate(jdbc-url,statement, params, config)
使用此数据集,您可以以两种不同的模式调用过程
MATCH (u:User)-[:BOUGHT]->(p:Product)<-[:BOUGHT]-(o:User)-[:BOUGHT]->(reco)
WHERE u <> o AND NOT (u)-[:BOUGHT]->(reco)
WITH u, reco, count(*) as score
WHERE score > 1000
您可以使用参数调用过程
CALL apoc.load.jdbcUpdate('jdbc:mysql:....','INSERT INTO RECOMMENDATIONS values(?,?,?)',[user.id, reco.id, score]);
您可以不使用参数调用过程
CALL apoc.load.jdbcUpdate('jdbc:mysql:....','INSERT INTO RECOMMENDATIONS values(user.id, reco.id, score)');
加载 JDBC 格式日期
从 Neo4j 3.4 开始,支持 时间值
如果加载操作返回的 JdbcType 为 TIMESTAMP 或 TIMESTAMP_WITH_TIMEZONE,您可以提供配置参数 timezone,其类型为 java.time.ZoneId
CALL apoc.load.jdbc('key or url','table or statement', config);
配置
配置参数是可选的,默认值为一个空映射。
|
默认值:null |
|
默认值:{} |
示例
CALL apoc.load.jdbc(
'jdbc:derby:derbyDB',
'SELECT * FROM PERSON WHERE NAME = ?',['John'],
{timezone: "Asia/Tokyo"})
2018-10-31T01:32:25.012+09:00[Asia/Tokyo]
CALL apoc.load.jdbcUpdate('jdbc:derby:derbyDB','UPDATE PERSON SET NAME = ? WHERE NAME = ?',['John','John'],{credentials:{user:'apoc',password:'Ap0c!#Db'}})
CALL apoc.load.jdbc('jdbc:derby:derbyDB', 'PERSON',[],{credentials:{user:'apoc',password:'Ap0c!#Db'}})