数据类型映射
Neo4j 和 Cypher® 提供了一个 类型系统,用于描述值如何在数据库中存储,但这些类型并不总是完全匹配 Spark 提供的类型。
在某些情况下,Neo4j 提供了一些 Spark 没有等效类型的数据类型,反之亦然。
数据类型映射
Neo4j 类型 | Spark 类型 | 说明 |
---|---|---|
|
|
例如: |
|
|
例如: |
|
|
例如: |
|
|
例如: |
|
|
有关 Neo4j 中空间类型的更多信息,请参阅 空间值 |
|
|
例如: |
|
|
例如: |
|
|
例如: |
|
|
例如: |
|
|
例如: |
|
|
参阅 时间函数:持续时间 |
|
|
Neo4j 中的节点表示为属性容器;也就是说,它们显示为结构体,其属性对应于节点中的任何属性。为了便于使用,通常最好从查询中返回单个属性而不是节点。 |
|
|
关系以映射形式返回,标识关系的源和目标、关系类型以及关系的属性(如果有)。为了便于使用,通常最好从查询中返回单个属性而不是关系。 |
|
|
例如: |
|
|
在 Neo4j 中,数组必须具有一致的类型(例如,数组只能包含 |
复杂数据类型
Neo4j 类型 | Spark Struct |
---|---|
Duration |
Struct(Array( ("type", DataTypes.StringType, false), ("months", DataTypes.LongType, false), ("days", DataTypes.LongType, false), ("seconds", DataTypes.LongType, false), ("nanoseconds", DataTypes.IntegerType, false), ("value", DataTypes.StringType, false) )) |
Point |
Struct(Array( ("type", DataTypes.StringType, false), ("srid", DataTypes.IntegerType, false), ("x", DataTypes.DoubleType, false), ("y", DataTypes.DoubleType, false), ("z", DataTypes.DoubleType, true), )) |
Time |
Struct(Array( ("type", DataTypes.StringType, false), ("value", DataTypes.StringType, false) )) |
映射类型
当列是映射时,连接器会尝试将其扁平化。例如,考虑以下数据集
id | name | lives_in |
---|---|---|
1 |
Andrea Santurbano |
{address: 'Times Square, 1', city: 'NY', state: 'NY'} |
2 |
Davide Fantuzzi |
{address: 'Statue of Liberty, 10', city: 'NY', state: 'NY'} |
连接器将 lives_in
列扁平化为三个列 lives_in.address
、lives_in.city
和 lives_in.state
id | name | lives_in.address | lives_in.city | lives_in.state |
---|---|---|---|---|
1 |
Andrea Santurbano |
Times Square, 1 |
NY |
NY |
2 |
Davide Fantuzzi |
Statue of Liberty, 10 |
NY |
NY |
当 Dataframe 列是映射时,我们在内部执行的操作是将映射扁平化,因为 Neo4j 不支持此类型用于图实体属性;因此对于像这样的 Spark 作业
val data = Seq(
("Foo", 1, Map("inner" -> Map("key" -> "innerValue"))),
("Bar", 2, Map("inner" -> Map("key" -> "innerValue1"))),
).toDF("id", "time", "table")
data.write
.mode(SaveMode.Append)
.format(classOf[DataSource].getName)
.option("url", SparkConnectorScalaSuiteIT.server.getBoltUrl)
.option("labels", ":MyNodeWithFlattenedMap")
.save()
在 Neo4j 中,带有 MyNodeWithFlattenedMap
标签的节点将存储以下信息
MyNodeWithFlattenedMap { id: 'Foo', time: 1, `table.inner.key`: 'innerValue' } MyNodeWithFlattenedMap { id: 'Bar', time: 1, `table.inner.key`: 'innerValue1' }
现在,您可能会遇到以下问题情况
val data = Seq(
("Foo", 1, Map("key.inner" -> Map("key" -> "innerValue"), "key" -> Map("inner.key" -> "value"))),
("Bar", 1, Map("key.inner" -> Map("key" -> "innerValue1"), "key" -> Map("inner.key" -> "value1"))),
).toDF("id", "time", "table")
data.write
.mode(SaveMode.Append)
.format(classOf[DataSource].getName)
.option("url", SparkConnectorScalaSuiteIT.server.getBoltUrl)
.option("labels", ":MyNodeWithFlattenedMap")
.save()
由于生成的扁平化键是重复的,因此 Neo4j Spark 将以非确定性方式选择一个关联的值。
因为我们将存储到 Neo4j 中的信息将是(请注意,顺序不可保证)
MyNodeWithFlattenedMap { id: 'Foo', time: 1, `table.key.inner.key`: 'innerValue' // but it could be `value` as the order is not guaranteed } MyNodeWithFlattenedMap { id: 'Bar', time: 1, `table.key.inner.key`: 'innerValue1' // but it could be `value1` as the order is not guaranteed }
将重复键分组为值的数组
您可以使用 schema.map.group.duplicate.keys
选项来避免此问题。连接器会将具有相同键的所有值分组到一个数组中。该选项的默认值为 false
。在这种情况下
val data = Seq(
("Foo", 1, Map("key.inner" -> Map("key" -> "innerValue"), "key" -> Map("inner.key" -> "value"))),
("Bar", 1, Map("key.inner" -> Map("key" -> "innerValue1"), "key" -> Map("inner.key" -> "value1"))),
).toDF("id", "time", "table")
data.write
.mode(SaveMode.Append)
.format(classOf[DataSource].getName)
.option("url", SparkConnectorScalaSuiteIT.server.getBoltUrl)
.option("labels", ":MyNodeWithFlattenedMap")
.option("schema.map.group.duplicate.keys", true)
.save()
输出将是
MyNodeWithFlattenedMap { id: 'Foo', time: 1, `table.key.inner.key`: ['innerValue', 'value'] // the order is not guaranteed } MyNodeWithFlattenedMap { id: 'Bar', time: 1, `table.key.inner.key`: ['innerValue1', 'value1'] // the order is not guaranteed }
约束类型映射
Spark 类型 |
Neo4j 类型 |
BooleanType |
BOOLEAN |
StringType |
STRING |
IntegerType |
INTEGER |
LongType |
INTEGER |
FloatType |
FLOAT |
DoubleType |
FLOAT |
DateType |
DATE |
TimestampType |
LOCAL DATETIME |
自定义 |
POINT |
自定义 |
DURATION |
DataTypes.createArrayType(BooleanType, false) |
LIST<BOOLEAN NOT NULL> |
DataTypes.createArrayType(StringType, false) |
LIST<STRING NOT NULL> |
DataTypes.createArrayType(IntegerType, false) |
LIST<INTEGER NOT NULL> |
DataTypes.createArrayType(LongType, false) |
LIST<INTEGER NOT NULL> |
DataTypes.createArrayType(FloatType, false) |
LIST<FLOAT NOT NULL> |
DataTypes.createArrayType(DoubleType, false) |
LIST<FLOAT NOT NULL> |
DataTypes.createArrayType(DateType, false) |
LIST<DATE NOT NULL> |
DataTypes.createArrayType(TimestampType, false) |
LIST<LOCAL DATETIME NOT NULL> |
DataTypes.createArrayType(pointType, false) |
LIST<POINT NOT NULL> |
DataTypes.createArrayType(durationType, false) |
LIST<DURATION NOT NULL> |
对于数组,我们使用没有空元素的版本,因为 Neo4j 不允许在数组中使用空元素。