ElasticSearch

限定名 类型 发布版本

apoc.es.stats

apoc.es.stats(host-or-key,$config) - 获取 elastic search 统计信息

过程

Apoc Extended

apoc.es.get

apoc.es.get(host-or-key,index-or-null,type-or-null,id-or-null,query-or-null,payload-or-null,$config) yield value - 在 elastic search 上执行 GET 操作

过程

Apoc Extended

apoc.es.query

apoc.es.query(host-or-key,index-or-null,type-or-null,query-or-null,payload-or-null,$config) yield value - 在 elastic search 上执行 SEARCH 操作

过程

Apoc Extended

apoc.es.getRaw

apoc.es.getRaw(host-or-key,path,payload-or-null,$config) yield value - 在 elastic search 上执行原始 GET 操作

过程

Apoc Extended

apoc.es.postRaw

apoc.es.postRaw(host-or-key,path,payload-or-null,$config) yield value - 在 elastic search 上执行原始 POST 操作

过程

Apoc Extended

apoc.es.post

apoc.es.post(host-or-key,index-or-null,type-or-null,query-or-null,payload-or-null,$config) yield value - 在 elastic search 上执行 POST 操作

过程

Apoc Extended

apoc.es.put

apoc.es.put(host-or-key,index-or-null,type-or-null,id-or-null,query-or-null,payload-or-null,$config) yield value - 在 elastic search 上执行 PUT 操作

过程

Apoc Extended

apoc.es.delete

apoc.es.delete(host-or-key,index-or-null,type-or-null,id-or-null,query-or-null,$config) yield value - 在 elastic search 上执行 DELETE 操作

过程

Apoc Extended

目前无法通过证书查询 Elastic 8,只能通过配置 "xpack.security.http.ssl.enabled=false" 禁用 SSL,或者通过请求头配置(参见下面的 config parameter)使用基本认证,或者(不推荐)通过 xpack.security.enabled=false 禁用安全性。

示例

call apoc.es.post("localhost","tweets","users",null,{name:"Chris"})
call apoc.es.put("localhost","tweets","users","1",null,{name:"Chris"})
call apoc.es.get("localhost","tweets","users","1",null,null)
call apoc.es.stats("localhost")
call apoc.es.delete("localhost","indexName","typeName","idName")
apoc.es.get

分页

要使用 Elasticsearch 的分页功能,您需要按照以下步骤操作

  1. 调用 apoc.es.query 获取第一批数据,并获取 scroll_id(以启用分页)。

  2. 对前 N 个命中项执行 merge/create 等操作

  3. 使用 range(start,end,step) 函数重复进行第二次调用,以获取所有其他数据块直到结束。例如,如果您有 1000 个文档,并且想每次请求检索 10 个文档,您可以执行 range(11,1000,10)。您从 11 开始,因为前 10 个文档已经被处理。如果您不知道确切的上限(文档的总大小),您可以设置一个大于实际总大小的数字。

  4. 重复的第二次调用是 apoc.es.get。请记住将 scroll_id 设置为一个参数。

  5. 然后像处理第一个数据块一样处理每个数据块的结果。

这是一个示例

// It's important to create an index to improve performance
CREATE INDEX FOR (n:Document) ON (n.id)
// First query: get first chunk of data + the scroll_id for pagination
CALL apoc.es.query('localhost','test-index','test-type','name:Neo4j&size=1&scroll=5m',null) yield value with value._scroll_id as scrollId, value.hits.hits as hits
// Do something with hits
UNWIND hits as hit
// Here we simply create a document and a relation to a company
MERGE (doc:Document {id: hit._id, description: hit._source.description, name: hit._source.name})
MERGE (company:Company {name: hit._source.company})
MERGE (doc)-[:IS_FROM]->(company)
// Then call for the other docs and use the scrollId value from previous query
// Use a range to count our chunk of data (i.e. i want to get chunks from 2 to 10)
WITH range(2,10,1) as list, scrollId
UNWIND list as count
CALL apoc.es.get("localhost","_search","scroll",null,{scroll:"5m",scroll_id:scrollId},null) yield value with value._scoll_id as scrollId, value.hits.hits as nextHits
// Again, do something with hits
UNWIND nextHits as hit
MERGE (doc:Document {id: hit._id, description: hit._source.description, name: hit._source.name})
MERGE (company:Company {name: hit._source.company})
MERGE (doc)-[:IS_FROM]->(company) return scrollId, doc, company

