ElasticSearch
与 Elastic Search 交互
限定名 | 类型 | 发布版本 |
---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
目前无法通过证书查询 Elastic 8,只能通过配置 |
示例
call apoc.es.post("localhost","tweets","users",null,{name:"Chris"})
call apoc.es.put("localhost","tweets","users","1",null,{name:"Chris"})
call apoc.es.get("localhost","tweets","users","1",null,null)
call apoc.es.stats("localhost")
call apoc.es.delete("localhost","indexName","typeName","idName")

分页
要使用 Elasticsearch 的分页功能,您需要按照以下步骤操作
-
调用 apoc.es.query 获取第一批数据,并获取 scroll_id(以启用分页)。
-
对前 N 个命中项执行 merge/create 等操作
-
使用 range(start,end,step) 函数重复进行第二次调用,以获取所有其他数据块直到结束。例如,如果您有 1000 个文档,并且想每次请求检索 10 个文档,您可以执行 range(11,1000,10)。您从 11 开始,因为前 10 个文档已经被处理。如果您不知道确切的上限(文档的总大小),您可以设置一个大于实际总大小的数字。
-
重复的第二次调用是 apoc.es.get。请记住将 scroll_id 设置为一个参数。
-
然后像处理第一个数据块一样处理每个数据块的结果。
这是一个示例
// It's important to create an index to improve performance
CREATE INDEX FOR (n:Document) ON (n.id)
// First query: get first chunk of data + the scroll_id for pagination
CALL apoc.es.query('localhost','test-index','test-type','name:Neo4j&size=1&scroll=5m',null) yield value with value._scroll_id as scrollId, value.hits.hits as hits
// Do something with hits
UNWIND hits as hit
// Here we simply create a document and a relation to a company
MERGE (doc:Document {id: hit._id, description: hit._source.description, name: hit._source.name})
MERGE (company:Company {name: hit._source.company})
MERGE (doc)-[:IS_FROM]->(company)
// Then call for the other docs and use the scrollId value from previous query
// Use a range to count our chunk of data (i.e. i want to get chunks from 2 to 10)
WITH range(2,10,1) as list, scrollId
UNWIND list as count
CALL apoc.es.get("localhost","_search","scroll",null,{scroll:"5m",scroll_id:scrollId},null) yield value with value._scoll_id as scrollId, value.hits.hits as nextHits
// Again, do something with hits
UNWIND nextHits as hit
MERGE (doc:Document {id: hit._id, description: hit._source.description, name: hit._source.name})
MERGE (company:Company {name: hit._source.company})
MERGE (doc)-[:IS_FROM]->(company) return scrollId, doc, company
此示例在一台配备 16GB RAM 的 MacBook Pro 上进行了测试。将 20000 个文档从 ES 加载到 Neo4j(每次请求 100 个文档)耗时 1 分钟。
通用结构和参数
call apoc.es.post(host-or-key,index-or-null,type-or-null,id-or-null,query-or-null,payload-or-null,$config) yield value
// GET/PUT/POST url/index/type/id?query -d payload
host-or-key 参数
参数可以是
-
host
-
host:port
-
username:password@host:port
例如,通过使用 apoc.es.stats,我们可以执行
CALL apoc.es.stats('http://username:password@host:port')
此外,它也可以是 apoc.conf 中要查找的条目
-
lookup apoc.es.url
-
lookup apoc.es.host
这优先于上述直接字符串 host 或 url 作为第一个参数的情况。
例如,使用这样的 apoc.conf
apoc.es.url=http://username:password@host:port
或像这样
apoc.es.host=username:password@host:port
我们可以通过将 null 作为第一个参数连接到 elastic。
例如,通过使用 apoc.es.stats,我们可以执行
CALL apoc.es.stats(null)
此外,它也可以是 apoc.conf 中要查找的条目,其中 <key> 必须放在第一个参数中
-
通过 key 查找 apoc.es.<key>.url
-
通过 key 查找 apoc.es.<key>.host
例如,使用这样的 apoc.conf
apoc.es.custom.url=http://username:password@host:port
或像这样
apoc.es.custom.host=username:password@host:port
我们可以通过将 null 作为第一个参数连接到 elastic。
例如,通过使用 apoc.es.stats,我们可以执行
CALL apoc.es.stats('custom')
config 参数
Config 是一个可选的 map,可以包含以下条目
名称 | 类型 | 默认值 | 描述 |
---|---|---|---|
headers |
|
{ |
包含一个头部 map,用于添加(或替换)默认头部。APOC 需要 |
version |
|
|
可以是 |
例如,通过使用 apoc.es.stats,我们可以执行
CALL apoc.es.stats('custom', { headers: {Authorization: "Basic <Base64Token>"} })
使用 基本认证 并创建以下 HTTP 头部
Authorization: Basic <Base64Token>
method: GET
Content-Type: application/json
Elastic 8 中的某些 API 可以通过过程调用,而无需配置 {version: 'EIGHT'}
,例如 apoc.es.stats
;但对于某些 API,为了正确处理端点,需要设置它,例如 apoc.es.query
。
过程 | version 为 DEFAULT |
version 为 EIGHT |
---|---|---|
|
<host>/_stats |
与 DEFAULT 相同 |
|
<host>/<index param>/<type param>/_stats?<query param> |
<host>/<index param>/_stats?<query param> |
|
|
与 DEFAULT 相同 |
其他 |
|
type 参数通常是一个以下划线开头的字符串,指示 API 的类型,例如 |
例如,通过使用 apoc.es.query
,我们可以执行 Search API
CALL apoc.es.query(<$host>, <$index>, <$type>, 'q=name:Neo4j', null, { version: 'EIGHT' })
通过 Update API 更新 Elastic 8 中的文档
CALL apoc.es.put($host,'<indexName>','_doc','<idName>','refresh=true',{name: 'foo'}, {version: 'EIGHT'})
在 Elastic 8 中调用 Create Index API
CALL apoc.es.put($host,'<indexName>', null, null, null, null, { version: 'EIGHT' })
互惠排序融合 (RRF)
可以使用 ES 从 Neo4j 执行 RRF。更多详细信息请阅读 官方文档。请注意,此 API 从 Elastic 8.14.x 版本开始支持。
这里有一个使用 Neo4j 与 ES 的示例。
步骤 1 - 创建映射
CALL apoc.es.put($host, 'example-index', null, null, null,
{
"mappings": {
"properties": {
"text": {
"type": "text"
},
"vector": {
"type": "dense_vector",
"dims": 1,
"index": true,
"similarity": "l2_norm"
},
"integer": {
"type": "integer"
}
}
}
}, $config)
步骤 2 - 放入文档
CALL apoc.es.put($host, 'example-index/_doc/1', null, null, null,
{
"text" : "rrf",
"vector" : [5],
"integer": 1
}, $config)
CALL apoc.es.put($host, 'example-index/_doc/2', null, null, null,
{
"text" : "rrf rrf",
"vector" : [4],
"integer": 2
}, $config)
CALL apoc.es.put($host, 'example-index/_doc/3', null, null, null,
{
"text" : "rrf rrf rrf",
"vector" : [3],
"integer": 1
}, $config)
CALL apoc.es.put($host, 'example-index/_doc/4', null, null, null,
{
"text" : "rrf rrf rrf rrf",
"integer": 2
}, $config)
CALL apoc.es.put($host, 'example-index/_doc/5', null, null, null,
{
"vector" : [0],
"integer": 1
}, $config)
步骤 4 - 使用 RRF 检索器执行搜索
CALL apoc.es.getRaw($host,'example-index/_search',
{
"retriever": {
"rrf": {
"retrievers": [
{
"standard": {
"query": {
"term": {
"text": "rrf"
}
}
}
},
{
"knn": {
"field": "vector",
"query_vector": [3],
"k": 5,
"num_candidates": 5
}
}
],
"window_size": 5,
"rank_constant": 1
}
},
"size": 3,
"aggs": {
"int_count": {
"terms": {
"field": "integer"
}
}
}
}
,$config) yield value