
Kiran Kondamadugula

05/31/2023, 7:35 PM
Hello everyone! I am hoping to find a solution. I am running into delta.exceptions.ConcurrentAppendException even after setting up the S3 multi-cluster writes environment via the S3 DynamoDB LogStore.

My use case is to process a dataset spanning hundreds of partitions concurrently. The data is partitioned, and the partitions are disjoint. I was facing ConcurrentAppendException because S3 does not support the "put-if-absent" consistency guarantee. Since Delta Lake 1.2, with the help of the S3DynamoDBLogStore API, all writers across multiple clusters and/or Spark drivers can concurrently write to Delta Lake on S3 while ensuring that only one writer succeeds with each transaction. My Delta Lake version is 2.1.

I created a DynamoDB table with auto-scaling enabled for the number of reads/writes and passed the configuration to the Delta job. Please find the configuration below (some Spark-related config omitted):

spark = SparkSession \
    .builder \
    .appName("Delta Operations") \
    .config("spark.driver.memory", args["spark_driver_memory"]) \
    .config("spark.executor.memory", args["spark_executor_memory"]) \
    .config("spark.io.delta.storage.S3DynamoDBLogStore.ddb.tableName", args["log_table_name"]) \
    .config("spark.io.delta.storage.S3DynamoDBLogStore.ddb.region", args["log_region"]) \
    .getOrCreate()
spark.sparkContext.setLogLevel('WARN')

Please find the actual logic below:

delta_table.alias("old").merge(
    input_df.alias("new"),
    f"old.{primary_key} = new.{primary_key}") \
    .whenMatchedDelete(condition=col(f"old.{primary_key}").isin(deletes_df)) \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()

delta_table is the destination table in Delta Lake. input_df is a combined dataframe of all the inserts and updates. deletes_df is the dataframe that has just the deletes. I am still running into delta.exceptions.ConcurrentAppendException irrespective of these settings. Am I doing something wrong?
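For context, the S3 multi-cluster setup in the Delta docs also requires routing S3 paths to the DynamoDB-backed log store (Kiran mentions omitting some Spark config, which may cover this). A minimal sketch of those extra settings, assuming s3a:// paths and using a placeholder table name and region:

# Sketch of the S3 multi-cluster settings from the Delta docs.
# Table name and region are placeholders, not values from this thread.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("Delta Operations")
    # Route s3a:// paths through the DynamoDB-backed log store.
    .config("spark.delta.logStore.s3a.impl", "io.delta.storage.S3DynamoDBLogStore")
    # Every concurrent writer must point at the same DynamoDB table/region.
    .config("spark.io.delta.storage.S3DynamoDBLogStore.ddb.tableName", "delta_log")
    .config("spark.io.delta.storage.S3DynamoDBLogStore.ddb.region", "us-east-1")
    .getOrCreate()
)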

Nick Karpov

05/31/2023, 8:17 PM
S3DynamoDBLogStore will not prevent that from happening. Its purpose is to facilitate an atomic swap of the version under the hood; it prevents data corruption that would otherwise be silent. The exception you're getting is normal. Based on what you've shared, it appears your MERGE transaction is being "beaten" by another transaction that commits first, thus invalidating the snapshot your transaction read (because it's now stale). You can read about ConcurrentAppendException in the docs.
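Per the Delta concurrency-control docs, ConcurrentAppendException on a partitioned table can often be avoided by making the partition separation explicit in the operation's condition, so conflict detection can prove the concurrent transactions touch disjoint files. A minimal sketch, assuming a hypothetical partition column part_date that each concurrent job owns exclusively:

# Sketch only: part_date is a hypothetical partition column; each job
# pins its own partition value so concurrent merges provably don't overlap.
delta_table.alias("old").merge(
    input_df.alias("new"),
    f"old.{primary_key} = new.{primary_key} AND old.part_date = '{part_date}'"
) \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()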

Kiran Kondamadugula

05/31/2023, 8:20 PM
Thank you for the response, @Nick Karpov. If this exception is normal, how do I handle it? My ultimate goal is to make sure all the transactions can run in parallel.

Nick Karpov

05/31/2023, 8:31 PM
You can make sure no writers are changing files in the same partition at the same time, but that's not always possible. Otherwise, this behavior is fundamental: if two transactions modify the same files, one of them must fail. You can either catch the exception and retry the transaction, or make the transactions execute one at a time (serialize them).
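A minimal sketch of the catch-and-retry option, assuming a hypothetical run_merge() wrapper around the MERGE shown earlier:

# Retry the merge when its snapshot has gone stale. run_merge is a
# hypothetical callable that re-executes the MERGE against a fresh snapshot.
import random
import time

from delta.exceptions import ConcurrentAppendException

def merge_with_retry(run_merge, max_attempts=5):
    for attempt in range(1, max_attempts + 1):
        try:
            run_merge()
            return
        except ConcurrentAppendException:
            if attempt == max_attempts:
                raise  # give up after the final attempt
            # Exponential backoff with jitter to de-synchronize the writers.
            time.sleep(min(2 ** attempt, 30) + random.random())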