此示例在一台配备 16GB RAM 的 MacBook Pro 上进行了测试。将 20000 个文档从 ES 加载到 Neo4j(每次请求 100 个文档)耗时 1 分钟。

通用结构和参数

call apoc.es.post(host-or-key,index-or-null,type-or-null,id-or-null,query-or-null,payload-or-null,$config) yield value

// GET/PUT/POST url/index/type/id?query -d payload

host-or-key 参数

参数可以是

例如,通过使用 apoc.es.stats,我们可以执行

CALL apoc.es.stats('http://username:password@host:port')

此外,它也可以是 apoc.conf 中要查找的条目

  • lookup apoc.es.url

  • lookup apoc.es.host

这优先于上述直接字符串 host 或 url 作为第一个参数的情况。

例如,使用这样的 apoc.conf

apoc.es.url=http://username:password@host:port

或像这样

apoc.es.host=username:password@host:port

我们可以通过将 null 作为第一个参数连接到 elastic。

例如,通过使用 apoc.es.stats,我们可以执行

CALL apoc.es.stats(null)

此外,它也可以是 apoc.conf 中要查找的条目,其中 <key> 必须放在第一个参数中

  • 通过 key 查找 apoc.es.<key>.url

  • 通过 key 查找 apoc.es.<key>.host

例如,使用这样的 apoc.conf

apoc.es.custom.url=http://username:password@host:port

或像这样

apoc.es.custom.host=username:password@host:port

我们可以通过将 null 作为第一个参数连接到 elastic。

例如,通过使用 apoc.es.stats,我们可以执行

CALL apoc.es.stats('custom')

index 参数

主要的 ES 索引,将直接发送;如果为 null,则为 "_all";多个索引可以在字符串中用逗号分隔。

type 参数

文档类型,将直接发送;如果为 null,则为 "_all";多个类型可以在字符串中用逗号分隔。

id 参数

文档 id,如果为 null 则省略。

query 参数

查询可以是一个 map,它将被转换为查询字符串;也可以是一个直接字符串;如果为 null 则省略。

payload 参数

Payload 可以是一个 map,它将被转换为 json payload;也可以是一个字符串,它将直接发送;如果为 null 则省略。

config 参数

Config 是一个可选的 map,可以包含以下条目

表 1. Config 参数
名称 类型 默认值 描述

headers

Map

{content-type: "application/json", method, "<httpMethod>"}

包含一个头部 map,用于添加(或替换)默认头部。APOC 需要 method: <httpMethod> 来判断内部应传递哪个 HTTP 请求方法。也就是说,默认情况下,对于 apoc.es.put 是 PUT,对于 apoc.es.postapoc.es.postRaw 是 POST,其他情况下是 GET。

version

String

DEFAULT

可以是 DEFAULTEIGHT,以根据 Elastic 版本更改 RestAPI 端点。参见下面的 Endpoint 表。

例如,通过使用 apoc.es.stats,我们可以执行

CALL apoc.es.stats('custom', { headers: {Authorization: "Basic <Base64Token>"} })

使用 基本认证 并创建以下 HTTP 头部

Authorization: Basic <Base64Token>
method: GET
Content-Type: application/json

Elastic 8 中的某些 API 可以通过过程调用,而无需配置 {version: 'EIGHT'},例如 apoc.es.stats;但对于某些 API,为了正确处理端点,需要设置它,例如 apoc.es.query

表 2. 端点
过程 version 为 DEFAULT version 为 EIGHT

apoc.es.stats(host)

<host>/_stats

与 DEFAULT 相同

apoc.es.query(host, index, type, query, payload, $conf)

