协调并行事务

在使用 Neo4j 集群时,在大多数情况下默认会强制执行因果一致性,这保证了查询能够读取先前查询所做的更改。然而,对于并行运行的多个事务,默认情况下不会发生同样的情况。在这种情况下,您可以使用书签,让一个事务等待另一个事务的结果在集群中传播后再执行其自身的工作。这不是一个强制要求,并且您只应在需要跨不同事务实现因果一致性时才使用书签,因为等待书签可能会对性能产生负面影响。

书签是一个表示数据库某种状态的令牌。通过将一个或多个书签与查询一起传递,服务器将确保查询在所表示的状态建立之前不会被执行。

带有 .executeQuery() 的书签

使用 .executeQuery() 查询数据库时,驱动程序会为您管理书签。在这种情况下,您可以保证后续查询无需采取进一步操作即可读取之前的更改。

await driver.executeQuery('<QUERY 1>')

// subsequent executeQuery calls will be causally chained

await driver.executeQuery('<QUERY 2>') // can read result of <QUERY 1>
await driver.executeQuery('<QUERY 3>') // can read result of <QUERY 2>

要禁用书签管理和因果一致性,请在 .executeQuery() 调用中将 bookmarkManager 选项设置为 null

await driver.executeQuery(
    '<QUERY>',
    {},
    {
      bookmarkManager: null
    }
)

单个会话中的书签

在单个会话中运行的查询会自动进行书签管理,因此您可以相信一个会话内的查询是因果链式的。

let session = driver.session({database: 'neo4j'})
try {
  await session.executeWrite(async tx => {
    await tx.run("<QUERY 1>")
  })
  await session.executeWrite(async tx => {
    await tx.run("<QUERY 2>")  // can read result of QUERY 1
  })
  await session.executeWrite(async tx => {
    await tx.run("<QUERY 3>")  // can read result of QUERY 1, 2
  })
} finally {
  await session.close()
}

跨多个会话的书签

如果您的应用程序使用多个会话,您可能需要确保一个会话已完成其所有事务后,另一个会话才被允许运行其查询。

在下面的示例中,允许 sessionAsessionB 并发运行,而 sessionC 则等待它们的结果传播完成。这保证了 sessionC 想要操作的 Person 节点确实存在。

使用书签协调多个会话
const neo4j = require('neo4j-driver');

(async () => {
  const URI = '<URI to Neo4j database>'
  const USER = '{neo4j-username}'
  const PASSWORD = '{neo4j-password}'
  let driver
  try {
    driver = neo4j.driver(URI, neo4j.auth.basic(USER, PASSWORD))
    await driver.verifyConnectivity()
  } catch(err) {
    console.log(`-- Connection error --\n${err}\n-- Cause --\n${err.cause}`)
    return
  }
  await createFriends(driver)
})()

async function createFriends(driver) {
  let savedBookmarks = []  // To collect the sessions' bookmarks

  // Create the first person and employment relationship.
  const sessionA = driver.session({database: 'neo4j'})
  try {
    await createPerson(sessionA, 'Alice')
    await employPerson(sessionA, 'Alice', 'Wayne Enterprises')
    savedBookmarks.concat(sessionA.lastBookmarks())  (1)
  } finally {
    sessionA.close()
  }

  // Create the second person and employment relationship.
  const sessionB = driver.session({database: 'neo4j'})
  try {
    await createPerson(sessionB, 'Bob')
    await employPerson(sessionB, 'Bob', 'LexCorp')
    savedBookmarks.concat(sessionB.lastBookmarks())  (1)
  } finally {
    sessionB.close()
  }

  // Create (and show) a friendship between the two people created above.
  const sessionC = driver.session({
    database: 'neo4j',
    bookmarks: savedBookmarks  (2)
  })
  try {
    await createFriendship(sessionC, 'Alice', 'Bob')
    await printFriendships(sessionC)
  } finally {
    sessionC.close()
  }
}

// Create a person node.
async function createPerson(session, name) {
  await session.executeWrite(async tx => {
    await tx.run('CREATE (:Person {name: $name})', { name: name })
  })
}

