Yatharth Maheshwari
01/05/2023, 7:29 AMMohammad Mohtashim Khan
01/05/2023, 12:06 PM
r"""
{
"payload": {
"before": null,
"after": {
"id": {
"value": "MK_1",
"set": true
},
"status": {
"value": "new_status",
"set": true
},
"status_metadata": {
"value": "new_status_metadata",
"set": true
},
"creator": {
"value": "new_creator",
"set": true
},
"created": null,
"creator_type": null,
"updater": null,
"updated": null,
"updater_type": {
"value": "new_updater_type",
"set": true
}
},
"source": {
"version": "1.7.0.13-BETA",
"connector": "yugabytedb",
"name": "dbserver1",
"ts_ms": -4258763692835,
"snapshot": "false",
"db": "yugabyte",
"sequence": "[\"0:0::0:0\",\"1:338::0:0\"]",
"schema": "public",
"table": "customer",
"txId": "",
"lsn": "1:338::0:0",
"xmin": null
},
"op": "u",
"ts_ms": 1669795724779,
"transaction": null
}
}
"""
The payload consists of `before` and `after` fields. As visible from `"op": "u"`, this is an update operation: a row with id MK_1 in the Yugabyte table `customer` was updated with new values. However, the `after` field only shows those columns whose values have been updated. The fields in `after` which are null have not been updated, e.g. `created` is null and therefore was not updated, whereas `status` is `{"value": "new_status", "set": true}`, which means the `status` column has been updated to the new value "new_status". Now I have a PySpark Structured Streaming pipeline which takes in these payloads, processes them, and then makes a micro-dataframe of the following form:
id | set_id | status | set_status | status_metadata | set_status_metadata | creator | set_creator | created | creator_type | set_created_type | updater | set_updater | updated | set_updated | updater_type | set_updater_type
The "set_column" is either true or false depending on the payload.
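For the sample payload above, that row would look roughly like this (illustrative, only some of the columns shown; the set flags for the null fields are assumed to be false):
id = MK_1 (set_id = true) | status = new_status (set_status = true) | status_metadata = new_status_metadata (set_status_metadata = true) | creator = new_creator (set_creator = true) | created = null (set flag = false) | updater_type = new_updater_type (set_updater_type = true)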
Problem:
Now I have a delta table on delta lake with the following schema:
id | status | status_metadata | creator | created | created_type | updater | updated | updater_type
And I am using the following code to update the above delta table using the python delta lake API (v 2.2.0):
for column in fields_map:
    delta_lake_merger.whenMatchedUpdate(
        condition=f"update_table.op = 'u' AND update_table.set_{column} = 'true'",
        set={column: fields_map[column]}
    ).execute()
Now you might be wondering why I am doing the update column-wise rather than updating all of the columns at once. This is exactly the problem that I am facing. If I update all of the columns at once without the set_{column} = 'true' condition, then it will overwrite the entire state of the rows for the matching id in the delta table. This is not what I want.
What do I want?
I only want to update those columns from the payload whose values are not null in the payload. If I update all columns at once like this:
delta_lake_merger.whenMatchedUpdate(
    condition="update_table.op = 'u'",
    set=fields_map
).execute()
Then the Delta Lake API will also overwrite the columns which have not been updated with nulls in the delta table, since null is the value of the non-updated columns in the CDC payload.
The above iterative solution works: it does a column-wise update over all of the rows in the delta table, and for a given column it simply skips any row whose set_column is false, thereby keeping the existing value in the delta table. However, it is slow, since it writes the data N times sequentially, which bottlenecks my streaming query. Since all of the column-wise updates are independent, is there any way in the Delta Lake Python API to update all of the columns at once while still applying the set_column condition? I suspect there is, because each iteration is just an independent call to write data for one column with the given condition. I want to call execute once for all columns with the set condition rather than putting it in a loop.
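A rough, untested sketch of what I have in mind, assuming the target table alias in the merge is delta_table and that fields_map maps each column name to its source-side expression (e.g. 'update_table.status'):

# Build one conditional SET expression per column, so a single merge only
# changes the columns whose set_<column> flag is 'true' and keeps the
# existing target value otherwise. "delta_table" is an assumed target alias.
conditional_set = {
    column: f"CASE WHEN update_table.set_{column} = 'true' "
            f"THEN {fields_map[column]} "
            f"ELSE delta_table.{column} END"
    for column in fields_map
}

delta_lake_merger.whenMatchedUpdate(
    condition="update_table.op = 'u'",
    set=conditional_set
).execute()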
PS: I was thinking of using the asyncio library for Python, but I'm not so sure about that.
Thank you so much.
Ovi
01/05/2023, 1:00 PM
I can't use `replaceWhere`, because the replace condition is arbitrary and the delta version doesn't support it (and I can't upgrade it).
Also, I can't use `merge` because there's no one-to-one mapping between the source data frame and the target delta table.
Any advice is welcome.
Thank you!
PS: I need to wrap these commands (delete and insert) into a transaction, if possible
DeltaTable.forPath(deltaTableLocation).delete(dataCondition)

spark.read
  .load(sourceDataLocation)
  .filter(dataCondition)
  .write
  .format("delta")
  .option("mergeSchema", "true")
  .mode(Overwrite)
  .save(deltaTableLocation)
Jeanne Choo
01/06/2023, 4:25 AMGeir I
01/06/2023, 10:19 AM
`merge` a dataframe into the table, and get an error. For now I'm experimenting, and simply wiped the table and recreated it. But were I to run into this issue in a situation where I could not lose data, I'm not sure what I should do.
Roy Green
01/06/2023, 11:36 AM
CONVERT TO DELTA parquet.`s3a:/path_to_s3_dir` NO STATISTICS PARTITIONED BY (is_valid string, created_dt string)
Any idea of how much time the execution should take? Even a rough prediction would help…
Thanks!
Christina
01/06/2023, 2:27 PMAthi
01/06/2023, 2:36 PMs liu
01/08/2023, 1:22 AMSamat Kurmanov
01/08/2023, 12:05 PMSamat Kurmanov
01/08/2023, 12:06 PMAkshay kumar
01/08/2023, 7:40 PMAjex
01/09/2023, 8:35 AM
val date = java.time.LocalDate.now.toString
dfFolder.write.option("compression", "zstd")
  .format("delta")
  .mode("overwrite")
  .option("replaceWhere", s"d = '$date'")
  .save(folderFile)
this is my issue:
Exception in thread "main" java.lang.ClassCastException: org.apache.spark.sql.catalyst.plans.logical.DeleteFromTable cannot be cast to org.apache.spark.sql.delta.commands.DeleteCommand
at org.apache.spark.sql.delta.commands.WriteIntoDelta.removeFiles(WriteIntoDelta.scala:388)
at org.apache.spark.sql.delta.commands.WriteIntoDelta.write(WriteIntoDelta.scala:315)
at org.apache.spark.sql.delta.commands.WriteIntoDelta.$anonfun$run$1(WriteIntoDelta.scala:98)
at org.apache.spark.sql.delta.commands.WriteIntoDelta.$anonfun$run$1$adapted(WriteIntoDelta.scala:91)
at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:221)
at org.apache.spark.sql.delta.commands.WriteIntoDelta.run(WriteIntoDelta.scala:91)
at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:159)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:303)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
at vn.fpt.paytv.dbupdate.Metadata$.updateMetaData(Metadata.scala:35)
at vn.fpt.paytv.jobs.dbupdate.DailyInfoUpdateJob$.$anonfun$main$1(DailyInfoUpdateJob.scala:40)
at vn.fpt.paytv.jobs.dbupdate.DailyInfoUpdateJob$.$anonfun$main$1$adapted(DailyInfoUpdateJob.scala:32)
at vn.fpt.core.analytics.spark.SparkApp.withSparkSession(SparkApp.scala:17)
at vn.fpt.paytv.jobs.dbupdate.DailyInfoUpdateJob$.main(DailyInfoUpdateJob.scala:31)
at vn.fpt.paytv.jobs.dbupdate.DailyInfoUpdateJob.main(DailyInfoUpdateJob.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:958)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1046)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1055)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
any help please
Martin
01/09/2023, 10:56 AMjaiminks
01/09/2023, 10:07 PMMarius Grama
01/10/2023, 6:27 AM
`x` of type `int`, add data into the column, perform modifications, drop the column, and subsequently I add a new column `x` of type `string`, add data into the newly added column, and perform modifications.
While calling the `table_changes` function I get the message:
Error in SQL statement: DeltaColumnMappingUnsupportedSchemaIncompatibleException: Change Data Feed (CDF) read is not supported on tables with column mapping schema changes (e.g. rename or drop).
…
You may force enable streaming read at your own risk by turning on spark.databricks.delta.changeDataFeed.unsafeBatchReadOnIncompatibleSchemaChanges.enabled.
I did add the property `spark.databricks.delta.changeDataFeed.unsafeBatchReadOnIncompatibleSchemaChanges.enabled` to my test table (in `tblproperties` while creating the table), however I still receive the above-mentioned exception. Can somebody provide me a hint?
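A minimal sketch of the alternative the error message seems to suggest: setting the flag as a Spark session configuration rather than a table property (the configuration name is copied verbatim from the error message; that it has to be a session-level conf rather than a table property is an assumption):

# Sketch: enable the flag for the current Spark session before reading CDF.
spark.conf.set(
    "spark.databricks.delta.changeDataFeed.unsafeBatchReadOnIncompatibleSchemaChanges.enabled",
    "true",
)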
Hanan Shteingart
01/10/2023, 7:13 AM
"After the table is converted, make sure all writes go through Delta Lake." So, my question is: how do I add to delta new parquet files which are already in the delta location?
Chanukya Pekala
01/10/2023, 9:59 AM
spark-submit --class com.xyz --num-executors 4
etc. And then there is another task type introduced recently, I think: the spark submit task type. If we have a JAR to execute, do the spark submit task type and the JAR task type implicitly work in the same way, or differently?
Basem Mohammed
01/10/2023, 10:14 AMPari
01/10/2023, 10:17 AMGeir I
01/10/2023, 1:05 PMsabari dass
01/10/2023, 9:31 PMRahul Sharma
01/11/2023, 12:37 PM
%%sql
MERGE INTO delta.`refine_data` v
USING delta.`raw_data` u
ON v.userID=u.userID
WHEN MATCHED AND (u.__op = "u" or u.__op = "d")
THEN DELETE
Sireesha Madabhushi
01/12/2023, 10:35 AM
org.apache.hadoop.fs.s3a.AWSClientIOException: getFileStatus on s3a://nifi-test/_delta_log/_last_checkpoint: com.amazonaws.SdkClientException: Unable to execute HTTP request: nifi-test.minio.commonservices.svc.cluster.local: Name or service not known: Unable to execute HTTP request: nifi-test.minio.commonservices.svc.cluster.local: Name or service not known
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:214) ~[hadoop-aws-3.3.4.jar!/:na]
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:175) ~[hadoop-aws-3.3.4.jar!/:na]
at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3799) ~[hadoop-aws-3.3.4.jar!/:na]
at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3688) ~[hadoop-aws-3.3.4.jar!/:na]
at org.apache.hadoop.fs.s3a.S3AFileSystem.extractOrFetchSimpleFileStatus(S3AFileSystem.java:5401) ~[hadoop-aws-3.3.4.jar!/:na]
at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:1465) ~[hadoop-aws-3.3.4.jar!/:na]
at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:1441) ~[hadoop-aws-3.3.4.jar!/:na]
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:976) ~[hadoop-common-3.3.4.jar!/:na]
at io.delta.storage.HadoopFileSystemLogStore.read(HadoopFileSystemLogStore.java:46) ~[delta-storage-2.2.0.jar!/:2.2.0]
at io.delta.standalone.internal.storage.DelegatingLogStore.read(DelegatingLogStore.scala:83) ~[delta-standalone_2.13-0.6.0.jar!/:0.6.0]
at io.delta.standalone.internal.Checkpoints.loadMetadataFromFile(Checkpoints.scala:136) ~[delta-standalone_2.13-0.6.0.jar!/:0.6.0]
at io.delta.standalone.internal.Checkpoints.loadMetadataFromFile(Checkpoints.scala:147) ~[delta-standalone_2.13-0.6.0.jar!/:0.6.0]
at io.delta.standalone.internal.Checkpoints.loadMetadataFromFile(Checkpoints.scala:147) ~[delta-standalone_2.13-0.6.0.jar!/:0.6.0]
at io.delta.standalone.internal.Checkpoints.loadMetadataFromFile(Checkpoints.scala:147) ~[delta-standalone_2.13-0.6.0.jar!/:0.6.0]
at io.delta.standalone.internal.Checkpoints.lastCheckpoint(Checkpoints.scala:110) ~[delta-standalone_2.13-0.6.0.jar!/:0.6.0]
at io.delta.standalone.internal.Checkpoints.lastCheckpoint$(Checkpoints.scala:109) ~[delta-standalone_2.13-0.6.0.jar!/:0.6.0]
at io.delta.standalone.internal.DeltaLogImpl.lastCheckpoint(DeltaLogImpl.scala:42) ~[delta-standalone_2.13-0.6.0.jar!/:0.6.0]
at io.delta.standalone.internal.SnapshotManagement.getSnapshotAtInit(SnapshotManagement.scala:218) ~[delta-standalone_2.13-0.6.0.jar!/:0.6.0]
at io.delta.standalone.internal.SnapshotManagement.$init$(SnapshotManagement.scala:37) ~[delta-standalone_2.13-0.6.0.jar!/:0.6.0]
at io.delta.standalone.internal.DeltaLogImpl.<init>(DeltaLogImpl.scala:47) ~[delta-standalone_2.13-0.6.0.jar!/:0.6.0]
at io.delta.standalone.internal.DeltaLogImpl$.apply(DeltaLogImpl.scala:263) ~[delta-standalone_2.13-0.6.0.jar!/:0.6.0]
at io.delta.standalone.internal.DeltaLogImpl$.forTable(DeltaLogImpl.scala:241) ~[delta-standalone_2.13-0.6.0.jar!/:0.6.0]
at io.delta.standalone.internal.DeltaLogImpl.forTable(DeltaLogImpl.scala) ~[delta-standalone_2.13-0.6.0.jar!/:0.6.0]
at io.delta.standalone.DeltaLog.forTable(DeltaLog.java:164) ~[delta-standalone_2.13-0.6.0.jar!/:0.6.0]
at com.siai.delta.delta_lake.DeltaLogRepository.<init>(DeltaLogRepository.java:39) ~[classes!/:na]
at com.siai.delta.delta_lake.MinioDeltaLakeRepository.writeToDeltaLake(MinioDeltaLakeRepository.java:178) ~[classes!/:na]
at com.siai.delta.delta_lake.DeltaLakeController.createDeltaTable(DeltaLakeController.java:55) ~[classes!/:na]
at jdk.internal.reflect.GeneratedMethodAccessor59.invoke(Unknown Source) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:564) ~[na:na]
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205) ~[spring-web-5.3.12.jar!/:5.3.12]
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:150) ~[spring-web-5.3.12.jar!/:5.3.12]
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:117) ~[spring-webmvc-5.3.12.jar!/:5.3.12]
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:895) ~[spring-webmvc-5.3.12.jar!/:5.3.12]
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808) ~[spring-webmvc-5.3.12.jar!/:5.3.12]
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-5.3.12.jar!/:5.3.12]
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1067) ~[spring-webmvc-5.3.12.jar!/:5.3.12]
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:963) ~[spring-webmvc-5.3.12.jar!/:5.3.12]
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006) ~[spring-webmvc-5.3.12.jar!/:5.3.12]
at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:909) ~[spring-webmvc-5.3.12.jar!/:5.3.12]
Sireesha Madabhushi
01/12/2023, 10:36 AMSireesha Madabhushi
01/12/2023, 10:36 AMArtsiom Yudovin
01/12/2023, 12:46 PM
Varbyte types are only supported for Parquet and ORC files.
Does anybody face the same issue?
Halvar Trøyel Nerbø
01/12/2023, 3:21 PMPablo Flores
01/12/2023, 8:02 PM
`unsupported type` error. In spark we encoded the field as a `java.sql.Date`. Any suggestions on how to handle this scenario or any known workaround for this?
Athi
01/13/2023, 12:46 AM
for i in list:
    sq = (events.writeStream
          .trigger(once=True)
          .format("delta")
          .outputMode("append")
          .option("checkpointLocation", "/tmp/delta/events/_checkpoints/" + i)
          .start(output_path))