数据类型映射

Neo4j 和 Cypher® 提供了一个 类型系统,用于描述值如何在数据库中存储,但这些类型并不总是完全匹配 Spark 提供的类型。

在某些情况下,Neo4j 提供了一些 Spark 没有等效类型的数据类型,反之亦然。

数据类型映射

表 1. Spark 到 Neo4j 类型映射参考
Neo4j 类型 Spark 类型 说明

String

string

例如:"Hello"

Integer

long

例如:12345

Float

double

例如:3.141592

Boolean

boolean

例如:true

Point

struct { type: string, srid: integer, x: double, y: double, z: double }

有关 Neo4j 中空间类型的更多信息,请参阅 空间值

Date

date

例如:2020-09-11

Time

struct { type: string, value: string }

例如:[offset-time, 12:14:08.209Z]

LocalTime

struct { type: string, value: string }

例如:[local-time, 12:18:11.628]

DateTime

timestamp

例如:2020-09-11 12:17:39.192 此映射仅适用于从 Neo4j 读取;否则,将 写入 Neo4j 的 timestampLocalDateTime。如果您需要写入 DateTime,请使用 Cypher 写入查询

LocalDateTime

timestamp

例如:2020-09-11 12:14:49.081

Duration

struct { type: string, months: long, days: long, seconds: long, nanonseconds: integer, value: string }

参阅 时间函数:持续时间

Node

struct { <id>: long, <labels>: array[string], (PROPERTIES) }

Neo4j 中的节点表示为属性容器;也就是说,它们显示为结构体,其属性对应于节点中的任何属性。为了便于使用,通常最好从查询中返回单个属性而不是节点。

Relationship

struct { <rel.id>: long, <rel.type>: string, <source.id>: long, <target.id>: long, (PROPERTIES) }

关系以映射形式返回,标识关系的源和目标、关系类型以及关系的属性(如果有)。为了便于使用,通常最好从查询中返回单个属性而不是关系。

Path

string

例如:path[(322)←[20280:AIRLINE]-(33510)]为了便于使用,建议使用 路径函数 从查询中返回路径的单个属性/方面。

[相同类型的数组]

array[element]

在 Neo4j 中,数组必须具有一致的类型(例如,数组只能包含 Float 值)。内部 Spark 类型与上面的类型映射匹配。

复杂数据类型

Spark 本身不支持所有 Neo4j 数据类型(例如 PointTimeDuration)。此类类型将转换为包含所有有用数据的 Struct 类型。

表 2. 复杂数据类型转换
Neo4j 类型 Spark Struct

Duration

Struct(Array(
    ("type", DataTypes.StringType, false),
    ("months", DataTypes.LongType, false),
    ("days", DataTypes.LongType, false),
    ("seconds", DataTypes.LongType, false),
    ("nanoseconds", DataTypes.IntegerType, false),
    ("value", DataTypes.StringType, false)
  ))

Point

Struct(Array(
    ("type", DataTypes.StringType, false),
    ("srid", DataTypes.IntegerType, false),
    ("x", DataTypes.DoubleType, false),
    ("y", DataTypes.DoubleType, false),
    ("z", DataTypes.DoubleType, true),
  ))

Time

Struct(Array(
    ("type", DataTypes.StringType, false),
    ("value", DataTypes.StringType, false)
  ))

映射类型

当列是映射时,连接器会尝试将其扁平化。例如,考虑以下数据集

id name lives_in

1

Andrea Santurbano

{address: 'Times Square, 1', city: 'NY', state: 'NY'}

2

Davide Fantuzzi

{address: 'Statue of Liberty, 10', city: 'NY', state: 'NY'}

连接器将 lives_in 列扁平化为三个列 lives_in.addresslives_in.citylives_in.state

id name lives_in.address lives_in.city lives_in.state

1

Andrea Santurbano

Times Square, 1

NY

NY

2

Davide Fantuzzi

Statue of Liberty, 10

NY

NY

当 Dataframe 列是映射时,我们在内部执行的操作是将映射扁平化,因为 Neo4j 不支持此类型用于图实体属性;因此对于像这样的 Spark 作业

val data = Seq(
  ("Foo", 1, Map("inner" -> Map("key" -> "innerValue"))),
  ("Bar", 2, Map("inner" -> Map("key" -> "innerValue1"))),
).toDF("id", "time", "table")

data.write
  .mode(SaveMode.Append)
  .format(classOf[DataSource].getName)
  .option("url", SparkConnectorScalaSuiteIT.server.getBoltUrl)
  .option("labels", ":MyNodeWithFlattenedMap")
  .save()