// Create an employment relationship to a pre-existing company node.
// This relies on the person first having been created.
async function employPerson(session, personName, companyName) {
  await session.executeWrite(async tx => {
    await tx.run(`
      MATCH (person:Person {name: $personName})
      MATCH (company:Company {name: $companyName})
      CREATE (person)-[:WORKS_FOR]->(company)`,
      { personName: personName, companyName: companyName }
    )
  })
}

// Create a friendship between two people.
async function createFriendship(session, nameA, nameB) {
  await session.executeWrite(async tx => {
    await tx.run(`
      MATCH (a:Person {name: $nameA})
      MATCH (b:Person {name: $nameB})
      MERGE (a)-[:KNOWS]->(b)
      `, { nameA: nameA, nameB: nameB }
    )
  })
}

// Retrieve and display all friendships.
async function printFriendships(session) {
  const result = await session.executeRead(async tx => {
    return await tx.run('MATCH (a)-[:KNOWS]->(b) RETURN a.name, b.name')
  })
  for(record of result.records) {
    console.log(`${record.get('a.name')} knows ${record.get('b.name')}`)
  }
}
1 使用 Session.lastBookmarks() 方法收集并组合来自不同会话的书签。
2 使用它们通过 bookmarks 参数初始化另一个会话。

driver passing bookmarks

使用书签可能会对性能产生负面影响,因为所有查询都被强制等待最新更改在集群中传播。对于简单的用例,请尝试将查询分组到单个事务或单个会话中。

混合使用 .executeQuery() 和会话

为确保部分通过 .executeQuery() 执行、部分通过会话执行的事务之间的因果一致性,您可以在创建会话时使用 bookmarkManager 选项,将其设置为 driver.executeQueryBookmarkManager。由于这是 .executeQuery() 调用的默认书签管理器,这将确保所有工作都在同一个书签管理器下执行,从而实现因果一致性。

await driver.executeQuery('<QUERY 1>')

session = driver.session({
    bookmarkManager: driver.executeQueryBookmarkManager
})
try {
  // every query inside this session will be causally chained
  // (i.e., can read what was written by <QUERY 1>)
  await session.executeWrite(async tx => tx.run('<QUERY 2>'))
} finally {
  await session.close()
}

// subsequent executeQuery calls will be causally chained
// (i.e., can read what was written by <QUERY 2>)
await driver.executeQuery('<QUERY 3>')

术语表

LTS

长期支持版本是保证在数年内获得支持的版本。Neo4j 4.4 是 LTS 版本,Neo4j 5 也将拥有一个 LTS 版本。

Aura

Aura 是 Neo4j 的全托管云服务。它提供免费和付费计划。

Cypher

Cypher 是 Neo4j 的图查询语言,可让您从数据库中检索数据。它类似于 SQL,但专用于图。

APOC

Awesome Procedures On Cypher (APOC) 是一个包含(许多)无法在 Cypher 本身轻松表达的函数的库。

Bolt

Bolt 是 Neo4j 实例和驱动程序之间交互所使用的协议。它默认监听端口 7687。

ACID

原子性、一致性、隔离性和持久性 (ACID) 是保证数据库事务可靠处理的属性。符合 ACID 标准的 DBMS 可确保数据库中的数据在发生故障时仍保持准确和一致。

最终一致性

如果数据库保证所有集群成员在某个时间点都将存储最新版本的数据,则该数据库是最终一致的。

因果一致性

如果读写查询以相同的顺序被集群中的每个成员看到,则数据库是因果一致的。这比最终一致性更强。

NULL

null 标记不是一种类型,而是值缺失的占位符。欲了解更多信息,请参阅Cypher → 使用 null

事务

事务是工作的一个单元,它要么整体提交,要么在失败时回滚。一个例子是银行转账:它涉及多个步骤,但所有步骤都必须成功或被撤销,以避免资金从一个账户中扣除但未添加到另一个账户的情况。

反压

反压是与数据流相反的作用力。它确保客户端不会被超出其处理速度的数据淹没。

© . All rights reserved.