https://delta.io logo
p

Prasanna Kumar

07/26/2023, 1:14 AM
The code below fails with an exception:
org.apache.spark.sql.AnalysisException: Delta does not support specifying the schema at read time.
The exception is thrown while reading streamed changes from a Delta table. Please find the code snippet below:
Copy code
/** Creates (or replaces) the `customer_purchases` Delta table with the change data feed enabled. */
def purchaseDDL(): Unit = {
  // Kept as a single literal so the statement sent to Spark is unchanged.
  val createTableSql =
    """CREATE or REPLACE TABLE  customer_purchases (customer_id LONG, obs_year INT, obs_month INT, obs_date INT , price DOUBLE) USING delta TBLPROPERTIES (delta.enableChangeDataFeed = true)"""

  spark.sql(createTableSql)
  ()
}

/** Appends four sample purchase rows to the `customer_purchases` Delta table. */
def addPurchases() = {
  val columnNames = Seq("customer_id", "obs_year", "obs_month", "obs_date", "price")

  // (customer_id, obs_year, obs_month, obs_date, price)
  val sampleRows = List(
    (1, 2023, 1, 1, 2.1),
    (2, 2023, 1, 5, 3.2),
    (3, 2023, 1, 8, 4.4),
    (1, 2023, 1, 8, 5.5)
  )

  val purchases = spark.createDataFrame(sampleRows).toDF(columnNames: _*)

  purchases.write
    .format("delta")
    .mode("append")
    .saveAsTable("customer_purchases")
}

/**
 * Streams the change data feed of `customer_purchases` from table version 0 and prints
 * every change row except `update_preimage` rows to the console.
 *
 * The stream is drained once (all data available at call time) and then stopped, so the
 * method returns after printing rather than running indefinitely.
 */
def readPurchases(): Unit = {
  val changes = spark.readStream
    .format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", "0")
    // NOTE(review): the reported "Delta does not support specifying the schema at read time"
    // error is raised while this streaming source is resolved; it may depend on the Delta
    // version in use when a change-feed stream is looked up by table name — TODO confirm.
    .table("customer_purchases")
    .filter("_change_type != 'update_preimage'")

  // show() is not supported on a streaming DataFrame; a streaming query has to be started
  // through writeStream. The console sink with truncate=false prints full column values.
  val query = changes.writeStream
    .format("console")
    .option("truncate", "false")
    .start()

  // Process everything currently in the change feed, then always release the query.
  try query.processAllAvailable()
  finally query.stop()
}
// Create the table, append the sample rows, then read the change feed.
purchaseDDL()
addPurchases()
// Reported problem: calling readPurchases throws org.apache.spark.sql.AnalysisException:
// "Delta does not support specifying the schema at read time."
readPurchases()