Rahul Sharma
02/15/2023, 4:05 PM
spark_session_micro_batch.sql(f"""
SELECT *
FROM (
    SELECT *,
        ROW_NUMBER() OVER (PARTITION BY {self.pk} ORDER BY __source_ts_ms DESC) AS dedupe
    FROM {self.micro_batch_view}
)
WHERE dedupe = 1
""").drop("dedupe").createOrReplaceTempView(self.micro_batch_dedup_view)
And this is our merge query:
SqlQuery = """
MERGE INTO {db_name}.{refine_table_name} v
USING dedup_temp_view u
ON v.{pk}=u.{pk}
WHEN MATCHED AND u.__op = "u"
THEN UPDATE SET *
WHEN MATCHED AND u.__op = "d"
THEN DELETE
WHEN NOT MATCHED AND (u.__op = "c" or u.__op = "r")
THEN INSERT *
""".format(refine_table_name=refine_table_name,db_name=db_name,pk=pk)
Jeremy Jordan
02/15/2023, 8:25 PM
Please upgrade your Delta table to reader version 2 and writer version 5,
and change the column mapping mode to 'name' mapping. You can use the following command:
ALTER TABLE <table_name> SET TBLPROPERTIES (
'delta.columnMapping.mode' = 'name',
'delta.minReaderVersion' = '2',
'delta.minWriterVersion' = '5')
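For upgrading many tables in a script, the same statement can be templated. A minimal sketch (the table name is hypothetical; it only builds the SQL string, which would then be passed to `spark.sql`):

```python
def column_mapping_upgrade_sql(table_name: str) -> str:
    """Build the ALTER TABLE statement that enables 'name' column mapping
    and bumps the protocol to reader v2 / writer v5."""
    return (
        f"ALTER TABLE {table_name} SET TBLPROPERTIES (\n"
        "  'delta.columnMapping.mode' = 'name',\n"
        "  'delta.minReaderVersion' = '2',\n"
        "  'delta.minWriterVersion' = '5')"
    )

print(column_mapping_upgrade_sql("my_db.my_table"))
```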
Nick Karpov
02/15/2023, 11:00 PM
case/when expressions - there have been questions here specifically on handling this, so I'm sharing here too... it's hard to fit everything in one post, so we do plan to build on this - please leave a comment if there's something you'd like to see!
Harry
02/15/2023, 11:13 PM
Kenny Ma
02/16/2023, 12:03 AM
Rahul Sharma
02/16/2023, 4:16 AM
targetfilesize_refine = '5000000'  # bytes
spark.sql(f"""
ALTER TABLE delta.`{table_loc}`
SET TBLPROPERTIES (
    'delta.targetFileSize' = '{self.targetfilesize_refine}',
    'delta.tuneFileSizesForRewrites' = '{self.tunefilesizesforrewrites_refine}'
)
""")
Vikash Kumar
02/16/2023, 11:56 AM
Are readerVersion and writerVersion the new protocol fields replacing minReaderVersion and minWriterVersion respectively?
https://github.com/delta-io/delta/pull/1450/files#diff-62d56fa1c25dda8d49ccdbf9ea31df4576f84625706d54613dff0acc8f974d97R509-R510
Artsiom Yudovin
02/16/2023, 3:40 PMAmazonS3Exception: Slow Down (Service: Amazon S3; Status Code: 503; Error Code: 503 Slow Down
during the delta lake merge?
We have a delta table which is partitioned by columns.abhijeet_naib
02/16/2023, 4:20 PMRahul Sharma
02/17/2023, 5:32 AM{"commitInfo":{"timestamp":1676611727893,"operation":"SET TBLPROPERTIES","operationParameters":{"properties":"{\"delta.compatibility.symlinkFormatManifest.enabled\":\"true\",\"delta.targetFileSize\":\"10mb\",\"delta.tuneFileSizesForRewrites\":\"10mb\"}"}
Yan Zhao
02/19/2023, 2:05 AM
Misha Nguyen
02/20/2023, 12:19 AM
Jon Stockham
02/20/2023, 11:44 AM
I have a table with delta.compatibility.symlinkFormatManifest.enabled=true.
Occasionally I am finding that I receive errors if the write that triggers the manifest generation results in a checkpoint file being created and pre-checkpoint transaction log files being deleted. If I run the job again it will work just fine.
Is the best way to avoid this to disable the automatic generation and instead do it manually after the data write with DeltaTable.forPath(...).generate()? Or am I overlooking something?
org.apache.spark.sql.delta.DeltaRuntimeException: Committing to the Delta table version 210 succeeded but error while executing post-commit hook Generate Symlink Format Manifest: Generate Symlink Format Manifest
Encountered error while reading file <s3://my-bucket/my-table/_delta_log/00000000000000000201.json>
Caused by: java.io.FileNotFoundException: File not present on S3
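If switching to manual generation, the SQL equivalent of the Python API call can be issued right after the write. A sketch that just builds the GENERATE statement (the table path is a hypothetical stand-in):

```python
def generate_manifest_sql(table_path: str) -> str:
    """Build the Delta SQL command equivalent to
    DeltaTable.forPath(spark, table_path).generate("symlink_format_manifest")."""
    return f"GENERATE symlink_format_manifest FOR TABLE delta.`{table_path}`"

print(generate_manifest_sql("s3://my-bucket/my-table"))
```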
Alberto Rguez
02/20/2023, 1:56 PM
Anjaneya Alluri
02/21/2023, 1:43 AM
Allie Ubisse
02/21/2023, 6:35 AM
spark.hadoop.datanucleus.connectionPoolingType hikari
We got the same error as below.
Attempt 2:
Docs :
• https://learn.microsoft.com/en-us/azure/databricks/release-notes/runtime/10.4ml
• https://learn.microsoft.com/en-us/azure/databricks/release-notes/runtime/10.4#hikaricp-is-now-the-default-hive-metastore-connection-pool
installed packages: x
So 10.4 ML differs from 10.4 in that 10.4 ML is not using Hikari (and has the libraries below missing from the Databricks Libraries UI).
• We installed the following (because this is the standard from 10.4 non-ml version):
org.datanucleus:datanucleus-api-jdo:4.2.4
org.datanucleus:datanucleus-core:4.1.17
org.datanucleus:datanucleus-rdbms:4.1.19
org.datanucleus:javax.jdo:3.2.0-m3
configuration that we've tried:
spark.hadoop.datanucleus.connectionPoolingType hikari
spark.hadoop.datanucleus.connectionPoolingType HikariCP
spark.databricks.hive.metastore.client.pool.type hikari
spark.databricks.hive.metastore.client.pool.type HikariCP
All jar files were added and we are getting the following error after restarting:
org.datanucleus.api.jdo.JDOPersistenceManagerFactory.freezeConfiguration(JDOPersistenceManagerFactory.java:775)
... 128 more
Caused by: java.lang.Throwable: Attempt to invoke the "hikari" plugin to create a ConnectionPool gave an error : The connection pool plugin of type "hikari" was not found in the CLASSPATH!
Anyone with knowledge on how to resolve this issue, we would appreciate your help. Thanks
Marius Grama
02/21/2023, 8:51 AM
The Delta transaction log will often contain many (e.g. 10,000+) files.
Prateek Koul
02/21/2023, 1:07 PM
Lucas Zago
02/21/2023, 5:08 PM
Rahul Sharma
02/22/2023, 5:17 AM
+----+--------+-----------------------+------+----+------------------------+
|rank|amount |lastUpdateTime |status|__op|__source_ts_ms |
+----+--------+-----------------------+------+----+------------------------+
|1 |null |2023-02-08 12:38:55.407|0 |r |08-Feb-2023 10:53:52.000|
|2 |100.0000|2023-02-08 15:02:06.7 |2 |u |08-Feb-2023 09:32:06.000|
|3 |100.0000|2023-02-08 15:01:36.95 |0 |u |08-Feb-2023 09:31:36.000|
|4 |100.0000|2023-02-08 15:01:34.147|0 |u |08-Feb-2023 09:31:34.000|
+----+--------+-----------------------+------+----+------------------------+
Gustaf
02/22/2023, 10:46 AM
Lucas Zago
02/22/2023, 8:24 PM
mothukur
02/23/2023, 7:12 AM
Marius Grama
02/23/2023, 11:29 AM
table_changes (for reading from the change data feed)?
https://docs.databricks.com/sql/language-manual/functions/table_changes.html
Mohammad Mohtashim Khan
02/24/2023, 6:29 PM
# create or delete operation
delta_lake_table.alias("main_table").merge(
latest_changes_delete_create_df.alias(
"update_table"), merge_condition
).whenMatchedDelete(condition="update_table.op = 'd'").whenNotMatchedInsertAll(
condition="update_table.op = 'c' OR update_table.op = 'r'"
).execute()
# update operation
delta_lake_table.alias("main_table").merge(
latest_changes_update_df.alias("update_table"), merge_condition
).whenMatchedUpdateAll(
condition="update_table.op = 'u' OR update_table.op = 'r'"
).execute()
Now, the thing is that the data schema can evolve, therefore I am using whenMatchedUpdateAll and whenNotMatchedInsertAll, while also setting the following config in the Spark configuration: 'spark.databricks.delta.schema.autoMerge.enabled = true'. However, when a new column comes in, I am facing the following error:
pyspark.sql.utils.AnalysisException: The schema of your Delta table has changed in an incompatible way since your DataFrame
or DeltaTable object was created. Please redefine your DataFrame or DeltaTable object.
Changes:
Latest schema has additional field(s): op
Can someone please help and tell me what I am doing wrong? I am using Delta Lake 2.2.0 (open source) on top of MinIO, and have followed this documentation: https://docs.delta.io/latest/delta-update.html#automatic-schema-evolution
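Independently of the schema issue, the intended semantics of the two merges above (apply the latest c/r/u/d change event per key) can be sketched in plain Python (hypothetical event dicts, keyed by a single primary key):

```python
def apply_cdc_events(table, events, pk="id"):
    """Apply Debezium-style change events to a dict keyed by primary key:
    'c'/'r' inserts when the key is absent, 'u'/'r' updates when present,
    'd' deletes - mirroring the whenMatched/whenNotMatched clauses above."""
    for e in events:
        key = e[pk]
        op = e["op"]
        if op == "d":
            table.pop(key, None)
        elif op in ("c", "r") and key not in table:
            table[key] = {k: v for k, v in e.items() if k != "op"}
        elif op in ("u", "r") and key in table:
            table[key].update({k: v for k, v in e.items() if k != "op"})
    return table

state = {}
apply_cdc_events(state, [
    {"id": 1, "op": "c", "val": "a"},
    {"id": 1, "op": "u", "val": "b"},
    {"id": 1, "op": "d"},
    {"id": 2, "op": "c", "val": "x"},
])
print(state)  # -> {2: {'id': 2, 'val': 'x'}}
```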
Thank you.
Ashutosh Pandey
02/24/2023, 8:25 PM
Mohammad Mohtashim Khan
02/24/2023, 9:33 PM
Samrose
02/26/2023, 5:42 AM
Rajath Chandregowda
02/28/2023, 10:04 AM
dhia Gharsallaoui
02/28/2023, 10:25 AM
I run compaction and vacuum on my Delta tables.
I'm using the Scala API to perform those operations, and I run this for each table in parallel.
I often get an OOM, and I suspect that the jobs run on the driver.
Does anyone know if this is true, and if so, whether it's possible to run them on the executors?
Thank you!
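One thing that sometimes helps when launching maintenance for many tables at once is bounding the parallelism so the driver isn't coordinating everything simultaneously. A generic plain-Python sketch of throttling with a small worker pool (the table names and the maintain function are hypothetical stand-ins for the Scala compaction/vacuum calls):

```python
from concurrent.futures import ThreadPoolExecutor

def maintain(table: str) -> str:
    # Stand-in for running compaction + vacuum on one table.
    return f"maintained {table}"

tables = ["db.orders", "db.customers", "db.events", "db.payments"]

# Cap concurrency at 2 so only a couple of maintenance jobs run at once.
with ThreadPoolExecutor(max_workers=2) as pool:
    results = list(pool.map(maintain, tables))

print(results)
```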