ElasticSearch

限定名称 类型 版本

apoc.es.stats

apoc.es.stats(host-or-key,$config) - elastic search 统计信息

过程

Apoc 扩展

apoc.es.get

apoc.es.get(host-or-key,index-or-null,type-or-null,id-or-null,query-or-null,payload-or-null,$config) yield value - 对 elastic search 执行 GET 操作

过程

Apoc 扩展

apoc.es.query

apoc.es.query(host-or-key,index-or-null,type-or-null,query-or-null,payload-or-null,$config) yield value - 对 elastic search 执行 SEARCH 操作

过程

Apoc 扩展

apoc.es.getRaw

apoc.es.getRaw(host-or-key,path,payload-or-null,$config) yield value - 对 elastic search 执行原始 GET 操作

过程

Apoc 扩展

apoc.es.postRaw

apoc.es.postRaw(host-or-key,path,payload-or-null,$config) yield value - 对 elastic search 执行原始 POST 操作

过程

Apoc 扩展

apoc.es.post

apoc.es.post(host-or-key,index-or-null,type-or-null,query-or-null,payload-or-null,$config) yield value - 对 elastic search 执行 POST 操作

过程

Apoc 扩展

apoc.es.put

apoc.es.put(host-or-key,index-or-null,type-or-null,id-or-null,query-or-null,payload-or-null,$config) yield value - 对 elastic search 执行 PUT 操作

过程

Apoc 扩展

apoc.es.delete

apoc.es.delete(host-or-key,index-or-null,type-or-null,id-or-null,query-or-null,$config) yield value - 对 elastic search 执行 DELETE 操作

过程

Apoc 扩展

目前无法通过证书查询 Elastic 8,只能通过配置 "xpack.security.http.ssl.enabled=false" 禁用 ssl,使用标头配置中的基本身份验证(请参阅下面的 config 参数)或(不推荐)通过 xpack.security.enabled=false 禁用安全。

示例

call apoc.es.post("localhost","tweets","users",null,{name:"Chris"})
call apoc.es.put("localhost","tweets","users","1",null,{name:"Chris"})
call apoc.es.get("localhost","tweets","users","1",null,null)
call apoc.es.stats("localhost")
call apoc.es.delete("localhost","indexName","typeName","idName")
apoc.es.get

分页

要使用 Elasticsearch 的分页功能,您需要执行以下步骤

  1. 调用 **apoc.es.query** 获取第一批数据并获取 scroll_id(以便启用分页)。

  2. 对前 N 个匹配项执行合并/创建等操作

  3. 使用 **range(start,end,step)** 函数重复第二个调用以获取所有其他块,直到结束。例如,如果您有 1000 个文档,并且希望每次请求检索 10 个文档,您可以执行 **range(11,1000,10)**。您从 11 开始,因为前 10 个文档已处理。如果您不知道确切的上限(文档的总大小),您可以设置一个大于实际总大小的数字。

  4. 重复的第二个调用是 **apoc.es.get**。请记住将 **scroll_id** 作为参数设置。

  5. 然后像第一个一样处理每个数据块的结果。

这是一个示例

// It's important to create an index to improve performance
CREATE INDEX FOR (n:Document) ON (n.id)
// First query: get first chunk of data + the scroll_id for pagination
CALL apoc.es.query('localhost','test-index','test-type','name:Neo4j&size=1&scroll=5m',null) yield value with value._scroll_id as scrollId, value.hits.hits as hits
// Do something with hits
UNWIND hits as hit
// Here we simply create a document and a relation to a company
MERGE (doc:Document {id: hit._id, description: hit._source.description, name: hit._source.name})
MERGE (company:Company {name: hit._source.company})
MERGE (doc)-[:IS_FROM]->(company)
// Then call for the other docs and use the scrollId value from previous query
// Use a range to count our chunk of data (i.e. i want to get chunks from 2 to 10)
WITH range(2,10,1) as list, scrollId
UNWIND list as count
CALL apoc.es.get("localhost","_search","scroll",null,{scroll:"5m",scroll_id:scrollId},null) yield value with value._scoll_id as scrollId, value.hits.hits as nextHits
// Again, do something with hits
UNWIND nextHits as hit
MERGE (doc:Document {id: hit._id, description: hit._source.description, name: hit._source.name})
MERGE (company:Company {name: hit._source.company})
MERGE (doc)-[:IS_FROM]->(company) return scrollId, doc, company

此示例在配备 16GB RAM 的 Mac Book Pro 上进行了测试。将 ES 中的 20000 个文档加载到 Neo4j(每次请求 100 个文档)花费了 1 分钟。

通用结构和参数

call apoc.es.post(host-or-key,index-or-null,type-or-null,id-or-null,query-or-null,payload-or-null,$config) yield value