<host>/<index param>/<type param>/_stats?<query param>

<host>/<index param>/_stats?<query param>

apoc.es.getRaw/apoc.es.postRaw(host, path, payload, $conf)

<host>/<path param>

与 DEFAULT 相同

其他 apoc.es.<name>(host, index, type, id, query, payload, $conf) 过程

<host>/<index param>/<type param>/<id param>_stats?<query param> 默认情况下,<index param><type param> 将被填充为 _all,而 <id param> 如果不存在,将从端点中移除

<host>/<index param>/<type param>/<id param>_stats?<query param>. 注意,您只需要在 <index param><id param><type param> 这三个值中输入一个,其他值最终将被从端点中排除。

type 参数通常是一个以下划线开头的字符串,指示 API 的类型,例如 _doc_update(以前表示 映射类型)。这允许您调用例如此 API

例如,通过使用 apoc.es.query,我们可以执行 Search API

CALL apoc.es.query(<$host>, <$index>, <$type>, 'q=name:Neo4j', null, { version: 'EIGHT' })

通过 Update API 更新 Elastic 8 中的文档

CALL apoc.es.put($host,'<indexName>','_doc','<idName>','refresh=true',{name: 'foo'}, {version: 'EIGHT'})

在 Elastic 8 中调用 Create Index API

CALL apoc.es.put($host,'<indexName>', null, null, null, null, { version: 'EIGHT' })

结果

结果是 value 中的 map 流。

互惠排序融合 (RRF)

可以使用 ES 从 Neo4j 执行 RRF。更多详细信息请阅读 官方文档。请注意,此 API 从 Elastic 8.14.x 版本开始支持。

这里有一个使用 Neo4j 与 ES 的示例。

步骤 1 - 创建映射

CALL apoc.es.put($host, 'example-index', null, null, null,
{
              "mappings": {
                "properties": {
                  "text": {
                    "type": "text"
                  },
                  "vector": {
                    "type": "dense_vector",
                    "dims": 1,
                    "index": true,
                    "similarity": "l2_norm"
                  },
                  "integer": {
                    "type": "integer"
                  }
                }
              }
            }, $config)

结果

结果是 value 中的 map 流。

步骤 2 - 放入文档

CALL apoc.es.put($host, 'example-index/_doc/1', null, null, null,
{
    "text" : "rrf",
    "vector" : [5],
    "integer": 1
}, $config)

CALL apoc.es.put($host, 'example-index/_doc/2', null, null, null,
{
    "text" : "rrf rrf",
    "vector" : [4],
    "integer": 2
}, $config)

CALL apoc.es.put($host, 'example-index/_doc/3', null, null, null,
{
    "text" : "rrf rrf rrf",
    "vector" : [3],
    "integer": 1
}, $config)

CALL apoc.es.put($host, 'example-index/_doc/4', null, null, null,
{
    "text" : "rrf rrf rrf rrf",
    "integer": 2
}, $config)

CALL apoc.es.put($host, 'example-index/_doc/5', null, null, null,
{
    "vector" : [0],
    "integer": 1
}, $config)

结果

结果是 value 中的 map 流。

步骤 3 - 刷新索引

CALL apoc.es.post($host, 'example-index/_refresh', null, null, '', $config)

结果

结果是 value 中的 map 流。

步骤 4 - 使用 RRF 检索器执行搜索

CALL apoc.es.getRaw($host,'example-index/_search',
{
    "retriever": {
        "rrf": {
            "retrievers": [
                {
                    "standard": {
                        "query": {
                            "term": {
                                "text": "rrf"
                            }
                        }
                    }
                },
                {
                    "knn": {
                        "field": "vector",
                        "query_vector": [3],
                        "k": 5,
                        "num_candidates": 5
                    }
                }
            ],
            "window_size": 5,
            "rank_constant": 1
        }
    },
    "size": 3,
    "aggs": {
        "int_count": {
            "terms": {
                "field": "integer"
            }
        }
    }
}
,$config) yield value

结果

结果是 value 中的 map 流。

© . All rights reserved.