目录监听器

apoc.load.directory.async 过程用于管理触发器。每个触发器都包含一个监听器,用于监视一个或多个文件夹,从而触发自定义 cypher 查询的执行。可以使用以下过程来添加、移除和列出触发器

限定名称 类型 发布版本

apoc.load.directory.async.add

apoc.load.directory.async.add(name, cypher, pattern, urlDir, {}) YIELD name, status, pattern, cypher, urlDir, config, error - 添加或替换具有特定名称的文件夹监听器,该监听器针对具有给定模式的所有文件触发,并在触发时执行指定的 Cypher 查询。返回所有监听器的列表。可以在 config 参数中指定事件类型。

过程

Apoc Extended

apoc.load.directory.async.remove

apoc.load.directory.async.remove(name) YIELD name, status, pattern, cypher, urlDir, config, error - 按名称移除文件夹监听器,并返回所有剩余的监听器(如果有)

过程

Apoc Extended

apoc.load.directory.async.removeAll

apoc.load.directory.async.removeAll() - 移除所有文件夹监听器

过程

Apoc Extended

apoc.load.directory.async.list

apoc.load.directory.async.list() YIELD name, status, pattern, cypher, urlDir, config, error - 列出所有文件夹监听器

过程

Apoc Extended

使用示例

添加文件夹监听器

结合其他加载过程,例如 apoc.load.csvapoc.load.json,可以异步加载文件。

此示例创建节点,使用触发器提供的参数,仅在 csvFolder 中创建和修改文件时触发

CALL apoc.load.directory.async.add('csvImport',
"CALL apoc.load.csv($filePath) yield list WITH list CREATE (n:CsvToNode {content: list, fileName: $fileName, fileDirectory: $fileDirectory, listenEventType: $listenEventType})",
"*.csv", "csvFolder" ,{listenEventType: ["CREATE", "MODIFY"]})

其中 $fileName 是创建/修改的文件,$filePath 是文件的相对路径,即 $IMPORT_DIR/csvFolder/[FILENAME.csv]$fileDirectory 是目录的相对路径,即 $IMPORT_DIR/csvFolder,而 $listenEventType 是触发的事件,即 CREATEMODIFY

假设我们的 IMPORT_DIR 设置为 import,并且以下文件上传到 import/csvFolder 文件夹

test.csv
name,age
Selma,8
Rana,11
Selina,18

然后,执行 MATCH (n:CsvToNode) RETURN properties(n) as props

表 1. 结果
props

{ "fileName": "test.csv", "listenEventType": "CREATE", "fileDirectory": "csvFolder", "content": [ "Selma", "8" ] }

{ "fileName": "test.csv", "listenEventType": "CREATE", "fileDirectory": "csvFolder", "content": [ "Rana", "11" ] }

{ "fileName": "test.csv", "listenEventType": "CREATE", "fileDirectory": "csvFolder", "content": [ "Selina", "18" ] }

如果我们按如下方式修改 test.csv

test.csv
name,age
Selma,80
Rana,110
Selina,180

我们获得 3 个具有这些属性的新节点

表 2. 结果
props

{ "fileName": "test.csv", "listenEventType": "MODIFY", "fileDirectory": "csvFolder", "content": [ "Selma", "80" ] }

{ "fileName": "test.csv", "listenEventType": "MODIFY", "fileDirectory": "csvFolder", "content": [ "Rana", "110" ] }

{ "fileName": "test.csv", "listenEventType": "MODIFY", "fileDirectory": "csvFolder", "content": [ "Selina", "180" ] }

列表和移除过程

我们可以通过执行以下操作查看监听器列表

CALL apoc.load.directory.async.list();

例如,如果执行一个

CALL apoc.load.directory.async.add('csvImport',
"CALL apoc.load.csv($filePath) yield list WITH list CREATE (n:CsvToNode {content: list, fileName: $fileName, fileDirectory: $fileDirectory, listenEventType: $listenEventType})",
"*.csv", "csvFolder" ,{listenEventType: ["CREATE", "MODIFY"]})

结果将是

表 3. 结果
name status pattern cypher urlDir config error

"csvImport"

"RUNNING"

"*.csv"

"CALL apoc.load.csv($filePath) yield list WITH list CREATE (n:CsvToNode {content: list, fileName: $fileName, fileDirectory: $fileDirectory, listenEventType: $listenEventType})"

<importUrlDir>/csvFolder

{"listenEventType": ["CREATE", "MODIFY"], "interval": 1000 }

""

我们可以通过将给定名称作为参数执行移除过程来移除特定的监听器

CALL apoc.load.directory.async.remove('csvImport')

结果将是剩余的监听器列表。

此外,我们可以使用以下过程移除所有监听器(这将返回一个空结果)

CALL apoc.load.directory.async.removeAll()

错误处理

当由于某种原因监听器失败时,其 status 字段从 RUNNING 变为 ERROR,并输出相关错误。如果我们执行 call apoc.load.directory.async.list,例如,我们会得到

name status pattern cypher urlDir config error

listenerName

ERROR

*.csv

'create (n:Node)'

'path'

{}

"org.neo4j.graphdb.QueryExecutionException: Failed to invoke procedure apoc.load.csv: Caused by: java.io.FileNotFoundException …​.

配置

请注意,要使用 apoc.load.directory.async.* 过程,需要启用以下配置

apoc.conf
apoc.import.file.enabled=true

以下设置将允许您更改导入文件夹

dbms.directories.import=import

可以将 apoc.import.file.use_neo4j_config=false 设置为在绝对路径中搜索文件

CALL apoc.load.directory.async.add('test', 'CREATE (n:Test)', '*.csv', 'file:///Users/username/Downloads');
© . All rights reserved.