加载 JDBC (RDBMS)

数据集成是一个重要的主题。从关系数据库读取数据以创建和增强数据模型是一个非常有用的练习。

使用 apoc.load.jdbc,您可以访问任何提供 JDBC 驱动程序的数据库,并执行查询,这些查询的结果将转换为行流。然后,这些行可用于更新或创建图结构。

apoc jdbc northwind load

为了简化 JDBC URL 语法并保护凭据,您可以在 conf/apoc.conf 中配置别名

apoc.jdbc.myDB.url=jdbc:derby:derbyDB
CALL apoc.load.jdbc('jdbc:derby:derbyDB','PERSON')

变为

CALL apoc.load.jdbc('myDB','PERSON')

apoc.jdbc.<alias>.url= 中的第 3 个值有效地定义了一个别名,将在 apoc.load.jdbc('<alias>',…​. 中使用。

MySQL 示例

Northwind 是关系数据库的常用示例集,在我们的导入指南中也有介绍,例如 Neo4j 浏览器中的 :play northwind graph

MySQL Northwind 数据

select count(*) from products;
表 1. 结果
count(*)

77

describe products;
表 2. 结果
字段 类型 默认值 额外

ProductID

int(11)

PRI

NULL

auto_increment

ProductName

varchar(40)

MUL

NULL

SupplierID

int(11)

MUL

NULL

CategoryID

int(11)

MUL

NULL

QuantityPerUnit

varchar(20)

NULL

UnitPrice

decimal(10,4)

0.0000

UnitsInStock

smallint(2)

0

UnitsOnOrder

smallint(2)

0

ReorderLevel

smallint(2)

0

Discontinued

bit(1)

b'0'

加载 JDBC 示例

加载 JDBC 驱动程序
CALL apoc.load.driver("com.mysql.jdbc.Driver");
计算 products 表中的行数
WITH "jdbc:mysql://localhost:3306/northwind?user=root" as url
CALL apoc.load.jdbc(url,"products") YIELD row
RETURN count(*);
表 3. 结果
count(*)

77

从 products 表返回行
WITH "jdbc:mysql://localhost:3306/northwind?user=root" as url
CALL apoc.load.jdbc(url,"products") YIELD row
RETURN row limit 1;
表 4. 结果

{UnitPrice → 18.0000, UnitsOnOrder → 0, CategoryID → 1, UnitsInStock → 39}

apoc load jdbc

在事务批次中加载数据

您可以从 jdbc 加载数据,并使用查询结果以批次(以及并行)方式创建/更新图。

CALL apoc.periodic.iterate(
  'CALL apoc.load.jdbc("jdbc:mysql://localhost:3306/northwind?user=root","company")',
  'CREATE (p:Person) SET p += value',
  { batchSize:10000, parallel:true})
YIELD batches, total

Cassandra 示例

将 Song 数据库设置为初始数据集

curl -OL https://raw.githubusercontent.com/neo4j-contrib/neo4j-cassandra-connector/master/db_gen/playlist.cql
curl -OL https://raw.githubusercontent.com/neo4j-contrib/neo4j-cassandra-connector/master/db_gen/artists.csv
curl -OL https://raw.githubusercontent.com/neo4j-contrib/neo4j-cassandra-connector/master/db_gen/songs.csv
$CASSANDRA_HOME/bin/cassandra
$CASSANDRA_HOME/bin/cqlsh -f playlist.cql

下载 Cassandra JDBC 包装器,并将其放入 $NEO4J_HOME/plugins 目录中。将此配置选项添加到 $NEO4J_HOME/conf/apoc.conf 以便于与 cassandra 实例交互。

添加到 conf/apoc.conf
apoc.jdbc.cassandra_songs.url=jdbc:cassandra://localhost:9042/playlist

重新启动服务器。

现在您可以使用以下方法检查 Cassandra 中的数据。

CALL apoc.load.jdbc('cassandra_songs','artists_by_first_letter')
YIELD row
RETURN count(*);
表 5. 结果
count(*)

3605

CALL apoc.load.jdbc('cassandra_songs','artists_by_first_letter')
YIELD row
RETURN row LIMIT 5;
CALL apoc.load.jdbc('cassandra_songs','artists_by_first_letter')
YIELD row
RETURN row.first_letter, row.artist
LIMIT 5;
表 6. 结果
row.first_letter row.artist

C

C.W. Stoneking

C

CH2K

C

CHARLIE HUNTER WITH LEON PARKER

C

Calvin Harris

C

Camané

让我们创建一些图数据,我们来看一下 track_by_artist 表,它包含大约 60k 条记录。

CALL apoc.load.jdbc('cassandra_songs','track_by_artist')
YIELD row
RETURN count(*);
CALL apoc.load.jdbc('cassandra_songs','track_by_artist')
YIELD row
RETURN row
LIMIT 5;
CALL apoc.load.jdbc('cassandra_songs','track_by_artist')
YIELD row
RETURN row.track_id, row.track_length_in_seconds, row.track, row.music_file, row.genre, row.artist, row.starred
LIMIT 2;
表 7. 结果
row.track_id length row.track row.music_file row.genre row.artist row.starred

c0693b1e-0eaa-4e81-b23f-b083db303842

219

1913 Massacre

TRYKHMD128F934154C

folk

Woody Guthrie & Jack Elliott

false

7d114937-0bc7-41c7-8e0c-94b5654ac77f

178

Alabammy Bound

TRMQLPV128F934152B

folk

Woody Guthrie & Jack Elliott

false

让我们创建一些索引和约束,请注意,其他索引和约束将被此操作删除。

CALL apoc.schema.assert(
  {Track:['title','length']},
  {Artist:['name'],Track:['id'],Genre:['name']});
表 8. 结果
label key unique action

Track

title

false

CREATED

Track

length

false

CREATED

Artist

name

true

CREATED

Genre

name

true

CREATED

Track

id

true

CREATED

CALL apoc.load.jdbc('cassandra_songs','track_by_artist')
YIELD row
MERGE (a:Artist {name:row.artist})
MERGE (g:Genre {name:row.genre})
CREATE (t:Track {id:toString(row.track_id), title:row.track, length:row.track_length_in_seconds})
CREATE (a)-[:PERFORMED]->(t)
CREATE (t)-[:GENRE]->(g);
Added 63213 labels, created 63213 nodes, set 182413 properties, created 119200 relationships, statement executed in 40076 ms.

支持使用 Kerberos 身份验证的 Hive

特别是使用 Kerberos 的 Hive 支持更加复杂。

首先,所需的配置更详细,请确保获取以下信息

  • kerberos 用户名/密码

  • kerberos 领域/KDC

  • hive 主机名 + 端口(10000)

在已知位置创建此 login.conf 文件

login.conf
KerberosClient {
  com.sun.security.auth.module.Krb5LoginModule required
  debug=true debugNative=true;
};

将这些选项添加到您的 conf/apoc.conf

apoc.conf
dbms.jvm.additional=-Djava.security.auth.login.config=/path/to/login.conf
dbms.jvm.additional=-Djava.security.auth.login.config.client=KerberosClient
dbms.jvm.additional=-Djava.security.krb5.realm=KRB.REALM.COM
dbms.jvm.additional=-Djava.security.krb5.kdc=krb-kdc.host.com

与其他 JDBC 驱动程序不同,Hive 带有一堆依赖项,您可以从 Hadoop 提供程序处下载这些依赖项

或从 Maven 中央仓库 获取它们。

版本可能会有所不同,请使用与您的 Hive 驱动程序一起提供的版本。

  • hadoop-common-2.7.3.2.6.1.0-129.jar

  • hive-exec-1.2.1000.2.6.1.0-129.jar

  • hive-jdbc-1.2.1000.2.6.1.0-129.jar

  • hive-metastore-1.2.1000.2.6.1.0-129.jar

  • hive-service-1.2.1000.2.6.1.0-129.jar

  • httpclient-4.4.jar

  • httpcore-4.4.jar

  • libfb303-0.9.2.jar

  • libthrift-0.9.3.jar

现在您可以从 APOC 中使用这样的 JDBC URL。

此 URL 没有换行符,只是因为太长而进行了换行。

jdbc:hive2://username%40krb-realm:password@hive-hostname:10000/default;principal=hive/hostname@krb-realm;auth=kerberos;kerberosAuthType=fromSubject

然后调用

WITH 'jdbc:hive2://username%40krb-realm:password@hive-hostname:10000/default;principal=hive/hostname@krb-realm;auth=kerberos;kerberosAuthType=fromSubject' AS url
CALL apoc.load.jdbc(url,'PRODUCTS')
YIELD row
RETURN row.name, row.price;

您也可以在 conf/apoc.conf 中将其设置为一个键

apoc.conf
apoc.jdbc.my-hive.url=jdbc:hive2://username%40krb-realm:password@hive-hostname:10000/default;principal=hive/hostname@krb-realm;auth=kerberos;kerberosAuthType=fromSubject

然后使用更简洁的调用

CALL apoc.load.jdbc('my-hive','SELECT * PRODUCTS');

加载 JDBC - 资源

要使用其他 JDBC 驱动程序,请使用这些下载链接和 JDBC URL。将 JDBC 驱动程序放入 $NEO4J_HOME/plugins 目录,并在 $NEO4J_HOME/conf/apoc.conf 中使用 apoc.jdbc.<alias>.url=<jdbc-url> 配置 JDBC-URL。

凭据可以通过两种方式传递

  • 到 URL 中

CALL apoc.load.jdbc('jdbc:derby:derbyDB;user=apoc;password=Ap0c!#Db;create=true', 'PERSON')
  • 通过配置参数。

CALL apoc.load.jdbc('jdbc:derby:derbyDB', 'PERSON',[],{credentials:{user:'apoc',password:'Ap0c!#Db'}})

使用 Simba 驱动程序的 Google BigQuery 需要使用额外的参数“autoCommit”,例如:

CALL apoc.load.jdbc('BigQuery', 'SELECT action_type FROM `patents-public-data.ebi_chembl.action_type` LIMIT 10', [], {autoCommit:true})
数据库 JDBC-URL 驱动程序来源

MySQL

jdbc:mysql://<hostname>:<port/3306>/<database>?user=<user>&password=<pass>

Postgres

jdbc:postgresql://<hostname>/<database>?user=<user>&password=<pass>

Oracle

jdbc:oracle:thin:<user>/<pass>@<host>:<port>/<service_name>

MS SQLServer

jdbc:sqlserver://;servername=<servername>;databaseName=<database>;user=<user>;password=<pass>

IBM DB2

jdbc:db2://<host>:<port/5021>/<database>:user=<user>;password=<pass>;

Derby

jdbc:derby:derbyDB

包含在 JDK6-8 中

Cassandra

jdbc:cassandra://<host>:<port/9042>/<database>

SAP Hana

jdbc:sap://<host>:<port/39015>/?user=<user>&password=<pass>

Apache Hive(带 Kerberos)

jdbc:hive2://username%40krb-realm:password@hostname:10000/default;principal=hive/hostname@krb-realm;auth=kerberos;kerberosAuthType=fromSubject

Apache Hive 驱动程序 (Cloudera) (Hortonworks) 有几个 jar 包(hadoop-common-xxx.jar hive-exec-xxx.jar hive-jdbc-xxx.jar hive-metastore-xxx.jar hive-service-xxx.jar httpclient-4.4.jar httpcore-4.4.jar libfb303-0.9.2.jar libthrift-0.9.3.jar)

Google BigQuery

jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443;ProjectId=<Project ID>;OAuthType=0;OAuthServiceAcctEmail=<Service Account ID>;OAuthPvtKeyPath=/path/to/<Private Key>.json

有很多博客文章/示例详细介绍了 apoc.load.jdbc 的用法

加载 JDBC - 更新

jdbcUpdate 用于从具有可选参数的 SQL 语句更新关系数据库

CALL apoc.load.jdbcUpdate(jdbc-url,statement, params, config)

使用此数据集,您可以以两种不同的模式调用过程

MATCH (u:User)-[:BOUGHT]->(p:Product)<-[:BOUGHT]-(o:User)-[:BOUGHT]->(reco)
WHERE u <> o AND NOT (u)-[:BOUGHT]->(reco)
WITH u, reco, count(*) as score
WHERE score > 1000

您可以使用参数调用过程

CALL apoc.load.jdbcUpdate('jdbc:mysql:....','INSERT INTO RECOMMENDATIONS values(?,?,?)',[user.id, reco.id, score]);

您可以不使用参数调用过程

CALL apoc.load.jdbcUpdate('jdbc:mysql:....','INSERT INTO RECOMMENDATIONS values(user.id, reco.id, score)');

加载 JDBC 格式日期

从 Neo4j 3.4 开始,支持 时间值

如果加载操作返回的 JdbcType 为 TIMESTAMP 或 TIMESTAMP_WITH_TIMEZONE,您可以提供配置参数 timezone,其类型为 java.time.ZoneId

CALL apoc.load.jdbc('key or url','table or statement', config);

配置

配置参数是可选的,默认值为一个空映射。

timezone

默认值:null

credentials

默认值:{}

示例

带时区
CALL apoc.load.jdbc(
  'jdbc:derby:derbyDB',
  'SELECT * FROM PERSON WHERE NAME = ?',['John'],
  {timezone: "Asia/Tokyo"})
2018-10-31T01:32:25.012+09:00[Asia/Tokyo]
带凭据
CALL apoc.load.jdbcUpdate('jdbc:derby:derbyDB','UPDATE PERSON SET NAME = ? WHERE NAME = ?',['John','John'],{credentials:{user:'apoc',password:'Ap0c!#Db'}})
CALL apoc.load.jdbc('jdbc:derby:derbyDB', 'PERSON',[],{credentials:{user:'apoc',password:'Ap0c!#Db'}})