// GET/PUT/POST url/index/type/id?query -d payload

host-or-key 参数

此参数可以是

例如,通过使用 apoc.es.stats,我们可以执行

CALL apoc.es.stats('http://username:password@host:port')

此外,它可以是 apoc.conf 中要查找的条目

  • 查找 apoc.es.url

  • 查找 apoc.es.host

这优先于直接字符串主机或 URL 作为第一个参数,如上所示。

例如,使用如下所示的 apoc.conf

apoc.es.url=http://username:password@host:port

或像这样

apoc.es.host=username:password@host:port

我们可以通过将 null 作为第一个参数来连接到 Elastic。

例如,通过使用 apoc.es.stats,我们可以执行

CALL apoc.es.stats(null)

此外,它可以是 apoc.conf 中要查找的条目,其中 <key> 已放置在第一个参数中

  • 通过键查找 apoc.es.<key>.url

  • 通过键查找 apoc.es.<key>.host

例如,使用如下所示的 apoc.conf

apoc.es.custom.url=http://username:password@host:port

或像这样

apoc.es.custom.host=username:password@host:port

我们可以通过将 null 作为第一个参数来连接到 Elastic。

例如,通过使用 apoc.es.stats,我们可以执行

CALL apoc.es.stats('custom')

索引参数

主 ES 索引,将直接发送,如果为 null 则为 "_all",多个索引可以用字符串中的逗号分隔。

类型参数

文档类型,将直接发送,如果为 null 则为 "_all",多个类型可以用字符串中的逗号分隔。

ID 参数

文档 ID,在为 null 时将被省略。

查询参数

查询可以是转换为查询字符串的映射、直接字符串或 null,然后将其省略。

有效负载参数

有效负载可以是将转换为 JSON 有效负载的**映射**或将直接发送的字符串或 null。

配置参数

配置可以是一个可选的**映射**,它可以包含以下条目

表 1. 配置参数
名称 类型 默认值 描述

标头

映射

{content-type: "application/json", method, "<httpMethod>"}

包含要添加(或替换)默认标头的标头映射。APOC 需要 method: <httpMethod> 来在后台确定要传递哪个 HTTP 请求方法。也就是说,默认情况下,它在 apoc.es.put 中为 PUT,在 apoc.es.postapoc.es.postRaw 中为 POST,在其他情况下为 GET。

版本

字符串

DEFAULT

可以是 DEFAULTEIGHT,以便根据 Elastic 版本更改 RestAPI 端点。请参阅下面的“端点”表。

例如,通过使用 apoc.es.stats,我们可以执行

CALL apoc.es.stats('custom', { headers: {Authorization: "Basic <Base64Token>"} })

使用基本身份验证并创建以下 HTTP 标头

Authorization: Basic <Base64Token>
method: GET
Content-Type: application/json

Elastic 8 中的一些 API 可以通过过程调用,而无需配置 {version: 'EIGHT'},例如 apoc.es.stats,但对于某些 API,有必要设置它,以正确处理端点,例如 apoc.es.query

表 2. 端点
过程 版本:DEFAULT 版本:EIGHT

apoc.es.stats(host)

<host>/_stats

DEFAULT 相同

apoc.es.query(host, index, type, query, payload, $conf)

<host>/<index param>/<type param>/_stats?<query param>

<host>/<index param>/_stats?<query param>

apoc.es.getRaw/apoc.es.postRaw(host, path, payload, $conf)

<host>/<path param>

DEFAULT 相同

其他 apoc.es.<name>(host, index, type, id, query, payload, $conf) 过程

<host>/<index param>/<type param>/<id param>_stats?<query param> 默认情况下,<index param><id param> 将填充为 _all,而 <id param> 如果不存在,将从端点中删除

<host>/<index param>/<type param>/<id param>_stats?<query param>。请注意,您只需要在 <index param><id param><type param> 之间输入三个值中的一个,其他值最终将从端点中排除。

类型参数通常是一个下划线字符串,指示 API 的类型,例如 _doc_update(而以前指示映射类型)。这是为了允许您调用,例如此 API

例如,通过使用 apoc.es.query,我们可以执行搜索 API

CALL apoc.es.query(<$host>, <$index>, <$type>, 'q=name:Neo4j', null, { version: 'EIGHT' })

通过更新 API更新 Elastic 8 中的文档

CALL apoc.es.put($host,'<indexName>','_doc','<idName>','refresh=true',{name: 'foo'}, {version: 'EIGHT'})

在 Elastic 8 中调用创建索引 API

CALL apoc.es.put($host,'<indexName>', null, null, null, null, { version: 'EIGHT' })

结果

结果是值中映射的流。