数据类型映射

Neo4j 和 Cypher® 提供了一个类型系统,用于描述值在数据库中的存储方式,但这些类型并非总能与 Spark 提供的类型完全匹配。

在某些情况下,Neo4j 提供的数据类型在 Spark 中没有等效类型,反之亦然。

数据类型映射

表 1. Spark 到 Neo4j 类型映射参考
Neo4j 类型 Spark 类型 备注

字符串

字符串

示例:"Hello"

整数

长整数

示例:12345

浮点数

双精度浮点数

示例:3.141592

布尔值

布尔值

示例:true

struct { type: string, srid: integer, x: double, y: double, z: double }

有关 Neo4j 中空间类型的更多信息,请参阅空间值

日期

日期

示例:2020-09-11

时间

struct { type: string, value: string }

示例:[offset-time, 12:14:08.209Z]

本地时间

struct { type: string, value: string }

示例:[local-time, 12:18:11.628]

日期时间

时间戳

示例:2020-09-11 12:17:39.192 此映射仅适用于从 Neo4j 读取时;否则,timestamp 会作为 LocalDateTime 写入 Neo4j。如果您需要写入 DateTime,请使用Cypher 写入查询

本地日期时间

时间戳

示例:2020-09-11 12:14:49.081

持续时间

struct { type: string, months: long, days: long, seconds: long, nanonseconds: integer, value: string }

请参阅时间函数:持续时间

节点

struct { <id>: long, <labels>: array[string], (PROPERTIES) }

Neo4j 中的节点表示为属性容器;即它们以结构体的形式出现,其属性对应于节点中的任何属性。为了方便使用,通常最好从查询中返回单个属性而非整个节点。

关系

struct { <rel.id>: long, <rel.type>: string, <source.id>: long, <target.id>: long, (PROPERTIES) }

关系以映射的形式返回,标识关系的源和目标、其类型以及关系的属性(如果有)。为了方便使用,通常最好从查询中返回单个属性而非整个关系。

路径

字符串

示例:path[(322)←[20280:AIRLINE]-(33510)]为了方便使用,建议使用路径函数从查询中返回路径的单个属性/方面。

[同类型数组]

array[元素]

在 Neo4j 中,数组必须类型一致(例如,一个数组只能包含 Float 值)。内部 Spark 类型与上述类型映射一致。

复杂数据类型

Spark 不原生支持所有 Neo4j 数据类型(例如PointTimeDuration)。此类类型会被转换为包含所有有用数据的 Struct 类型。

表 2. 复杂数据类型转换
Neo4j 类型 Spark 结构体

持续时间

Struct(Array(
    ("type", DataTypes.StringType, false),
    ("months", DataTypes.LongType, false),
    ("days", DataTypes.LongType, false),
    ("seconds", DataTypes.LongType, false),
    ("nanoseconds", DataTypes.IntegerType, false),
    ("value", DataTypes.StringType, false)
  ))

Struct(Array(
    ("type", DataTypes.StringType, false),
    ("srid", DataTypes.IntegerType, false),
    ("x", DataTypes.DoubleType, false),
    ("y", DataTypes.DoubleType, false),
    ("z", DataTypes.DoubleType, true),
  ))

时间

Struct(Array(
    ("type", DataTypes.StringType, false),
    ("value", DataTypes.StringType, false)
  ))

映射类型

当列为映射类型时,连接器会尝试将其展平。例如,考虑以下数据集

ID 名称 居住地

1

Andrea Santurbano

{address: 'Times Square, 1', city: 'NY', state: 'NY'}

2

Davide Fantuzzi

{address: 'Statue of Liberty, 10', city: 'NY', state: 'NY'}

连接器将 lives_in 列展平为三列:lives_in.addresslives_in.citylives_in.state

ID 名称 lives_in.address lives_in.city lives_in.state

1

Andrea Santurbano

Times Square, 1

NY

NY

2

Davide Fantuzzi

Statue of Liberty, 10

NY

NY

当 Dataframe 列是映射时,我们在内部将其展平,因为 Neo4j 不支持此类型作为图实体属性;因此,对于像这样的 Spark 作业

val data = Seq(
  ("Foo", 1, Map("inner" -> Map("key" -> "innerValue"))),
  ("Bar", 2, Map("inner" -> Map("key" -> "innerValue1"))),
).toDF("id", "time", "table")

data.write
  .mode(SaveMode.Append)
  .format(classOf[DataSource].getName)
  .option("url", SparkConnectorScalaSuiteIT.server.getBoltUrl)
  .option("labels", ":MyNodeWithFlattenedMap")
  .save()

