Ajex

01/09/2023, 8:35 AM
Hi guys, I've got an issue when writing data using replaceWhere. This is my code:
val date = java.time.LocalDate.now.toString
dfFolder.write.option("compression", "zstd")
      .format("delta")
      .mode("overwrite")
      .option("replaceWhere", s"d = '$date'")
      .save(folderFile)
This is the error I get:
Exception in thread "main" java.lang.ClassCastException: org.apache.spark.sql.catalyst.plans.logical.DeleteFromTable cannot be cast to org.apache.spark.sql.delta.commands.DeleteCommand
	at org.apache.spark.sql.delta.commands.WriteIntoDelta.removeFiles(WriteIntoDelta.scala:388)
	at org.apache.spark.sql.delta.commands.WriteIntoDelta.write(WriteIntoDelta.scala:315)
	at org.apache.spark.sql.delta.commands.WriteIntoDelta.$anonfun$run$1(WriteIntoDelta.scala:98)
	at org.apache.spark.sql.delta.commands.WriteIntoDelta.$anonfun$run$1$adapted(WriteIntoDelta.scala:91)
	at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:221)
	at org.apache.spark.sql.delta.commands.WriteIntoDelta.run(WriteIntoDelta.scala:91)
	at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:159)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:303)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at vn.fpt.paytv.dbupdate.Metadata$.updateMetaData(Metadata.scala:35)
	at vn.fpt.paytv.jobs.dbupdate.DailyInfoUpdateJob$.$anonfun$main$1(DailyInfoUpdateJob.scala:40)
	at vn.fpt.paytv.jobs.dbupdate.DailyInfoUpdateJob$.$anonfun$main$1$adapted(DailyInfoUpdateJob.scala:32)
	at vn.fpt.core.analytics.spark.SparkApp.withSparkSession(SparkApp.scala:17)
	at vn.fpt.paytv.jobs.dbupdate.DailyInfoUpdateJob$.main(DailyInfoUpdateJob.scala:31)
	at vn.fpt.paytv.jobs.dbupdate.DailyInfoUpdateJob.main(DailyInfoUpdateJob.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:958)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1046)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1055)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Any help, please?

Jon Stockham

01/09/2023, 8:49 AM
Is the table partitioned by d? What version of Delta Lake are you running?

Ajex

01/09/2023, 8:49 AM
I'm running 2.1.1.
Nope, it's a non-partitioned table.

Jon Stockham

01/09/2023, 9:02 AM
I wondered if it was related to this, but I guess not if you're on 2.1.1: in Delta Lake 1.0.0 and below, replaceWhere overwrites data matching a predicate over partition columns only.
Maybe check the value of spark.databricks.delta.replaceWhere.dataColumns.enabled. It should be set to true.
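For example, a quick way to check and set it from an existing session (a minimal sketch, assuming a SparkSession named spark is in scope; only the conf key itself comes from this thread):

// Minimal sketch: inspect the flag, then enable replaceWhere on data columns.
val key = "spark.databricks.delta.replaceWhere.dataColumns.enabled"
println(spark.conf.getOption(key).getOrElse("unset")) // "unset" means the default applies
spark.conf.set(key, "true")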

Ryan Zhu

01/09/2023, 1:41 PM
How did you run the code? This usually happens when you're missing the necessary Spark confs:
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
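If you build the session in code rather than passing flags to spark-submit, the equivalent would look roughly like this (a sketch; the app name is illustrative, and the two config values are the ones above, which must be set before the session is created):

import org.apache.spark.sql.SparkSession

// Sketch: wiring in the Delta SQL extension and catalog at session build time.
// Setting these on an already-created session has no effect.
val spark = SparkSession.builder()
  .appName("DailyInfoUpdateJob") // illustrative; name taken from the stack trace
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()

Without the extension, the DeleteFromTable plan that replaceWhere produces internally never gets rewritten into Delta's DeleteCommand, which matches the ClassCastException above.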

Ajex

01/10/2023, 3:00 AM
Thank you guys @Jon Stockham, @Ryan Zhu. I was missing the Delta conf.