目录监听器
apoc.load.directory.async 过程用于管理触发器。每个触发器都包含一个监听器,用于监视一个或多个文件夹,从而触发自定义 cypher 查询的执行。可以使用以下过程来添加、移除和列出触发器
限定名称 | 类型 | 发布版本 |
---|---|---|
|
|
|
apoc.load.directory.async.remove
|
|
|
apoc.load.directory.async.removeAll
|
|
|
apoc.load.directory.async.list
|
|
|
使用示例
添加文件夹监听器
结合其他加载过程,例如 apoc.load.csv
或 apoc.load.json
,可以异步加载文件。
此示例创建节点,使用触发器提供的参数,仅在 csvFolder 中创建和修改文件时触发
CALL apoc.load.directory.async.add('csvImport',
"CALL apoc.load.csv($filePath) yield list WITH list CREATE (n:CsvToNode {content: list, fileName: $fileName, fileDirectory: $fileDirectory, listenEventType: $listenEventType})",
"*.csv", "csvFolder" ,{listenEventType: ["CREATE", "MODIFY"]})
其中 $fileName
是创建/修改的文件,$filePath
是文件的相对路径,即 $IMPORT_DIR/csvFolder/[FILENAME.csv]
,$fileDirectory
是目录的相对路径,即 $IMPORT_DIR/csvFolder
,而 $listenEventType
是触发的事件,即 CREATE
或 MODIFY
。
假设我们的 IMPORT_DIR 设置为 import
,并且以下文件上传到 import/csvFolder
文件夹
name,age
Selma,8
Rana,11
Selina,18
然后,执行 MATCH (n:CsvToNode) RETURN properties(n) as props
props |
---|
{ "fileName": "test.csv", "listenEventType": "CREATE", "fileDirectory": "csvFolder", "content": [ "Selma", "8" ] } |
{ "fileName": "test.csv", "listenEventType": "CREATE", "fileDirectory": "csvFolder", "content": [ "Rana", "11" ] } |
{ "fileName": "test.csv", "listenEventType": "CREATE", "fileDirectory": "csvFolder", "content": [ "Selina", "18" ] } |
如果我们按如下方式修改 test.csv
name,age
Selma,80
Rana,110
Selina,180
我们获得 3 个具有这些属性的新节点
props |
---|
{ "fileName": "test.csv", "listenEventType": "MODIFY", "fileDirectory": "csvFolder", "content": [ "Selma", "80" ] } |
{ "fileName": "test.csv", "listenEventType": "MODIFY", "fileDirectory": "csvFolder", "content": [ "Rana", "110" ] } |
{ "fileName": "test.csv", "listenEventType": "MODIFY", "fileDirectory": "csvFolder", "content": [ "Selina", "180" ] } |
列表和移除过程
我们可以通过执行以下操作查看监听器列表
CALL apoc.load.directory.async.list();
例如,如果执行一个
CALL apoc.load.directory.async.add('csvImport',
"CALL apoc.load.csv($filePath) yield list WITH list CREATE (n:CsvToNode {content: list, fileName: $fileName, fileDirectory: $fileDirectory, listenEventType: $listenEventType})",
"*.csv", "csvFolder" ,{listenEventType: ["CREATE", "MODIFY"]})
结果将是
name | status | pattern | cypher | urlDir | config | error |
---|---|---|---|---|---|---|
"csvImport" |
"RUNNING" |
"*.csv" |
"CALL apoc.load.csv($filePath) yield list WITH list CREATE (n:CsvToNode {content: list, fileName: $fileName, fileDirectory: $fileDirectory, listenEventType: $listenEventType})" |
<importUrlDir>/csvFolder |
{"listenEventType": ["CREATE", "MODIFY"], "interval": 1000 } |
"" |
我们可以通过将给定名称作为参数执行移除过程来移除特定的监听器
CALL apoc.load.directory.async.remove('csvImport')
结果将是剩余的监听器列表。
此外,我们可以使用以下过程移除所有监听器(这将返回一个空结果)
CALL apoc.load.directory.async.removeAll()
错误处理
当由于某种原因监听器失败时,其 status
字段从 RUNNING
变为 ERROR
,并输出相关错误。如果我们执行 call apoc.load.directory.async.list
,例如,我们会得到
name | status | pattern | cypher | urlDir | config | error |
---|---|---|---|---|---|---|
|
|
|
|
|
|
|
配置
请注意,要使用 apoc.load.directory.async.*
过程,需要启用以下配置
apoc.import.file.enabled=true
以下设置将允许您更改导入文件夹
dbms.directories.import=import
可以将 apoc.import.file.use_neo4j_config=false
设置为在绝对路径中搜索文件
CALL apoc.load.directory.async.add('test', 'CREATE (n:Test)', '*.csv', 'file:///Users/username/Downloads');