在 Neo4j 中,对于带有 MyNodeWithFlattenedMap 标签的节点,您会发现以下信息已存储

MyNodeWithFlattenedMap {
    id: 'Foo',
    time: 1,
    `table.inner.key`: 'innerValue'
}
MyNodeWithFlattenedMap {
    id: 'Bar',
    time: 1,
    `table.inner.key`: 'innerValue1'
}

现在您可能会遇到以下问题

val data = Seq(
  ("Foo", 1, Map("key.inner" -> Map("key" -> "innerValue"), "key" -> Map("inner.key" -> "value"))),
  ("Bar", 1, Map("key.inner" -> Map("key" -> "innerValue1"), "key" -> Map("inner.key" -> "value1"))),
).toDF("id", "time", "table")
data.write
  .mode(SaveMode.Append)
  .format(classOf[DataSource].getName)
  .option("url", SparkConnectorScalaSuiteIT.server.getBoltUrl)
  .option("labels", ":MyNodeWithFlattenedMap")
  .save()

由于生成的展平键是重复的,Neo4j Spark 将以非确定性方式选择其中一个关联值。

因为我们将存储到 Neo4j 中的信息将是这样的(请注意顺序不保证)

MyNodeWithFlattenedMap {
    id: 'Foo',
    time: 1,
    `table.key.inner.key`: 'innerValue' // but it could be `value` as the order is not guaranteed
}
MyNodeWithFlattenedMap {
    id: 'Bar',
    time: 1,
    `table.key.inner.key`: 'innerValue1' // but it could be `value1` as the order is not guaranteed
}

将重复键分组为值数组

您可以使用选项 schema.map.group.duplicate.keys 来避免此问题。连接器会将所有具有相同键的值分组到一个数组中。此选项的默认值为 false。在以下场景中

val data = Seq(
  ("Foo", 1, Map("key.inner" -> Map("key" -> "innerValue"), "key" -> Map("inner.key" -> "value"))),
  ("Bar", 1, Map("key.inner" -> Map("key" -> "innerValue1"), "key" -> Map("inner.key" -> "value1"))),
).toDF("id", "time", "table")
data.write
  .mode(SaveMode.Append)
  .format(classOf[DataSource].getName)
  .option("url", SparkConnectorScalaSuiteIT.server.getBoltUrl)
  .option("labels", ":MyNodeWithFlattenedMap")
  .option("schema.map.group.duplicate.keys", true)
  .save()

输出将是

MyNodeWithFlattenedMap {
    id: 'Foo',
    time: 1,
    `table.key.inner.key`: ['innerValue', 'value'] // the order is not guaranteed
}
MyNodeWithFlattenedMap {
    id: 'Bar',
    time: 1,
    `table.key.inner.key`: ['innerValue1', 'value1'] // the order is not guaranteed
}

约束类型映射

表 3. Spark 到 Cypher 约束类型映射

Spark 类型

Neo4j 类型

BooleanType

BOOLEAN

StringType

STRING

IntegerType

INTEGER

LongType

INTEGER

FloatType

FLOAT

DoubleType

FLOAT

DateType

DATE

TimestampType

本地日期时间

自定义 pointType 如下:Struct { type: string, srid: integer, x: double, y: double, z: double }

POINT

自定义 durationType 如下:Struct { type: string, months: long, days: long, seconds: long, nanonseconds: integer, value: string }

DURATION

DataTypes.createArrayType(BooleanType, false)

LIST<BOOLEAN NOT NULL>

DataTypes.createArrayType(StringType, false)

LIST<STRING NOT NULL>

DataTypes.createArrayType(IntegerType, false)

LIST<INTEGER NOT NULL>

DataTypes.createArrayType(LongType, false)

LIST<INTEGER NOT NULL>

DataTypes.createArrayType(FloatType, false)

LIST<FLOAT NOT NULL>

DataTypes.createArrayType(DoubleType, false)

LIST<FLOAT NOT NULL>

DataTypes.createArrayType(DateType, false)

LIST<DATE NOT NULL>

DataTypes.createArrayType(TimestampType, false)

LIST<LOCAL DATETIME NOT NULL>

DataTypes.createArrayType(pointType, false)

LIST<POINT NOT NULL>

DataTypes.createArrayType(durationType, false)

LIST<DURATION NOT NULL>

对于数组,我们特别使用了不允许空元素的版本,因为 Neo4j 不允许数组中包含空元素。

© . All rights reserved.