在 Neo4j 中,带有 MyNodeWithFlattenedMap 标签的节点将存储以下信息

MyNodeWithFlattenedMap {
    id: 'Foo',
    time: 1,
    `table.inner.key`: 'innerValue'
}
MyNodeWithFlattenedMap {
    id: 'Bar',
    time: 1,
    `table.inner.key`: 'innerValue1'
}

现在,您可能会遇到以下问题情况

val data = Seq(
  ("Foo", 1, Map("key.inner" -> Map("key" -> "innerValue"), "key" -> Map("inner.key" -> "value"))),
  ("Bar", 1, Map("key.inner" -> Map("key" -> "innerValue1"), "key" -> Map("inner.key" -> "value1"))),
).toDF("id", "time", "table")
data.write
  .mode(SaveMode.Append)
  .format(classOf[DataSource].getName)
  .option("url", SparkConnectorScalaSuiteIT.server.getBoltUrl)
  .option("labels", ":MyNodeWithFlattenedMap")
  .save()

由于生成的扁平化键是重复的,因此 Neo4j Spark 将以非确定性方式选择一个关联的值。

因为我们将存储到 Neo4j 中的信息将是(请注意,顺序不可保证)

MyNodeWithFlattenedMap {
    id: 'Foo',
    time: 1,
    `table.key.inner.key`: 'innerValue' // but it could be `value` as the order is not guaranteed
}
MyNodeWithFlattenedMap {
    id: 'Bar',
    time: 1,
    `table.key.inner.key`: 'innerValue1' // but it could be `value1` as the order is not guaranteed
}

将重复键分组为值的数组

您可以使用 schema.map.group.duplicate.keys 选项来避免此问题。连接器会将具有相同键的所有值分组到一个数组中。该选项的默认值为 false。在这种情况下

val data = Seq(
  ("Foo", 1, Map("key.inner" -> Map("key" -> "innerValue"), "key" -> Map("inner.key" -> "value"))),
  ("Bar", 1, Map("key.inner" -> Map("key" -> "innerValue1"), "key" -> Map("inner.key" -> "value1"))),
).toDF("id", "time", "table")
data.write
  .mode(SaveMode.Append)
  .format(classOf[DataSource].getName)
  .option("url", SparkConnectorScalaSuiteIT.server.getBoltUrl)
  .option("labels", ":MyNodeWithFlattenedMap")
  .option("schema.map.group.duplicate.keys", true)
  .save()

输出将是

MyNodeWithFlattenedMap {
    id: 'Foo',
    time: 1,
    `table.key.inner.key`: ['innerValue', 'value'] // the order is not guaranteed
}
MyNodeWithFlattenedMap {
    id: 'Bar',
    time: 1,
    `table.key.inner.key`: ['innerValue1', 'value1'] // the order is not guaranteed
}

约束类型映射

表 3. Spark 到 Cypher 约束类型映射

Spark 类型

Neo4j 类型

BooleanType

BOOLEAN

StringType

STRING

IntegerType

INTEGER

LongType

INTEGER

FloatType

FLOAT

DoubleType

FLOAT

DateType

DATE

TimestampType

LOCAL DATETIME

自定义 pointType 如下:Struct { type: string, srid: integer, x: double, y: double, z: double }

POINT

自定义 durationType 如下:Struct { type: string, months: long, days: long, seconds: long, nanonseconds: integer, value: string }

DURATION

DataTypes.createArrayType(BooleanType, false)

LIST<BOOLEAN NOT NULL>

DataTypes.createArrayType(StringType, false)

LIST<STRING NOT NULL>

DataTypes.createArrayType(IntegerType, false)

LIST<INTEGER NOT NULL>

DataTypes.createArrayType(LongType, false)

LIST<INTEGER NOT NULL>

DataTypes.createArrayType(FloatType, false)

LIST<FLOAT NOT NULL>

DataTypes.createArrayType(DoubleType, false)

LIST<FLOAT NOT NULL>

DataTypes.createArrayType(DateType, false)

LIST<DATE NOT NULL>

DataTypes.createArrayType(TimestampType, false)

LIST<LOCAL DATETIME NOT NULL>

DataTypes.createArrayType(pointType, false)

LIST<POINT NOT NULL>

DataTypes.createArrayType(durationType, false)

LIST<DURATION NOT NULL>

对于数组,我们使用没有空元素的版本,因为 Neo4j 不允许在数组中使用空元素。