运行自己的事务
当使用 executeQuery()
查询数据库时,驱动程序会自动创建一个事务。事务是工作的单元,要么完全提交,要么在失败时回滚。例如,当使用 MATCH
和 CREATE
按顺序更新数据库时,您可以将多个 Cypher 语句包含在一个查询中,但您不能有多个查询并在它们之间插入一些客户端逻辑。
对于这些更高级的用例,驱动程序提供了函数来完全控制事务生命周期。这些被称为托管事务,您可以将它们视为解包 execute_query()
的流程并能够在更多位置指定其所需行为的一种方式。
创建会话
在运行事务之前,您需要获取一个会话。会话充当驱动程序和服务器之间具体的查询通道,并确保因果一致性得到执行。
会话使用方法Driver.session()
创建。它采用单个(可选)对象参数,其中属性 database
允许指定目标数据库。有关其他参数,请参阅会话配置。
session = driver.session({ database: 'neo4j' })
会话创建是一个轻量级操作,因此可以创建和销毁会话而不会产生重大成本。在完成会话后始终关闭会话。
会话不是线程安全的:您可以在线程之间共享主 Driver
对象,但请确保每个线程都创建自己的会话。
运行托管事务
一个事务可以包含任意数量的查询。由于 Neo4j 符合ACID 规范,事务中的查询要么整体执行,要么根本不执行:您无法让部分事务成功而另一部分失败。使用事务将相关查询组合在一起,这些查询协同工作以实现单个逻辑数据库操作。
托管事务使用方法Session.executeRead()
和Session.executeWrite()
创建,具体取决于您是想检索数据库中的数据还是更改它。这两种方法都采用事务函数回调,该回调负责实际执行查询和处理结果。
Al
开头的人员。let session = driver.session({ database: 'neo4j' }) (1)
try {
let result = await session.executeRead(async tx => { (2)
return await tx.run(` (3)
MATCH (p:Person) WHERE p.name STARTS WITH $filter
RETURN p.name AS name ORDER BY name
`, {filter: 'Al'}
)
})
for(let record in result.records) { (4)
console.log(record.get('name'))
}
console.log(
`The query \`${result.summary.query.text}\`` +
`returned ${result.records.length} nodes.\n`
)
} finally {
session.close()
}
1 | 创建会话。单个会话可以容纳多个查询。完成后请记住关闭它。 |
2 | .executeRead() (或 .executeWrite() )方法是事务的入口点。 |
3 | 使用方法Transaction.run() 运行查询,提供 Cypher 查询和查询参数对象。每个运行的查询都会返回一个Result 对象。 |
4 | 处理结果记录和查询摘要。 |
不要将参数直接硬编码或连接到查询中。出于性能和安全原因,请改用查询参数。
事务函数绝不应直接返回 Result
对象。相反,始终以某种方式处理结果;至少将其转换为列表。在事务函数中,return
语句会导致事务提交,而如果引发异常,则事务会自动回滚。
方法 .executeRead() 和 .executeWrite() 已替换 .readTransaction() 和 .writeTransaction() ,后者在 5.x 版本中已弃用,将在 6.0 版本中删除。 |
const neo4j = require('neo4j-driver');
(async () => {
const URI = '<URI for Neo4j database>'
const USER = '<Username>'
const PASSWORD = '<Password>'
let driver, session
let employeeThreshold = 10
try {
driver = neo4j.driver(URI, neo4j.auth.basic(USER, PASSWORD))
await driver.verifyConnectivity()
} catch(err) {
console.log(`-- Connection error --\n${err}\n-- Cause --\n${err.cause}`)
await driver.close()
return
}
session = driver.session({ database: 'neo4j' })
for(let i=0; i<100; i++) {
const name = `Neo-${i.toString()}`
const orgId = await session.executeWrite(async tx => {
let result, orgInfo
// Create new Person node with given name, if not already existing
await tx.run(`
MERGE (p:Person {name: $name})
RETURN p.name AS name
`, { name: name }
)
// Obtain most recent organization ID and number of people linked to it
result = await tx.run(`
MATCH (o:Organization)
RETURN o.id AS id, COUNT{(p:Person)-[r:WORKS_FOR]->(o)} AS employeesN
ORDER BY o.createdDate DESC
LIMIT 1
`)
if(result.records.length > 0) {
orgInfo = result.records[0]
}
if(orgInfo != undefined && orgInfo['employeesN'] == 0) {
throw new Error('Most recent organization is empty.')
// Transaction will roll back -> not even Person is created!
}
// If org does not have too many employees, add this Person to that
if(orgInfo != undefined && orgInfo['employeesN'] < employeeThreshold) {
result = await tx.run(`
MATCH (o:Organization {id: $orgId})
MATCH (p:Person {name: $name})
MERGE (p)-[r:WORKS_FOR]->(o)
RETURN $orgId AS id
`, { orgId: orgInfo['id'], name: name }
)
// Otherwise, create a new Organization and link Person to it
} else {
result = await tx.run(`
MATCH (p:Person {name: $name})
CREATE (o:Organization {id: randomuuid(), createdDate: datetime()})
MERGE (p)-[r:WORKS_FOR]->(o)
RETURN o.id AS id
`, { name: name }
)
}
// Return the Organization ID to which the new Person ends up in
return result.records[0].get('id')
})
console.log(`User ${name} added to organization ${orgId}`)
}
await session.close()
await driver.close()
})()
如果事务由于驱动程序认为是瞬态的原因而失败,它会自动重试运行事务函数(并以指数增长的延迟)。因此,事务函数在运行多次时应产生相同的效果(幂等),因为您事先不知道要执行多少次。在实践中,这意味着您不应编辑或依赖全局变量,例如。请注意,尽管事务函数可能会执行多次,但其中的查询始终只会运行一次。
一个会话可以链接多个事务,但在任何给定时间,一个会话中只能有一个事务处于活动状态。这意味着一个查询必须完成才能运行下一个查询,这就是前面所有示例都使用 async/await
语法的原因。要维护多个并发事务,请参阅如何运行异步查询。
运行显式事务
您可以通过使用方法Session.beginTransaction()
手动开始事务来实现对事务的完全控制。您使用方法Transaction.run()
在显式事务中运行查询,就像在事务函数中一样。
let session = driver.session({ database: 'neo4j' })
let transaction = await session.beginTransaction()
// use tx.run() to run queries
// tx.commit() to commit the transaction
// tx.rollback() to rollback the transaction
await transaction.commit()
await session.close()
显式事务可以使用Transaction.commit()
提交或使用Transaction.rollback()
回滚。如果没有采取任何显式操作,驱动程序将在其生命周期结束时自动回滚事务。
显式事务最适用于需要跨多个函数分发同一事务的 Cypher 执行的应用程序,或需要在单个事务中运行多个查询但不需要托管事务提供的自动重试的应用程序。
const neo4j = require('neo4j-driver');
const URI = '<URI for Neo4j database>';
const USER = '<Username>';
const PASSWORD = '<Password>';
(async () => {
try {
driver = neo4j.driver(URI, neo4j.auth.basic(USER, PASSWORD))
await driver.verifyConnectivity()
} catch(err) {
console.log(`-- Connection error --\n${err}\n-- Cause --\n${err.cause}`)
await driver.close()
return
}
let customerId = await createCustomer(driver)
let otherBankId = 42
await transferToOtherBank(driver, customerId, otherBankId, 999)
await driver.close()
})()
async function createCustomer(driver) {
let { records } = await driver.executeQuery(`
MERGE (c:Customer {id: randomUUID()})
RETURN c.id AS id
`, {},
{ database: 'neo4j' }
)
return records[0].get("id")
}
async function transferToOtherBank(driver, customerId, otherBankId, amount) {
const session = driver.session({ database: 'neo4j' })
const tx = await session.beginTransaction()
try {
if(! checkCustomerBalance(tx, customerId, amount))
return
try {
decreaseCustomerBalance(tx, customerId, amount)
await tx.commit()
} catch (error) {
requestInspection(customerId, otherBankId, amount, e)
throw error // roll back
}
await otherBankTransferApi(customerId, otherBankId, amount)
// Now the money has been transferred => can't rollback anymore
// (cannot rollback external services interactions)
} finally {
await session.close()
}
}
async function checkCustomerBalance(tx, customerId, amount) {
result = await tx.run(`
MATCH (c:Customer {id: $id})
RETURN c.balance >= $amount AS sufficient
`, { id: customerId, amount: amount },
{ database: 'neo4j' }
)
return result.records[0].get('sufficient')
}
async function otherBankTransferApi(customerId, otherBankId, amount) {
// make some API call to other bank
}
async function decreaseCustomerBalance(tx, customerId, amount) {
await tx.run(`
MATCH (c:Customer {id: $id})
SET c.balance = c.balance - $amount
`, { id: customerId, amount: amount }
)
}
async function requestInspection(customerId, otherBankId, amount, error) {
// manual cleanup required; log this or similar
console.log('WARNING: transaction rolled back due to exception:')
console.log(error)
}
会话配置
创建会话时,您可以提供类型为SessionConfig
的可选参数来指定会话配置值。
数据库选择
您应始终使用 database
参数显式指定数据库,即使在单数据库实例上也是如此。这允许驱动程序更有效地工作,因为它节省了到服务器的网络往返以解析主数据库。如果未提供数据库,则使用 Neo4j 实例设置中设置的用户的数据库。
const session = driver.session({
database: 'neo4j'
})
通过配置方法指定数据库优先于USE Cypher 子句。如果服务器在集群上运行,则带有 USE 的查询需要启用服务器端路由。查询的执行时间也可能更长,因为它们可能在第一次尝试时无法到达正确的集群成员,并且需要路由到包含请求数据库的一个成员。 |
请求路由
在集群环境中,所有会话都以写入模式打开,将其路由到领导者。您可以通过将 defaultAccessMode
参数显式设置为 neo4j.session.READ
或 neo4j.session.WRITE
来更改此设置。请注意,.executeRead()
和 .executeWrite()
会自动覆盖会话的默认访问模式。
const session = driver.session({
database: 'neo4j',
defaultAccessMode: neo4j.session.READ
})
尽管在读取模式下执行写入查询可能会导致运行时错误,但您不应该依赖此功能进行访问控制。 两种模式之间的区别在于,读取事务被路由到集群的任何节点,而写入事务则被定向到领导者节点。换句话说,无法保证在读取模式下提交的写入查询会被拒绝。 类似的说明也适用于 |
以其他用户身份运行查询(模拟)
您可以使用参数impersonatedUser
以其他用户的安全上下文执行查询,并指定要模拟的用户名称。为了使此功能生效,创建Driver
的用户的账户需要具有相应的权限。模拟用户比创建新的Driver
对象更经济。
const session = driver.session({
database: 'neo4j',
impersonatedUser: 'somebodyElse'
})
模拟用户时,查询在模拟用户的完整安全上下文中运行,而不是已认证的用户(即,主数据库、权限等)。
事务配置
您可以通过向.executeRead()
、.executeWrite()
和.beginTransaction()
提供第二个可选参数(类型为TransactionConfig
)来进一步控制事务。您可以指定
-
事务超时时间(以毫秒为单位)。运行时间过长的事务将被服务器终止。默认值在服务器端设置。最小值为 1 毫秒。
-
附加到事务的元数据对象。这些元数据将记录在服务器的
query.log
中,并且在SHOW TRANSACTIONS YIELD *
Cypher 命令的输出中可见。使用此功能来标记事务。
let session = driver.session({ database: 'neo4j' })
const people_n = await session.executeRead(
async tx => { return await tx.run('MATCH (a:Person) RETURN count(a)') },
{ timeout: 5000, metadata: {'app_name': 'people'} } // TransactionConfig
)
关闭会话
每个连接池都具有有限数量的会话,因此,如果您打开会话而不关闭它们,您的应用程序可能会用完这些会话。因此,在完成会话后始终关闭它们非常重要,以便可以将它们返回到连接池以供以后重用。最好的方法是将会话使用包装在try/finally
块中,并在finally
子句中调用session.close()
。
let session = driver.session({database: 'neo4j'})
try {
// use session to run queries
} finally {
await session.close()
}
术语表
- LTS
-
长期支持版本是指保证支持若干年的版本。Neo4j 4.4 是 LTS 版本,Neo4j 5 也将有一个 LTS 版本。
- Aura
-
Aura 是 Neo4j 的完全托管云服务。它提供免费和付费计划。
- Cypher
-
Cypher 是 Neo4j 的图查询语言,允许您从数据库中检索数据。它类似于 SQL,但适用于图。
- APOC
-
Awesome Procedures On Cypher (APOC) 是一个包含(许多)函数的库,这些函数无法用 Cypher 本身轻松表达。
- Bolt
-
Bolt 是 Neo4j 实例和驱动程序之间交互使用的协议。默认情况下,它监听端口 7687。
- ACID
-
原子性、一致性、隔离性和持久性 (ACID) 是保证数据库事务可靠处理的属性。符合 ACID 的 DBMS 确保数据库中的数据即使在发生故障时也能保持准确和一致。
- 最终一致性
-
如果数据库保证所有集群成员将在某个时间点存储数据的最新版本,则该数据库最终一致。
- 因果一致性
-
如果读取和写入查询按相同的顺序被集群中的每个成员看到,则该数据库是因果一致的。这比最终一致性更强。
- NULL
-
空标记不是一种类型,而是值不存在的占位符。有关更多信息,请参阅Cypher → 使用
null
。 - 事务
-
事务是一项工作单元,要么完全提交,要么在发生故障时回滚。例如银行转账:它涉及多个步骤,但它们必须全部成功或被撤销,以避免从一个账户中扣除资金但未添加到另一个账户中。
- 背压
-
背压是阻碍数据流动的力量。它确保客户端不会被超出其处理能力的数据所淹没。
- 事务函数
-
事务函数是由
executeRead
或executeWrite
调用执行的回调函数。如果服务器发生故障,驱动程序会自动重新执行回调函数。 - Driver
-
Driver
对象包含建立与 Neo4j 数据库连接所需的详细信息。