运行自己的事务
当 使用 executeQuery()
查询数据库时,驱动程序会自动创建一个*事务*。事务是一个工作单元,要么全部*提交*,要么在失败时*回滚*。您可以在单个查询中包含多个 Cypher 语句,例如在使用 MATCH
和 CREATE
顺序更新数据库时,但您不能有多个查询并在它们之间穿插一些客户端逻辑。
对于这些更高级的用例,驱动程序提供了函数来完全控制事务生命周期。这些被称为*托管事务*,您可以将它们视为一种解构 execute_query()
流程的方式,并能够在更多地方指定其所需的行为。
创建会话
在运行事务之前,您需要获取一个*会话*。会话充当驱动程序和服务器之间具体的查询通道,并确保强制执行因果一致性。
会话通过方法 Driver.session()
创建。它接受一个单个(可选)对象参数,其中 database
属性允许指定目标数据库。有关更多参数,请参阅会话配置。
session = driver.session({ database: 'neo4j' })
会话创建是一个轻量级操作,因此会话的创建和销毁不会产生显著成本。使用完毕后务必关闭会话。
会话*不是*线程安全的:您可以在线程之间共享主 Driver
对象,但请确保每个线程创建自己的会话。
运行托管事务
一个事务可以包含任意数量的查询。由于 Neo4j 符合 ACID,事务中的查询要么全部执行,要么根本不执行:您无法让事务的一部分成功而另一部分失败。使用事务将相关查询分组在一起,它们协同工作以实现单个*逻辑*数据库操作。
托管事务使用方法 Session.executeRead()
和 Session.executeWrite()
创建,具体取决于您是想从数据库中*检索*数据还是*修改*数据。这两种方法都接受一个事务函数回调,该回调负责实际执行查询并处理结果。
Al
开头的人。let session = driver.session({ database: 'neo4j' }) (1)
try {
let result = await session.executeRead(async tx => { (2)
return await tx.run(` (3)
MATCH (p:Person) WHERE p.name STARTS WITH $filter
RETURN p.name AS name ORDER BY name
`, {filter: 'Al'}
)
})
for(let record in result.records) { (4)
console.log(record.get('name'))
}
console.log(
`The query \`${result.summary.query.text}\`` +
`returned ${result.records.length} nodes.\n`
)
} finally {
session.close()
}
1 | 创建一个会话。单个会话可以包含多个查询。完成后请记住关闭它。 |
2 | .executeRead() (或 .executeWrite() )方法是事务的入口点。 |
3 | 使用方法 Transaction.run() 运行查询,提供 Cypher 查询和一个查询参数对象。每个运行的查询都返回一个 Result 对象。 |
4 | 处理结果记录和查询摘要。 |
不要将参数直接硬编码或连接到查询中。出于性能和安全原因,请改用查询参数。
事务函数不应直接返回 Result
对象。相反,总是以某种方式处理结果;至少将其转换为列表。在事务函数内部,return
语句会导致事务被提交,而如果引发异常,事务则会自动回滚。
方法 .executeRead() 和 .executeWrite() 已取代了 .readTransaction() 和 .writeTransaction() ,后者在 5.x 版本中已弃用,并将在 6.0 版本中移除。 |
const neo4j = require('neo4j-driver');
(async () => {
const URI = '{neo4j-database-uri}'
const USER = '{neo4j-username}'
const PASSWORD = '{neo4j-password}'
let driver, session
let employeeThreshold = 10
try {
driver = neo4j.driver(URI, neo4j.auth.basic(USER, PASSWORD))
await driver.verifyConnectivity()
} catch(err) {
console.log(`-- Connection error --\n${err}\n-- Cause --\n${err.cause}`)
await driver.close()
return
}
session = driver.session({ database: 'neo4j' })
for(let i=0; i<100; i++) {
const name = `Neo-${i.toString()}`
const orgId = await session.executeWrite(async tx => {
let result, orgInfo
// Create new Person node with given name, if not already existing
await tx.run(`
MERGE (p:Person {name: $name})
RETURN p.name AS name
`, { name: name }
)
// Obtain most recent organization ID and number of people linked to it
result = await tx.run(`
MATCH (o:Organization)
RETURN o.id AS id, COUNT{(p:Person)-[r:WORKS_FOR]->(o)} AS employeesN
ORDER BY o.createdDate DESC
LIMIT 1
`)
if(result.records.length > 0) {
orgInfo = result.records[0]
}
if(orgInfo != undefined && orgInfo['employeesN'] == 0) {
throw new Error('Most recent organization is empty.')
// Transaction will roll back -> not even Person is created!
}
// If org does not have too many employees, add this Person to that
if(orgInfo != undefined && orgInfo['employeesN'] < employeeThreshold) {
result = await tx.run(`
MATCH (o:Organization {id: $orgId})
MATCH (p:Person {name: $name})
MERGE (p)-[r:WORKS_FOR]->(o)
RETURN $orgId AS id
`, { orgId: orgInfo['id'], name: name }
)
// Otherwise, create a new Organization and link Person to it
} else {
result = await tx.run(`
MATCH (p:Person {name: $name})
CREATE (o:Organization {id: randomuuid(), createdDate: datetime()})
MERGE (p)-[r:WORKS_FOR]->(o)
RETURN o.id AS id
`, { name: name }
)
}
// Return the Organization ID to which the new Person ends up in
return result.records[0].get('id')
})
console.log(`User ${name} added to organization ${orgId}`)
}
await session.close()
await driver.close()
})()
如果事务因驱动程序认为的临时原因而失败,它会自动重试运行事务函数(具有指数级增长的延迟)。因此,事务函数在多次运行时应产生相同的效果(*幂等*),因为您无法预先知道它们将执行多少次。实际上,这意味着您不应编辑或依赖全局变量等。请注意,尽管事务函数可能会执行多次,但其中的查询始终只会运行一次。
一个会话可以链式执行多个事务,但在任何给定时间,会话内只能有一个事务处于活动状态。这意味着一个查询必须完成后才能运行下一个查询,这也是前面所有示例都使用 async/await
语法的原因。要维护多个并发事务,请参阅如何运行异步查询。
运行显式事务
您可以通过使用方法 Session.beginTransaction()
手动开始一个事务来完全控制事务。您使用方法 Transaction.run()
在显式事务中运行查询,就像在事务函数中一样。
let session = driver.session({ database: 'neo4j' })
let transaction = await session.beginTransaction()
// use tx.run() to run queries
// tx.commit() to commit the transaction
// tx.rollback() to rollback the transaction
await transaction.commit()
await session.close()
显式事务可以使用 Transaction.commit()
提交或使用 Transaction.rollback()
回滚。如果未采取任何显式操作,驱动程序将在事务生命周期结束时自动回滚事务。
显式事务最适用于需要在同一事务中将 Cypher 执行分布到多个函数中的应用程序,或需要在单个事务中运行多个查询但不需要托管事务提供的自动重试功能的应用程序。
const neo4j = require('neo4j-driver');
const URI = '{neo4j-database-uri}';
const USER = '{neo4j-username}';
const PASSWORD = '{neo4j-password}';
(async () => {
try {
driver = neo4j.driver(URI, neo4j.auth.basic(USER, PASSWORD))
await driver.verifyConnectivity()
} catch(err) {
console.log(`-- Connection error --\n${err}\n-- Cause --\n${err.cause}`)
await driver.close()
return
}
let customerId = await createCustomer(driver)
let otherBankId = 42
await transferToOtherBank(driver, customerId, otherBankId, 999)
await driver.close()
})()
async function createCustomer(driver) {
let { records } = await driver.executeQuery(`
MERGE (c:Customer {id: randomUUID()})
RETURN c.id AS id
`, {},
{ database: 'neo4j' }
)
return records[0].get("id")
}
async function transferToOtherBank(driver, customerId, otherBankId, amount) {
const session = driver.session({ database: 'neo4j' })
const tx = await session.beginTransaction()
try {
if(! checkCustomerBalance(tx, customerId, amount))
return
try {
decreaseCustomerBalance(tx, customerId, amount)
await tx.commit()
} catch (error) {
requestInspection(customerId, otherBankId, amount, e)
throw error // roll back
}
await otherBankTransferApi(customerId, otherBankId, amount)
// Now the money has been transferred => can't rollback anymore
// (cannot rollback external services interactions)
} finally {
await session.close()
}
}
async function checkCustomerBalance(tx, customerId, amount) {
result = await tx.run(`
MATCH (c:Customer {id: $id})
RETURN c.balance >= $amount AS sufficient
`, { id: customerId, amount: amount },
{ database: 'neo4j' }
)
return result.records[0].get('sufficient')
}
async function otherBankTransferApi(customerId, otherBankId, amount) {
// make some API call to other bank
}
async function decreaseCustomerBalance(tx, customerId, amount) {
await tx.run(`
MATCH (c:Customer {id: $id})
SET c.balance = c.balance - $amount
`, { id: customerId, amount: amount }
)
}
async function requestInspection(customerId, otherBankId, amount, error) {
// manual cleanup required; log this or similar
console.log('WARNING: transaction rolled back due to exception:')
console.log(error)
}
会话配置
创建会话时,您可以提供一个可选的 SessionConfig
类型参数来指定会话配置值。
数据库选择
您应该始终使用 database
参数显式指定数据库,即使在单数据库实例上也是如此。这使得驱动程序能够更高效地工作,因为它节省了一次到服务器解析主数据库的网络往返。如果没有给定数据库,则使用 Neo4j 实例设置中配置的用户主数据库。
const session = driver.session({
database: 'neo4j'
})
通过配置方法指定数据库优于使用 USE Cypher 子句。如果服务器在集群上运行,带有 USE 的查询需要启用服务器端路由。查询的执行时间也可能更长,因为它们可能无法在第一次尝试时到达正确的集群成员,并且需要路由到包含所请求数据库的成员。 |
请求路由
在集群环境中,所有会话都以写入模式打开,并路由到主节点。您可以通过将 defaultAccessMode
参数显式设置为 neo4j.session.READ
或 neo4j.session.WRITE
来更改此设置。请注意,.executeRead()
和 .executeWrite()
会自动覆盖会话的默认访问模式。
const session = driver.session({
database: 'neo4j',
defaultAccessMode: neo4j.session.READ
})
尽管在读取模式下执行*写入*查询可能会导致运行时错误,但您不应依赖此进行访问控制。两种模式之间的区别在于,*读取*事务会被路由到集群的任何节点,而*写入*事务则会定向到主节点。换句话说,无法保证在读取模式下提交的写入查询会被拒绝。
|
以不同用户身份运行查询
您可以使用配置参数 auth
通过不同的用户执行查询。在会话级别切换用户比创建新的 Driver
对象成本更低。然后,查询将在给定用户的安全上下文中运行(即,主数据库、权限等)。
会话范围认证需要服务器版本 >= 5.8。
const session = driver.session({
database: 'neo4j',
auth: neo4j.auth.basic('somebodyElse', 'theirPassword')
})
参数 impersonatedUser
提供类似的功能,并且在驱动程序/服务器版本 >= 4.4 中可用。区别在于您不需要知道用户的密码即可模拟他们,但创建 Driver
的用户需要具有适当的权限。
const session = driver.session({
database: 'neo4j',
impersonatedUser: 'somebodyElse'
})
事务配置
您可以通过为 .executeRead()
、.executeWrite()
和 .beginTransaction()
提供第二个可选的 TransactionConfig
类型参数来进一步控制事务。您可以指定
-
事务超时(毫秒)。运行时间过长的事务将被服务器终止。默认值在服务器端设置。最小值为一毫秒。
-
附加到事务的元数据对象。这些元数据会记录在服务器的
query.log
中,并在SHOW TRANSACTIONS YIELD *
Cypher 命令的输出中可见。使用此功能标记事务。
let session = driver.session({ database: 'neo4j' })
const people_n = await session.executeRead(
async tx => { return await tx.run('MATCH (a:Person) RETURN count(a)') },
{ timeout: 5000, metadata: {'app_name': 'people'} } // TransactionConfig
)
关闭会话
每个连接池都拥有有限数量的会话,因此如果您打开会话而不关闭它们,您的应用程序可能会耗尽会话。因此,在使用完毕后始终关闭会话非常重要,以便它们可以返回到连接池以供后续重用。最佳做法是将会话使用封装在 try/finally
块中,并在 finally
子句中调用 session.close()
。
let session = driver.session({database: 'neo4j'})
try {
// use session to run queries
} finally {
await session.close()
}
词汇表
- LTS
-
长期支持版本是保证支持多年的版本。Neo4j 4.4 是 LTS 版本,Neo4j 5 也将有一个 LTS 版本。
- Aura
-
Aura 是 Neo4j 的全托管云服务。它提供免费和付费计划。
- Cypher
-
Cypher 是 Neo4j 的图查询语言,允许您从数据库中检索数据。它类似于 SQL,但适用于图。
- APOC
-
Cypher 上的优秀过程 (APOC) 是一个(包含许多)无法在 Cypher 本身中轻松表达的函数库。
- Bolt
-
Bolt 是 Neo4j 实例和驱动程序之间交互所使用的协议。它默认监听端口 7687。
- ACID
-
原子性、一致性、隔离性、持久性 (ACID) 是保证数据库事务可靠处理的属性。符合 ACID 的 DBMS 确保数据库中的数据即使在发生故障时也保持准确和一致。
- 最终一致性
-
如果数据库保证所有集群成员*在某个时间点*都会存储最新版本的数据,则该数据库是最终一致的。
- 因果一致性
-
如果读写查询在集群的每个成员中以相同的顺序可见,则数据库是因果一致的。这比*最终一致性*更强。
- NULL
-
null 标记不是一种类型,而是表示值缺失的占位符。有关更多信息,请参阅 Cypher → 使用
null
。 - 事务
-
事务是一个工作单元,要么全部*提交*,要么在失败时*回滚*。一个例子是银行转账:它涉及多个步骤,但它们必须*全部*成功或被撤销,以避免资金从一个账户中扣除而没有添加到另一个账户。
- 反压
-
反压是阻碍数据流动的力。它确保客户端不会被超出其处理能力的数据量所淹没。
- 事务函数
-
事务函数是
executeRead
或executeWrite
调用执行的回调。在服务器故障时,驱动程序会自动重新执行该回调。 - 驱动程序
-
Driver
对象包含与 Neo4j 数据库建立连接所需的详细信息。