# assets-raw-backup.py
# This example shows how to copy data that is already in CDF into RAW.
# For the sake of simplicity, we'll back up all assets into a single RAW table.
from pyspark.sql import SparkSession
from pyspark.sql.dataframe import DataFrame
# Initialize Spark with the CDF Spark data source, and define a few helper functions
spark = (
    SparkSession.builder
    .config(
        "spark.jars.packages",
        "com.cognite.spark.datasource:cdf-spark-datasource_2.12:1.4.29",
    )
    .getOrCreate()
)
baseUrl = "https://greenfield.cognitedata.com"
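# NB: this script leaves out authentication. The data source also needs
# credentials on every read/write; in the 1.x releases an apiKey option on the
# reader is the usual mechanism. Treat the exact option name as an assumption
# to verify against the data source documentation for your version.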
def cdfData(**options):
    # DataFrameReader.option mutates the reader in place, so we don't need to
    # reassign its return value here
    reader = spark.read.format("cognite.spark.v1")
    reader.option("maxRetryDelay", 5)  # fail faster during testing
    reader.option("baseUrl", baseUrl)
    for k, v in options.items():
        reader.option(k, v)
    return reader
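# The same helper works for any resource type the data source exposes.
# As a hypothetical example (not part of the backup flow below), reading
# CDF events would look like:
#
#   events = cdfData(type="events").load()
#   events.printSchema()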
def cdfRaw(database, table, inferSchema=False, ensureParent=False):
    # For both writing and reading we have to specify the database and table.
    # For reading, you can set the inferSchema option, which gives you proper
    # column names from the table instead of one JSON object per row. Naturally,
    # this only works if the table is not empty.
    return cdfData(
        type="raw",
        database=database,
        table=table,
        inferSchema=inferSchema,
        rawEnsureParent=ensureParent,
    )
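# A minimal sketch of reading a RAW table back with inferred columns, assuming
# the table already holds rows (database and table names are illustrative):
#
#   backup = cdfRaw("test", "assets-backup", inferSchema=True).load()
#   backup.show(5)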
def loadIntoRaw(database, table, source: DataFrame):
    # RAW is a bit special since it does not have a fixed schema.
    # When writing, we tell it to use the same schema as the source dataset.
    # The ensureParent parameter instructs the data source to create the RAW
    # table if it does not exist already.
    destination = cdfRaw(database, table, ensureParent=True).schema(source.schema).load()
    # Register the destination as a temp view so we can insert into it;
    # the view is backed by the Cognite relation, so inserts go to RAW
    destination.createOrReplaceTempView("destinationRawTable")
    source.select(*destination.columns).write.insertInto("destinationRawTable")
# Create a Spark view for assets
cdfData(type="assets").load().createOrReplaceTempView("cdf_assets")

assetsRawCopy = spark.sql("""
    -- RAW rows require a key column; we'll just use the asset's externalId
    select externalId as key, *
    from cdf_assets
    -- since the key column is required, we only back up assets with externalId set
    where externalId is not null
""")

loadIntoRaw("test", "assets-backup", assetsRawCopy)
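# To sanity-check the backup (a sketch, assuming read access to the new table),
# read the rows back and compare counts with the source DataFrame:
#
#   restored = cdfRaw("test", "assets-backup", inferSchema=True).load()
#   print(restored.count(), assetsRawCopy.count())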