Snowflake

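Snowflake is a fully managed SaaS (software as a service) that provides a single platform for data warehousing, data lakes, data engineering, data science, data application development, and the secure sharing and consumption of real-time/shared data. Out of the box, Snowflake offers features such as separation of storage and compute, on-the-fly scalable compute, data sharing, data cloning, and third-party tool support, to meet the demanding needs of growing businesses.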

Prerequisites

You need a running Snowflake instance. If you don't have one, you can create one here.
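The examples in this page hard-code the Snowflake connection settings, including the password. A minimal sketch, assuming you keep those settings in environment variables instead: the variable names (`SNOWFLAKE_ACCOUNT`, `SNOWFLAKE_USER`, and so on) and the helper `snowflake_options` are hypothetical and not part of either connector. The returned dictionary uses the same option keys as the examples (`sfURL`, `sfUser`, `sfPassword`, `sfDatabase`, `sfSchema`, `dbtable`).

```python
import os
from typing import Dict, Mapping, Optional

# Hypothetical environment variable names; adapt them to your setup.
REQUIRED_VARS = [
    "SNOWFLAKE_ACCOUNT",
    "SNOWFLAKE_USER",
    "SNOWFLAKE_PASSWORD",
    "SNOWFLAKE_DATABASE",
    "SNOWFLAKE_SCHEMA",
]


def snowflake_options(dbtable: str,
                      env: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Build spark-snowflake options from environment variables (hypothetical helper)."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise KeyError(f"missing environment variables: {', '.join(missing)}")
    return {
        # Same shape as the examples: <account_identifier>.snowflakecomputing.com
        "sfURL": f"{env['SNOWFLAKE_ACCOUNT']}.snowflakecomputing.com",
        "sfUser": env["SNOWFLAKE_USER"],
        "sfPassword": env["SNOWFLAKE_PASSWORD"],
        "sfDatabase": env["SNOWFLAKE_DATABASE"],
        "sfSchema": env["SNOWFLAKE_SCHEMA"],
        "dbtable": dbtable,
    }
```

In PySpark the dictionary can be unpacked into the reader or writer, for example `spark.read.format("snowflake").options(**snowflake_options("CUSTOMER")).load()`.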

Dependencies

The required dependencies are:

  • net.snowflake:spark-snowflake_<scala_version>:<version>

  • net.snowflake:snowflake-jdbc:<version>
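Both artifacts have to be on the Spark classpath, for example via `spark-submit --packages` or the `spark.jars.packages` setting, which take a comma-separated list of `groupId:artifactId:version` coordinates. A minimal sketch that assembles that list from the placeholders above; the helper name is hypothetical, and the Scala suffix and versions are left to you (choose the Scala suffix that matches your Spark build and versions that the Snowflake documentation lists as compatible).

```python
from typing import Iterable, List


def snowflake_packages(scala_version: str,
                       spark_snowflake_version: str,
                       jdbc_version: str,
                       extra: Iterable[str] = ()) -> str:
    """Return a comma-separated coordinate list for --packages / spark.jars.packages."""
    coords: List[str] = [
        # spark-snowflake is published per Scala binary version (the _<scala_version> suffix)
        f"net.snowflake:spark-snowflake_{scala_version}:{spark_snowflake_version}",
        f"net.snowflake:snowflake-jdbc:{jdbc_version}",
    ]
    # Any other coordinates you need (e.g. the Neo4j connector, if not installed otherwise)
    coords.extend(extra)
    return ",".join(coords)
```

The result can be passed to `SparkSession.builder.config("spark.jars.packages", ...)` before the session is created, or to `--packages` on the command line.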

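From Snowflake to Neo4j

Scala

import org.apache.spark.sql.{DataFrame, SaveMode}

// Step (1)
// Load a table into a Spark DataFrame (`spark` is the active SparkSession, e.g. in spark-shell)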
val snowflakeDF: DataFrame = spark.read
  .format("snowflake")
  .option("sfURL", "<account_identifier>.snowflakecomputing.com")
  .option("sfUser", "<user_name>")
  .option("sfPassword", "<password>")
  .option("sfDatabase", "<database>")
  .option("sfSchema", "<schema>")
  .option("dbtable", "CUSTOMER")
  .load()

// Step (2)
// Save the `snowflakeDF` as nodes with labels `Person` and `Customer` into Neo4j
snowflakeDF.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.ErrorIfExists)
  .option("url", "neo4j://<host>:<port>")
  .option("labels", ":Person:Customer")
  .save()

Python

# Step (1)
# Load a table into a Spark DataFrame
snowflakeDF = (spark.read
  .format("snowflake")
  .option("sfURL", "<account_identifier>.snowflakecomputing.com")
  .option("sfUser", "<user_name>")
  .option("sfPassword", "<password>")
  .option("sfDatabase", "<database>")
  .option("sfSchema", "<schema>")
  .option("dbtable", "CUSTOMER")
  .load())

# Step (2)
# Save the `snowflakeDF` as nodes with labels `Person` and `Customer` into Neo4j
(snowflakeDF.write
  .format("org.neo4j.spark.DataSource")
  .mode("errorifexists")  # PySpark takes the save mode as a string; there is no SaveMode class
  .option("url", "neo4j://<host>:<port>")
  .option("labels", ":Person:Customer")
  .save())
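The `labels` option in step (2) is a list of labels, each prefixed with a colon. The save mode also matters: as we understand the Neo4j connector, `Overwrite` merges nodes on the properties listed in the `node.keys` option and therefore needs that option, while the mode used above creates new nodes; check the connector documentation for your version. A minimal sketch of a hypothetical helper that assembles these write options and enforces that rule; the helper and the key column `CUSTOMER_ID` in the usage note are illustrative assumptions.

```python
from typing import Dict, Optional, Sequence


def neo4j_node_options(url: str,
                       labels: Sequence[str],
                       mode: str,
                       node_keys: Optional[Sequence[str]] = None) -> Dict[str, str]:
    """Assemble options for writing a DataFrame as Neo4j nodes (hypothetical helper)."""
    if not labels:
        raise ValueError("at least one label is required")
    options = {
        "url": url,
        # ["Person", "Customer"] -> ":Person:Customer"
        "labels": "".join(f":{label}" for label in labels),
    }
    if mode.lower() == "overwrite":
        # Overwrite merges on these properties, so they must be provided.
        if not node_keys:
            raise ValueError("node.keys is required when mode is 'overwrite'")
        options["node.keys"] = ",".join(node_keys)
    return options
```

For example: `snowflakeDF.write.format("org.neo4j.spark.DataSource").mode("overwrite").options(**neo4j_node_options("neo4j://<host>:<port>", ["Person", "Customer"], "overwrite", ["CUSTOMER_ID"])).save()`.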

From Neo4j to Snowflake

Scala

import org.apache.spark.sql.DataFrame

// Step (1)
// Load `:Person:Customer` nodes as DataFrame
val neo4jDF: DataFrame = spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "neo4j://<host>:<port>")
  .option("labels", ":Person:Customer")
  .load()

// Step (2)
// Save the `neo4jDF` as table CUSTOMER into Snowflake
neo4jDF.write
  .format("snowflake")
  .mode("overwrite")
  .option("sfURL", "<account_identifier>.snowflakecomputing.com")
  .option("sfUser", "<user_name>")
  .option("sfPassword", "<password>")
  .option("sfDatabase", "<database>")
  .option("sfSchema", "<schema>")
  .option("dbtable", "CUSTOMER")
  .save()

Python

# Step (1)
# Load `:Person:Customer` nodes as DataFrame
neo4jDF = (spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "neo4j://<host>:<port>")
  .option("labels", ":Person:Customer")
  .load())

# Step (2)
# Save the `neo4jDF` as table CUSTOMER into Snowflake
(neo4jDF.write
  .format("snowflake")
  .mode("overwrite")
  .option("sfURL", "<account_identifier>.snowflakecomputing.com")
  .option("sfUser", "<user_name>")
  .option("sfPassword", "<password>")
  .option("sfDatabase", "<database>")
  .option("sfSchema", "<schema>")
  .option("dbtable", "CUSTOMER")
  .save())
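When nodes are read by label, the Neo4j connector adds the internal columns `<id>` and `<labels>` next to the node properties, so `neo4jDF` carries those names into step (2). If you prefer plain identifiers in the CUSTOMER table, one option is to rename the columns before writing. A minimal sketch using a hypothetical naming rule (drop characters other than letters, digits, and underscores, then upper-case, matching how Snowflake folds unquoted identifiers); the helper name and the rule are assumptions, not connector behavior.

```python
import re
from typing import Dict, Iterable


def snowflake_column_names(columns: Iterable[str]) -> Dict[str, str]:
    """Map column names to plain upper-case identifiers, e.g. "<id>" -> "ID".

    Only columns whose name changes appear in the result (hypothetical helper).
    """
    columns = list(columns)
    mapping: Dict[str, str] = {}
    for name in columns:
        new_name = re.sub(r"[^0-9A-Za-z_]", "", name).upper()
        if not new_name:
            raise ValueError(f"column {name!r} has no usable characters")
        if new_name != name:
            mapping[name] = new_name
    # Refuse renames that would make two columns share a name.
    targets = [mapping.get(name, name) for name in columns]
    if len(set(targets)) != len(targets):
        raise ValueError("renaming would produce duplicate column names")
    return mapping
```

Apply it before step (2) by calling `neo4jDF = neo4jDF.withColumnRenamed(old, new)` for each pair in `snowflake_column_names(neo4jDF.columns).items()`.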