运行非阻塞异步查询
中的示例查询数据库使用async/await
语法,这会强制驱动程序同步工作。当使用await
与查询一起使用时,您的应用程序会等待服务器检索所有查询结果并将它们传输到驱动程序。对于大多数用例来说,这不是问题,但对于处理时间长或结果集大的查询,异步处理可能会加快应用程序的速度。
有几种运行异步查询的方法
-
异步迭代——查询结果在您的应用程序能够处理的速度下(迭代地)处理。驱动程序相应地调节服务器传输的记录数量。
-
Promise API——查询结果作为
Promise
返回。只有当完整的结果集可用于驱动程序时,才会解析此 Promise。最适合服务器处理时间较长但您希望一次处理其结果的查询。您的应用程序会批量接收结果,以供急切使用。 -
流式 API——查询结果作为流返回,以便在每个结果记录可用时对其进行处理。最适合记录处理是独立的查询。您的应用程序会分批接收结果,以供延迟使用。
-
反应式 API——适用于反应式应用程序。
当在事务函数中使用await tx.run() 时,您可以按原样将查询结果返回到事务函数之外以进行进一步处理。另一方面,对于异步查询,您必须在事务函数内部处理结果(Promise API 除外)。 |
异步迭代
该Result
对象支持异步迭代。这允许您的应用程序按自己的速度处理数据,驱动程序相应地调节从服务器流式传输记录的速度,并应用背压。使用异步迭代器,您可以保证您的应用程序不会比其处理速度更快地接收数据。
const session = driver.session()
try {
const peopleNames = await session.executeWrite(async tx => {
const result = tx.run( (1)
'MERGE (p:Person {name: $name}) RETURN p.name AS name',
{ name: 'Alice' }
)
let names = []
for await (const record of result) { (2)
console.log(`Processing ${record.get('name')}`)
names.push(record.get('name'))
}
return names (3)
})
} finally {
await session.close()
}
1 | 运行查询 |
2 | 使用异步迭代处理记录 |
3 | 返回处理后的结果(而不是原始查询结果) |
异步迭代器的使用有两个要点
-
您每个查询结果只能异步迭代一次。一旦结果游标到达流的末尾,它就不会倒带,因此您不能多次迭代结果。如果您需要在应用程序中多次处理数据,则必须将其手动存储在辅助数据结构(如上面的列表)中。
-
结果的处理发生在事务函数内部。您不应将原始结果返回到事务函数之外,然后对其进行迭代。该工作流仅适用于Promise API。
Promise API
Promise API 允许运行查询并将结果作为Promise
接收。您可以将此查询方法视为允许您指定 Cypher 查询和许多异步执行的回调,具体取决于查询结果。
const session = driver.session({database: 'neo4j'})
const result = session.executeWrite(async tx => { (1)
return tx.run(
'MERGE (p:Person {name: $name}) RETURN p.name AS name',
{ name: 'Alice' }
)
})
result.then(result => { (2)
result.records.forEach(record => {
console.log(record.get('name'))
})
return result
})
.catch(error => { (3)
console.log(error)
})
.then(() => session.close()) (4)
1 | 运行查询 |
2 | 为成功运行指定回调,将查询结果作为输入 |
3 | 为失败运行指定回调,将驱动程序错误作为输入 |
4 | 指定无论查询结果如何都运行的回调 |
Promise API 也适用于 |
组合多个事务
要在同一事务中运行多个查询,请使用Promise.all()
。它同时运行异步操作,因此您可以同时提交多个查询并等待它们全部完成。
const companyName = 'Neo4j'
const session = driver.session({database: 'neo4j'})
try {
const names = await session.executeRead(async tx => {
const result = await tx.run('MATCH (p:Person) RETURN p.name AS name')
return result.records.map(record => record.get('name'))
})
const relationshipsCreated = await session.executeWrite(tx =>
Promise.all( // group together all Promises
names.map(name =>
tx.run(`
MATCH (emp:Person {name: $personName})
MERGE (com:Company {name: $companyName})
MERGE (emp)-[:WORKS_FOR]->(com)
`, { personName: name, companyName: companyName }
)
.then(result => result.summary.counters.updates().relationshipsCreated)
)
).then(values => values.reduce((a, b) => a + b)) // aggregate results
)
console.log(`Created ${relationshipsCreated} employees relationships.`)
} finally {
await session.close()
}
流式 API
流式 API 允许运行查询并分别接收结果,只要服务器准备好即可。您可以指定一个回调来处理每个记录。此 API 特别适合服务器可能需要不同时间来检索不同记录但您希望在它们可用时立即处理它们的情况。行为类似于异步迭代器;编程风格不同。
const session = driver.session({database: 'neo4j'})
let peopleNames = []
session
.run('MERGE (p:Person {name: $name}) RETURN p.name AS name', { (1)
name: 'Alice'
})
.subscribe({ (2)
onKeys: keys => { (3)
console.log('Result columns are:')
console.log(keys)
},
onNext: record => { (4)
console.log(`Processing ${record.get('name')}`)
peopleNames.push(record.get('name'))
},
onCompleted: () => { (5)
session.close() // returns a Promise
},
onError: error => { (6)
console.log(error)
}
})
1 | 运行查询 |
2 | 将处理程序附加到结果流 |
3 | 该onKeys 回调接收结果列的列表 |
4 | 该onNext 回调在每次收到记录时都会调用 |
5 | 该onCompleted 回调在事务结束后调用 |
6 | 该onError 在发生错误时触发 |
反应式 API
反应式编程的典型特征是,在反应式流中,使用者控制他们从查询中使用记录的速度,而驱动程序则管理从服务器请求记录的速度。反应式 API 建议用于已面向反应式风格的应用程序。
const rxjs = require('rxjs');
const rxSession = driver.rxSession() (1)
const rxResult = await rxSession.executeWrite(tx => {
return tx
.run('MERGE (p:Person {name: $name}) RETURN p.name AS name', { (2)
name: 'Alice'
})
.records() (3)
.pipe( (4)
rxjs.map(record => record.get('name')),
//rxjs.materialize(), // optional, turns outputs into Notifications
rxjs.toArray()
)
})
const people = await rxResult.toPromise()
console.log(people)
1 | 获取反应式会话 |
2 | 运行查询 |
3 | 获取结果记录的可观察对象 |
4 | 反应式处理 |
反应式 API 在驱动程序的精简版中不可用。 |
词汇表
- LTS
-
长期支持版本是保证支持数年的一种版本。Neo4j 4.4 是 LTS,Neo4j 5 也将有一个 LTS 版本。
- Aura
-
Aura是 Neo4j 的完全托管云服务。它提供免费和付费计划。
- Cypher
-
Cypher是 Neo4j 的图查询语言,可让您从数据库中检索数据。它类似于 SQL,但适用于图。
- APOC
-
Cypher 上的强大过程 (APOC)是一个(许多)函数库,这些函数本身无法轻松地用 Cypher 表示。
- Bolt
-
Bolt是 Neo4j 实例和驱动程序之间交互使用的协议。默认情况下,它侦听端口 7687。
- ACID
-
原子性、一致性、隔离性和持久性 (ACID) 是保证数据库事务可靠处理的属性。符合 ACID 的 DBMS 确保数据库中的数据即使在发生故障时也能保持准确和一致。
- 最终一致性
-
如果数据库保证所有集群成员将在某个时间点存储数据的最新版本,则该数据库最终一致。
- 因果一致性
-
如果读写查询以相同的顺序由集群中的每个成员看到,则数据库是因果一致的。这比最终一致性更强。
- NULL
-
空标记不是类型,而是值不存在的占位符。有关更多信息,请参阅Cypher → 使用
null
。 - 事务
-
事务是工作单元,要么完全提交,要么在失败时回滚。例如银行转账:它涉及多个步骤,但它们必须全部成功或被撤消,以避免从一个帐户中扣除资金但没有添加到另一个帐户中。
- 背压
-
背压是反对数据流动的力量。它确保客户端不会被超出其处理能力的数据淹没。
- 事务函数
-
事务函数是由
executeRead
或executeWrite
调用执行的回调。如果服务器发生故障,驱动程序会自动重新执行回调。 - 驱动程序
-
一个
Driver
对象包含建立与 Neo4j 数据库连接所需的详细信息。