Prasanna Kumar
07/26/2023, 1:14 AM
Reading stream changes from a Delta table throws `org.apache.spark.sql.AnalysisException: Delta does not support specifying the schema at read time`.
Please find the code snippet below:
/** Creates the `customer_purchases` Delta table, replacing any existing one, with the
  * change data feed enabled (`delta.enableChangeDataFeed = true`) so its row-level changes
  * can later be read with the `readChangeFeed` option.
  */
def purchaseDDL(): Unit = {
  val createStatement =
    """CREATE or REPLACE TABLE customer_purchases (customer_id LONG, obs_year INT, obs_month INT, obs_date INT , price DOUBLE) USING delta TBLPROPERTIES (delta.enableChangeDataFeed = true)"""
  spark.sql(createStatement)
}
/** Appends four sample purchase rows to the `customer_purchases` table.
  *
  * Column order and names follow the table DDL:
  * (customer_id, obs_year, obs_month, obs_date, price).
  *
  * `customer_id` is built from `Long` literals (`1L`, ...) so the DataFrame schema matches the
  * table's declared `customer_id LONG` column. With `Int` literals, the append relies on the
  * writer implicitly widening INT to BIGINT. Depending on the Delta write path and version,
  * that widening can be rejected as an incompatible type merge.
  */
def addPurchases(): Unit = {
  val purchases = spark
    .createDataFrame(
      List(
        (1L, 2023, 1, 1, 2.1),
        (2L, 2023, 1, 5, 3.2),
        (3L, 2023, 1, 8, 4.4),
        (1L, 2023, 1, 8, 5.5)
      )
    )
    .toDF("customer_id", "obs_year", "obs_month", "obs_date", "price")
  purchases.write.mode("append").format("delta").saveAsTable("customer_purchases")
}
/** Streams the change data feed of `customer_purchases`, starting at table version 0, and
  * prints every change except update pre-images to the console.
  *
  * This version differs from a naive `readStream...table(...).show()` in two ways:
  *  - The stream is opened by the table's storage path instead of `.table(...)`. For a
  *    streaming read of a catalog table, Spark passes the catalog schema to Delta as a
  *    user-specified schema. Delta rejects it with "Delta does not support specifying the
  *    schema at read time", because the change-feed schema adds `_change_type`,
  *    `_commit_version` and `_commit_timestamp`. NOTE(review): newer Delta releases may
  *    accept `.table(...)` here; confirm against the Delta version in use.
  *  - A streaming DataFrame cannot be `.show()`n; Spark requires `writeStream.start()`.
  *    The console sink with `truncate = false` prints what `.show(false)` was meant to print.
  */
def readPurchases(): Unit = {
  // Resolve where the table's data lives so the stream can be loaded by path.
  val tablePath = spark
    .sql("DESCRIBE DETAIL customer_purchases")
    .select("location")
    .first()
    .getString(0)

  val changes = spark.readStream
    .format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", "0")
    .load(tablePath)
    .filter("_change_type != 'update_preimage'")

  val query = changes.writeStream
    .format("console")
    .option("truncate", "false")
    .start()
  try {
    // Drain everything currently available, then stop so this call returns like a one-shot
    // display. processAllAvailable is documented as intended for testing; use a trigger
    // plus awaitTermination for a long-running job.
    query.processAllAvailable()
  } finally {
    query.stop()
  }
}
// Driver: recreate the table, append the sample rows, then read the table's change data feed.
// Order matters: each step depends on the table state produced by the previous one.
purchaseDDL()
addPurchases()
// Reported problem: with the streaming `.table(...)` read, calling readPurchases threw
// org.apache.spark.sql.AnalysisException: Delta does not support specifying the schema at read time.
readPurchases()