apoc.load.directory.async 过程用于管理触发器。每个触发器都包含一个监听器,该监听器观察一个或多个文件夹,这些文件夹将触发自定义 Cypher 查询的执行。以下过程可用于添加、删除和列出触发器

限定名称 类型 版本

apoc.load.directory.async.add

apoc.load.directory.async.add(name, cypher, pattern, urlDir, {}) YIELD name, status, pattern, cypher, urlDir, config, error - 添加或替换具有特定名称的文件夹监听器,该监听器被触发以针对具有给定模式的所有文件,并在触发时执行指定的 Cypher 查询。返回所有监听器的列表。可以在 config 参数中指定事件类型。

过程

APOC 扩展

apoc.load.directory.async.remove

apoc.load.directory.async.remove(name) YIELD name, status, pattern, cypher, urlDir, config, error - 按名称删除文件夹监听器,并返回所有剩余的监听器(如果有)

过程

APOC 扩展

apoc.load.directory.async.removeAll

apoc.load.directory.async.removeAll() - 删除所有文件夹监听器

过程

APOC 扩展

apoc.load.directory.async.list

apoc.load.directory.async.list() YIELD name, status, pattern, cypher, urlDir, config, error - 列出所有文件夹监听器

过程

APOC 扩展

使用示例

添加文件夹监听器

与其他加载过程(如 apoc.load.csvapoc.load.json)一起,可以异步加载文件。

此示例使用触发器提供的参数创建节点,仅在 csvFolder 中的文件创建和修改时触发

CALL apoc.load.directory.async.add('csvImport',
"CALL apoc.load.csv($filePath) yield list WITH list CREATE (n:CsvToNode {content: list, fileName: $fileName, fileDirectory: $fileDirectory, listenEventType: $listenEventType})",
"*.csv", "csvFolder" ,{listenEventType: ["CREATE", "MODIFY"]})

其中 $fileName 是创建/修改的文件,$filePath 是文件的相对路径,即 $IMPORT_DIR/csvFolder/[FILENAME.csv]$fileDirectory 是目录的相对路径,即 $IMPORT_DIR/csvFolder$listenEventType 是触发事件,即 CREATEMODIFY

假设我们的 IMPORT_DIR 设置为 import,并将以下文件上传到 import/csvFolder 文件夹

test.csv
name,age
Selma,8
Rana,11
Selina,18

然后,执行 MATCH (n:CsvToNode) RETURN properties(n) as props

表 1. 结果
props

{ "fileName": "test.csv", "listenEventType": "CREATE", "fileDirectory": "csvFolder", "content": [ "Selma", "8" ] }

{ "fileName": "test.csv", "listenEventType": "CREATE", "fileDirectory": "csvFolder", "content": [ "Rana", "11" ] }

{ "fileName": "test.csv", "listenEventType": "CREATE", "fileDirectory": "csvFolder", "content": [ "Selina", "18" ] }

如果我们修改 test.csv 如下

test.csv
name,age
Selma,80
Rana,110
Selina,180

我们将获得 3 个新的节点,其属性如下

表 2. 结果
props

{ "fileName": "test.csv", "listenEventType": "MODIFY", "fileDirectory": "csvFolder", "content": [ "Selma", "80" ] }

{ "fileName": "test.csv", "listenEventType": "MODIFY", "fileDirectory": "csvFolder", "content": [ "Rana", "110" ] }

{ "fileName": "test.csv", "listenEventType": "MODIFY", "fileDirectory": "csvFolder", "content": [ "Selina", "180" ] }

列出和删除过程

我们可以看到正在执行的监听器列表

CALL apoc.load.directory.async.list();

例如,如果我们执行

CALL apoc.load.directory.async.add('csvImport',
"CALL apoc.load.csv($filePath) yield list WITH list CREATE (n:CsvToNode {content: list, fileName: $fileName, fileDirectory: $fileDirectory, listenEventType: $listenEventType})",
"*.csv", "csvFolder" ,{listenEventType: ["CREATE", "MODIFY"]})

结果将是

表 3. 结果
名称 状态 模式 密码 urlDir 配置 错误

"csvImport"

"正在运行"

"*.csv"

"CALL apoc.load.csv($filePath) yield list WITH list CREATE (n:CsvToNode {content: list, fileName: $fileName, fileDirectory: $fileDirectory, listenEventType: $listenEventType})"

<importUrlDir>/csvFolder

{"listenEventType": ["CREATE", "MODIFY"], "interval": 1000 }

""

我们可以使用给定名称作为参数的删除过程来删除特定监听器

CALL apoc.load.directory.async.remove('csvImport')

结果将是剩余监听器的列表。

此外,我们可以使用以下过程删除所有内容(这将返回一个空结果)

CALL apoc.load.directory.async.removeAll()

错误处理

当监听器由于某种原因发生故障时,其status字段将从RUNNING更改为ERROR,并输出相关错误。如果我们执行call apoc.load.directory.async.list,我们会得到以下示例

名称 状态 模式 密码 urlDir 配置 错误

监听器名称

错误

"*.csv"

'create (n:Node)'

'path'

{}

"org.neo4j.graphdb.QueryExecutionException: 无法调用过程 apoc.load.csv:原因:java.io.FileNotFoundException …​.

配置

请注意,要使用apoc.load.directory.async.*过程,需要启用以下配置

apoc.conf
apoc.import.file.enabled=true

以下设置将允许您更改导入文件夹

dbms.directories.import=import

可以将apoc.import.file.use_neo4j_config=false设置为在绝对路径中搜索文件

CALL apoc.load.directory.async.add('test', 'CREATE (n:Test)', '*.csv', 'file:///Users/username/Downloads');