运行非阻塞异步查询
在查询数据库中的示例使用了async/await
语法,这强制驱动程序同步工作。当对查询使用await
时,您的应用程序会等待服务器检索所有查询结果并将其传输给驱动程序。这对于大多数用例来说不是问题,但对于处理时间长或结果集大的查询,异步处理可能会加快您的应用程序。
有几种运行异步查询的方式
-
异步迭代 — 查询结果会以应用程序能够处理的速度(迭代地)进行处理。驱动程序会相应地调整服务器传输的记录数量。
-
Promise API — 查询结果以
Promise
形式返回。Promise 仅在驱动程序获得完整结果集时才会被解析。最适合服务器处理时间长但希望一次性处理其结果的查询。您的应用程序会批量接收结果,以便立即使用。 -
流式 API — 查询结果以流的形式返回,以便每个结果记录在可用时立即处理。最适合记录处理是单个进行的查询。您的应用程序会零散地接收结果,以便按需使用。
-
响应式 API — 适用于响应式应用程序。
当在事务函数中使用await tx.run() 时,您可以将查询结果原样从事务函数中返回以供进一步处理。另一方面,对于异步查询,您必须在事务函数内部处理结果(Promise API 除外)。 |
异步迭代
Result
对象支持异步迭代。这允许您的应用程序按自己的速度处理数据,驱动程序会相应地调整从服务器流式传输记录的速度,并施加背压。使用异步迭代器,您可以保证您的应用程序不会以超过其处理能力的速度接收数据。
const session = driver.session()
try {
const peopleNames = await session.executeWrite(async tx => {
const result = tx.run( (1)
'MERGE (p:Person {name: $name}) RETURN p.name AS name',
{ name: 'Alice' }
)
let names = []
for await (const record of result) { (2)
console.log(`Processing ${record.get('name')}`)
names.push(record.get('name'))
}
return names (3)
})
} finally {
await session.close()
}
1 | 运行查询 |
2 | 使用异步迭代处理记录 |
3 | 返回已处理的结果(而非原始查询结果) |
异步迭代器的使用有两点重要事项
-
每个查询结果只能异步迭代一次。一旦结果游标到达流的末尾,它就不会倒回,因此您不能多次迭代结果。如果您的应用程序需要多次处理数据,您必须手动将其存储在辅助数据结构中(如上所示的列表)。
-
结果的处理发生在事务函数内部。您不应该将原始结果从事务函数中返回,然后对其进行迭代。该工作流程仅适用于Promise API。
Promise API
Promise API 允许运行查询并以 Promise
形式接收结果。您可以将此查询方法视为允许您指定一个 Cypher 查询以及多个根据查询结果异步执行的回调。
const session = driver.session({database: 'neo4j'})
const result = session.executeWrite(async tx => { (1)
return tx.run(
'MERGE (p:Person {name: $name}) RETURN p.name AS name',
{ name: 'Alice' }
)
})
result.then(result => { (2)
result.records.forEach(record => {
console.log(record.get('name'))
})
return result
})
.catch(error => { (3)
console.log(error)
})
.then(() => session.close()) (4)
1 | 运行查询 |
2 | 指定成功运行的回调,以查询结果作为输入 |
3 | 指定失败运行的回调,以驱动程序错误作为输入 |
4 | 指定无论查询结果如何都运行的回调 |
Promise API 也适用于 |
组合多个事务
要在同一事务中运行多个查询,请使用Promise.all()
。它并发运行异步操作,因此您可以同时提交多个查询并等待它们全部完成。
const companyName = 'Neo4j'
const session = driver.session({database: 'neo4j'})
try {
const names = await session.executeRead(async tx => {
const result = await tx.run('MATCH (p:Person) RETURN p.name AS name')
return result.records.map(record => record.get('name'))
})
const relationshipsCreated = await session.executeWrite(tx =>
Promise.all( // group together all Promises
names.map(name =>
tx.run(`
MATCH (emp:Person {name: $personName})
MERGE (com:Company {name: $companyName})
MERGE (emp)-[:WORKS_FOR]->(com)
`, { personName: name, companyName: companyName }
)
.then(result => result.summary.counters.updates().relationshipsCreated)
)
).then(values => values.reduce((a, b) => a + b)) // aggregate results
)
console.log(`Created ${relationshipsCreated} employees relationships.`)
} finally {
await session.close()
}
流式 API
流式 API 允许运行查询并在服务器准备好结果时立即单独接收它们。您可以指定一个回调来处理每条记录。此 API 特别适用于服务器检索不同记录可能需要不同时间,但您希望在它们可用时立即处理每个记录的情况。其行为类似于异步迭代器;编程风格不同。
const session = driver.session({database: 'neo4j'})
let peopleNames = []
session
.run('MERGE (p:Person {name: $name}) RETURN p.name AS name', { (1)
name: 'Alice'
})
.subscribe({ (2)
onKeys: keys => { (3)
console.log('Result columns are:')
console.log(keys)
},
onNext: record => { (4)
console.log(`Processing ${record.get('name')}`)
peopleNames.push(record.get('name'))
},
onCompleted: () => { (5)
session.close() // returns a Promise
},
onError: error => { (6)
console.log(error)
}
})
1 | 运行查询 |
2 | 将处理程序附加到结果流 |
3 | onKeys 回调接收结果列列表 |
4 | 每次接收到记录时都会调用onNext 回调 |
5 | 事务结束时调用onCompleted 回调 |
6 | 发生错误时触发onError |
响应式 API
响应式编程的典型特点是,在响应式流中,消费者控制从查询中消耗记录的速率,而驱动程序则管理从服务器请求记录的速率。响应式 API 推荐用于已面向响应式风格的应用程序。
const rxjs = require('rxjs');
const rxSession = driver.rxSession() (1)
const rxResult = await rxSession.executeWrite(tx => {
return tx
.run('MERGE (p:Person {name: $name}) RETURN p.name AS name', { (2)
name: 'Alice'
})
.records() (3)
.pipe( (4)
rxjs.map(record => record.get('name')),
//rxjs.materialize(), // optional, turns outputs into Notifications
rxjs.toArray()
)
})
const people = await rxResult.toPromise()
console.log(people)
1 | 获取响应式会话 |
2 | 运行查询 |
3 | 获取结果记录的 observable |
4 | 响应式处理 |
响应式 API 在驱动程序的精简版中不可用。 |
术语表
- LTS
-
长期支持版本是保证在数年内获得支持的版本。Neo4j 4.4 是 LTS 版本,Neo4j 5 也将有一个 LTS 版本。
- Aura
-
Aura 是 Neo4j 的全托管云服务。它提供免费和付费计划。
- Cypher
-
Cypher 是 Neo4j 的图查询语言,允许您从数据库中检索数据。它类似于 SQL,但适用于图。
- APOC
-
Awesome Procedures On Cypher (APOC) 是一个包含(许多)无法在 Cypher 本身中轻松表达的函数的库。
- Bolt
-
Bolt 是 Neo4j 实例和驱动程序之间交互使用的协议。它默认在端口 7687 上监听。
- ACID
-
原子性 (Atomicity)、一致性 (Consistency)、隔离性 (Isolation)、持久性 (Durability)(ACID)是保证数据库事务可靠处理的特性。符合 ACID 标准的 DBMS 可确保数据库中的数据即使发生故障也能保持准确和一致。
- 最终一致性
-
如果一个数据库保证所有集群成员在某个时间点都会存储最新版本的数据,则该数据库是最终一致的。
- 因果一致性
-
如果集群的每个成员都以相同的顺序看到读写查询,则该数据库是因果一致的。这比最终一致性更强。
- NULL
-
null 标记不是一种类型,而是值缺失的占位符。欲了解更多信息,请参阅Cypher → 处理
null
。 - 事务
-
事务是一个工作单元,它要么被完整地提交,要么在失败时被回滚。一个例子是银行转账:它涉及多个步骤,但它们必须全部成功或被撤销,以避免钱从一个账户扣除但未添加到另一个账户。
- 背压
-
背压是阻碍数据流动的力。它确保客户端不会被数据以超过其处理能力的速度所淹没。
- 事务函数
-
事务函数是由
executeRead
或executeWrite
调用执行的回调。如果服务器出现故障,驱动程序会自动重新执行该回调。 - 驱动程序
-
一个
Driver
对象包含建立与 Neo4j 数据库连接所需的详细信息。