Scott Sandre (Delta Lake)
01/13/2023, 10:47 PM
• Record VACUUM operations in the transaction log. With this feature, VACUUM operations and their associated metrics (e.g. numDeletedFiles) will now show up in table history.
• Support idempotent writes for DML operations. This feature adds idempotency to INSERT/DELETE/UPDATE/MERGE etc. operations using the SQL configurations spark.databricks.delta.write.txnAppId and spark.databricks.delta.write.txnVersion.
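For example, a minimal sketch of how these configurations might be used from PySpark (the table name, application id, and version value are illustrative assumptions, and an existing SparkSession named spark is assumed):
# Hypothetical sketch of an idempotent DML write.
# txnAppId identifies the writing application; txnVersion should increase monotonically.
spark.conf.set("spark.databricks.delta.write.txnAppId", "nightly-cleanup-job")
spark.conf.set("spark.databricks.delta.write.txnVersion", "42")
# A retry that re-runs the same statement with the same (txnAppId, txnVersion) pair
# is skipped rather than applied twice.
spark.sql("DELETE FROM events WHERE event_date < '2022-01-01'")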
• Support passing Hadoop configurations via DeltaTable API
from delta.tables import DeltaTable
hadoop_config = {
"fs.azure.account.auth.type": "OAuth",
"fs.azure.account.oauth.provider.type": "...",
"fs.azure.account.oauth2.client.id": "...",
"fs.azure.account.oauth2.client.secret": "...",
"fs.azure.account.oauth2.client.endpoint": "..."
}
delta_table = DeltaTable.forPath(spark, <table-path>, hadoop_config)
• Minor convenience improvement to the DeltaTableBuilder: the executeZOrderBy Java API now allows users to pass in varargs instead of a List.
• Fail fast on malformed delta log JSON entries. Previously, Delta queries could return inaccurate results whenever JSON commits in the _delta_log
were malformed. For example, an add
action with a missing }
would be skipped. Now, queries will fail fast, preventing inaccurate results.
• Fix “Could not find active SparkSession” bug by passing in the SparkSession when resolving tables in the DeltaTableBuilder.
Credits:
Helge Brügner, Jiaheng Tang, Mitchell Riley, Ryan Johnson, Scott Sandre, Venki Korukanti, Jintao Shen, Yann Byron
Allison Portis
03/23/2023, 11:54 PM
• Support zero-copy convert to Delta from Iceberg tables using CONVERT TO DELTA. This generates a Delta table in the same location and does not rewrite any parquet files.
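A rough sketch of what the conversion might look like from PySpark (the path is illustrative, and the iceberg.`<path>` form is an assumption based on the documented converter; it also requires the delta-iceberg artifact on the classpath):
# Hypothetical sketch: convert an Iceberg table in place, without rewriting parquet files.
spark.sql("CONVERT TO DELTA iceberg.`/data/warehouse/events`")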
• Support SHALLOW CLONE for Delta, Parquet, and Iceberg tables to clone a source table without copying the data files. SHALLOW CLONE
creates a copy of the source table’s definition but refers to the source table’s data files.
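A minimal sketch in SQL (table names are illustrative):
# Hypothetical sketch: the clone gets its own transaction log and metadata but
# points at the source table's existing data files, so no data is copied.
spark.sql("CREATE TABLE events_dev SHALLOW CLONE events")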
• Support idempotent writes for DML operations. This feature adds idempotency to `INSERT`/`DELETE`/`UPDATE`/`MERGE` etc. operations using SQL configurations spark.databricks.delta.write.txnAppId and spark.databricks.delta.write.txnVersion.
• Support “when not matched by source” clauses for the Merge command to update or delete rows in the chosen table that don’t have matches in the source table based on the merge condition. This clause is supported in the Python, Scala, and Java DeltaTable
APIs. SQL Support will be added in Spark 3.4.
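A rough sketch with the Python DeltaTable API (table, column, and DataFrame names are illustrative assumptions):
from delta.tables import DeltaTable

target = DeltaTable.forName(spark, "customers")      # illustrative target table
source = spark.table("customers_updates")            # illustrative source DataFrame

(target.alias("t")
    .merge(source.alias("s"), "t.id = s.id")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    # New clause: act on target rows that have no match in the source,
    # e.g. delete customers that no longer appear in the source feed.
    .whenNotMatchedBySourceDelete()
    .execute())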
• Support CREATE TABLE LIKE
to create empty Delta tables using the definition and metadata of an existing table or view.
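A minimal sketch (table names are illustrative):
# Hypothetical sketch: create an empty Delta table that copies the schema,
# partitioning, and table properties of an existing table, but none of its data.
spark.sql("CREATE TABLE events_schema_only LIKE events")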
• Support reading Change Data Feed (CDF) in SQL queries using the table_changes
table-valued function.
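For example, a sketch of a CDF query (the table name and version range are illustrative, and the table is assumed to have Change Data Feed enabled):
# Hypothetical sketch: read the change feed between two table versions.
spark.sql("SELECT * FROM table_changes('events', 5, 10)").show()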
• Unblock Change Data Feed (CDF) batch reads on column mapping enabled tables when DROP COLUMN
and RENAME COLUMN
have been used.
• Improved read and write performance on S3 when writing from a single cluster. Efficient file listing decreases the metadata processing time when calculating a table snapshot. This is most impactful for tables with many commits. Set the Hadoop configuration delta.enableFastS3AListFrom
to true
to enable it.
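One way this might be wired up when building the session (passing the Hadoop configuration via the spark.hadoop.* prefix is an assumption about your deployment; the app name is illustrative):
# Hypothetical sketch: enable the faster S3A listing for a single-cluster S3 writer.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .appName("delta-s3-writer")
    .config("spark.hadoop.delta.enableFastS3AListFrom", "true")  # Hadoop conf via spark.hadoop.*
    .getOrCreate())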
• Record VACUUM operations in the transaction log. With this feature, VACUUM operations and their associated metrics (e.g. numDeletedFiles) will now show up in table history.
• Support reading Delta tables with deletion vectors.
• Other notable changes
◦ Support schema evolution in MERGE for UPDATE SET <assignments> and INSERT (...) VALUES (...) actions. Previously, schema evolution was only supported for UPDATE SET * and INSERT * actions.
◦ Add .show()
support for COUNT(*)
aggregate pushdown.
◦ Enforce idempotent writes for df.saveAsTable
for overwrite and append mode.
◦ Support Table Features to selectively add individual features when upgrading the table protocol version. This enables users to only add active features and will facilitate connectivity as downstream Delta connectors can selectively implement feature support.
◦ Automatically generate partition filters for additional generation expressions.
▪︎ Support the trunc and date_trunc functions.
▪︎ Support the date_format function with format yyyy-MM-dd.
◦ Block protocol downgrades when replacing a Delta table to prevent any incorrect time-travel or CDF queries.
◦ Fix replaceWhere
with the DataFrame V2 overwrite API to correctly evaluate less than conditions.
◦ Fix dynamic partition overwrite for tables with more than one partition data type.
◦ Fix schema evolution for INSERT OVERWRITE
with complex data types when the source schema is read incompatible.
◦ Fix Delta streaming source to correctly detect read-incompatible schema changes during backfill when there is exactly one schema change in the versions read.
◦ Fix a bug in VACUUM
where sometimes the default retention period was used to remove files instead of the retention period specified in the table properties.
◦ Include the table name in the DataFrame returned by the deltaTable.details()
Python/Scala/Java API.
◦ Improve the log message for VACUUM table_name DRY RUN.
How to use the preview release
For this preview we have published the artifacts to a staging repository. Here’s how you can use them:
• spark-submit: Add --repositories https://oss.sonatype.org/content/repositories/iodelta-1066/ to the command line arguments. For example:
to the command line arguments. For example:
spark-submit --packages io.delta:delta-core_2.12:2.3.0rc1 --repositories https://oss.sonatype.org/content/repositories/iodelta-1066/ examples/examples.py
• Currently Spark shells (PySpark and Scala) do not accept the external repositories option. However, once the artifacts have been downloaded to the local cache, the shells can be run with Delta 2.3.0rc1
by just providing the --packages io.delta:delta-core_2.12:2.3.0rc1
argument.
• Maven project:
<repositories>
<repository>
<id>staging-repo</id>
<url>https://oss.sonatype.org/content/repositories/iodelta-1066/</url>
</repository>
</repositories>
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-core_2.12</artifactId>
<version>2.3.0rc1</version>
</dependency>
• SBT project:
libraryDependencies += "io.delta" %% "delta-core" % "2.3.0rc1"
resolvers += "Delta" at <https://oss.sonatype.org/content/repositories/iodelta-1066/>
• Delta-spark:
pip install -i https://test.pypi.org/simple/ delta-spark==2.3.0rc1
Allison Portis
04/05/2023, 6:44 PM
• Support zero-copy convert to Delta from Iceberg tables using CONVERT TO DELTA. This generates a Delta table in the same location and does not rewrite any parquet files. See the documentation for details.
• Support SHALLOW CLONE for Delta, Parquet, and Iceberg tables to clone a source table without copying the data files. SHALLOW CLONE
creates a copy of the source table’s definition but refers to the source table’s data files.
• Support idempotent writes for DML operations. This feature adds idempotency to `INSERT`/`DELETE`/`UPDATE`/`MERGE` etc. operations using SQL configurations spark.databricks.delta.write.txnAppId and spark.databricks.delta.write.txnVersion.
• Support “when not matched by source” clauses for the Merge command to update or delete rows in the chosen table that don’t have matches in the source table based on the merge condition. This clause is supported in the Python, Scala, and Java DeltaTable
APIs. SQL Support will be added in Spark 3.4.
• Support CREATE TABLE LIKE to create empty Delta tables using the definition and metadata of an existing table or view.
• Support reading Change Data Feed (CDF) in SQL queries using the table_changes table-valued function.
• Unblock Change Data Feed (CDF) batch reads on column mapping enabled tables when DROP COLUMN
and RENAME COLUMN
have been used. See the documentation for more details.
• Improved read and write performance on S3 when writing from a single cluster. Efficient file listing decreases the metadata processing time when calculating a table snapshot. This is most impactful for tables with many commits. Set the Hadoop configuration delta.enableFastS3AListFrom
to true
to enable it.
• Record VACUUM operations in the transaction log. With this feature, VACUUM operations and their associated metrics (e.g. numDeletedFiles) will now show up in table history.
• Support reading Delta tables with deletion vectors.
• Other notable changes
◦ Support schema evolution in MERGE for UPDATE SET <assignments> and INSERT (...) VALUES (...) actions. Previously, schema evolution was only supported for UPDATE SET * and INSERT * actions.
◦ Add .show()
support for COUNT(*)
aggregate pushdown.
◦ Enforce idempotent writes for df.saveAsTable
for overwrite and append mode.
◦ Support Table Features to selectively add individual features when upgrading the table protocol version. This enables users to only add active features and will facilitate connectivity as downstream Delta connectors can selectively implement feature support.
◦ Automatically generate partition filters for additional generation expressions.
▪︎ Support the trunc and date_trunc functions.
▪︎ Support the date_format function with format yyyy-MM-dd.
◦ Block protocol downgrades when replacing a Delta table to prevent any incorrect time-travel or CDF queries.
◦ Fix replaceWhere
with the DataFrame V2 overwrite API to correctly evaluate less than conditions.
◦ Fix dynamic partition overwrite for tables with more than one partition data type.
◦ Fix schema evolution for INSERT OVERWRITE
with complex data types when the source schema is read incompatible.
◦ Fix Delta streaming source to correctly detect read-incompatible schema changes during backfill when there is exactly one schema change in the versions read.
◦ Fix a bug in VACUUM
where sometimes the default retention period was used to remove files instead of the retention period specified in the table properties.
◦ Include the table name in the DataFrame returned by the deltaTable.details()
Python/Scala/Java API.
◦ Improve the log message for VACUUM table_name DRY RUN.
Allison Portis
05/19/2023, 12:10 AM
• Support writing Deletion Vectors for the DELETE command. Previously, when deleting rows from a Delta table, any file with at least one matching row would be rewritten. With Deletion Vectors these expensive rewrites can be avoided. See What are deletion vectors? for more details.
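A minimal sketch of how a table might opt in (the table name is illustrative; delta.enableDeletionVectors is the table property as I understand it):
# Hypothetical sketch: enable deletion vectors so subsequent DELETEs mark rows
# as removed instead of rewriting every touched file.
spark.sql("ALTER TABLE events SET TBLPROPERTIES ('delta.enableDeletionVectors' = 'true')")
spark.sql("DELETE FROM events WHERE event_date < '2022-01-01'")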
• Support for all write operations on tables with Deletion Vectors enabled.
• Support PURGE
to remove Deletion Vectors from the current version of a Delta table by rewriting any data files with deletion vectors. See the documentation for more details.
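A sketch of the purge (the REORG ... APPLY (PURGE) form and the table name are assumptions based on the documentation):
# Hypothetical sketch: rewrite data files that carry deletion vectors so the
# current table version no longer depends on them.
spark.sql("REORG TABLE events APPLY (PURGE)")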
• Support reading Change Data Feed for tables with Deletion Vectors enabled.
• Support REPLACE WHERE
expressions in SQL to selectively overwrite data. Previously “replaceWhere” options were only supported in the DataFrameWriter APIs.
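A rough sketch of the SQL form (table names and the predicate are illustrative; the INSERT INTO ... REPLACE WHERE shape is an assumption):
# Hypothetical sketch: atomically replace only the rows matching the predicate
# with the result of the SELECT, leaving the rest of the table untouched.
spark.sql("""
    INSERT INTO events
    REPLACE WHERE event_date >= '2023-01-01'
    SELECT * FROM staged_events WHERE event_date >= '2023-01-01'
""")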
• Support WHEN NOT MATCHED BY SOURCE
clauses in SQL for the Merge command.
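For example, a sketch of the SQL clause (table and column names are illustrative):
# Hypothetical sketch: delete target rows that no longer exist in the source.
spark.sql("""
    MERGE INTO customers AS t
    USING customers_updates AS s
    ON t.id = s.id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
    WHEN NOT MATCHED BY SOURCE THEN DELETE
""")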
• Support omitting generated columns from the column list for SQL INSERT INTO
queries. Delta will automatically generate the values for any unspecified generated columns.
• Support the TimestampNTZ
data type added in Spark 3.3. Using TimestampNTZ
requires a Delta protocol upgrade; see the documentation for more information.
• Other notable changes
◦ Increased resiliency for S3 multi-cluster reads and writes.
▪︎ Use a per-JVM lock to minimize the number of concurrent recovery attempts. Concurrent recoveries may cause concurrent readers to see a RemoteFileChangedException.
▪︎ Catch any RemoteFileChangedException
in the reader and retry reading.
◦ Allow changing the column type of a char or varchar column to a compatible type in the ALTER TABLE command. The new behavior is the same as in Apache Spark and allows upcasting from char or varchar to varchar or string.
◦ Block using overwriteSchema
with dynamic partition overwrite. This can corrupt the table as not all the data may be removed, and the schema of the newly written partitions may not match the schema of the unchanged partitions.
◦ Return an empty DataFrame
for Change Data Feed reads when there are no commits within the timestamp range provided. Previously an error would be thrown.
◦ Fix a bug in Change Data Feed reads for records created during the ambiguous hour when daylight savings occurs.
◦ Fix a bug where querying an external Delta table at the root of an S3 bucket would throw an error.
◦ Remove leaked internal Spark metadata from the Delta log to make any affected tables readable again.
Note: the Delta Lake 2.4 release does not include the Iceberg to Delta converter because iceberg-spark-runtime
does not support Spark 3.4 yet. The Iceberg to Delta converter is still supported when using Delta 2.3 with Spark 3.3.
How to use the preview release:
For this preview we have published the artifacts to a staging repository. Here’s how you can use them:
• spark-submit: Add --repositories https://oss.sonatype.org/content/repositories/iodelta-1080/ to the command line arguments. For example:
to the command line arguments. For example:
spark-submit --packages io.delta:delta-core_2.12:2.4.0rc1 --repositories https://oss.sonatype.org/content/repositories/iodelta-1080/ examples/examples.py
• Currently Spark shells (PySpark and Scala) do not accept the external repositories option. However, once the artifacts have been downloaded to the local cache, the shells can be run with Delta 2.4.0rc1 by just providing the --packages io.delta:delta-core_2.12:2.4.0rc1
argument.
• Maven project:
<repositories>
<repository>
<id>staging-repo</id>
<url>https://oss.sonatype.org/content/repositories/iodelta-1080/</url>
</repository>
</repositories>
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-core_2.12</artifactId>
<version>2.4.0rc1</version>
</dependency>
• SBT project:
libraryDependencies += "io.delta" %% "delta-core" % "2.4.0rc1"
resolvers += "Delta" at <https://oss.sonatype.org/content/repositories/iodelta-1080/>
• Delta-spark:
pip install -i https://test.pypi.org/simple/ delta-spark==2.4.0rc1
Allison Portis
05/25/2023, 10:11 PM
• Support writing Deletion Vectors for the DELETE command. Previously, when deleting rows from a Delta table, any file with at least one matching row would be rewritten. With Deletion Vectors these expensive rewrites can be avoided. See What are deletion vectors? for more details.
• Support for all write operations on tables with Deletion Vectors enabled.
• Support PURGE
to remove Deletion Vectors from the current version of a Delta table by rewriting any data files with deletion vectors. See the documentation for more details.
• Support reading Change Data Feed for tables with Deletion Vectors enabled.
• Support REPLACE WHERE
expressions in SQL to selectively overwrite data. Previously “replaceWhere” options were only supported in the DataFrameWriter APIs.
• Support WHEN NOT MATCHED BY SOURCE
clauses in SQL for the Merge command.
• Support omitting generated columns from the column list for SQL INSERT INTO
queries. Delta will automatically generate the values for any unspecified generated columns.
• Support the TimestampNTZ
data type added in Spark 3.3. Using TimestampNTZ
requires a Delta protocol upgrade; see the documentation for more information.
• Other notable changes
◦ Increased resiliency for S3 multi-cluster reads and writes.
▪︎ Use a per-JVM lock to minimize the number of concurrent recovery attempts. Concurrent recoveries may cause concurrent readers to see a RemoteFileChangedException.
▪︎ Catch any RemoteFileChangedException
in the reader and retry reading.
◦ Allow changing the column type of a char or varchar column to a compatible type in the ALTER TABLE command. The new behavior is the same as in Apache Spark and allows upcasting from char or varchar to varchar or string.
◦ Block using overwriteSchema
with dynamic partition overwrite. This can corrupt the table as not all the data may be removed, and the schema of the newly written partitions may not match the schema of the unchanged partitions.
◦ Return an empty DataFrame
for Change Data Feed reads when there are no commits within the timestamp range provided. Previously an error would be thrown.
◦ Fix a bug in Change Data Feed reads for records created during the ambiguous hour when daylight savings occurs.
◦ Fix a bug where querying an external Delta table at the root of an S3 bucket would throw an error.
◦ Remove leaked internal Spark metadata from the Delta log to make any affected tables readable again.
Scott Sandre (Delta Lake)
06/29/2023, 3:00 PM
Delta Universal Format (UniForm): write as Delta, read as Iceberg. For example:
CREATE TABLE T (c1 INT) USING DELTA TBLPROPERTIES (
  'delta.universalFormat.enabledFormats' = 'iceberg');
Every write to this table will automatically keep Iceberg metadata updated. See the documentation here for more details.
Delta Kernel
The Delta Kernel project is a set of Java libraries (Rust will be coming soon) for building Delta connectors that can read (and soon, write to) Delta tables without needing to understand the Delta protocol details.
You can use this library to do the following:
• Read data from small Delta tables in a single thread in a single process.
• Read data from large Delta tables using multiple threads in a single process.
• Build a complex connector for a distributed processing engine and read very large Delta tables.
• [soon!] Write to Delta tables from multiple threads / processes / distributed engines.
Here is an example of a simple table scan with a filter:
TableClient myTableClient = DefaultTableClient.create(); // define a client (more details below)
Table myTable = Table.forPath("/delta/table/path"); // define what table to scan
Snapshot mySnapshot = myTable.getLatestSnapshot(myTableClient); // define which version of table to scan
Scan myScan = mySnapshot.getScanBuilder(myTableClient) // specify the scan details
.withFilters(scanFilter)
.build();
Scan.readData(...) // returns the table data
For more information, refer to Delta Kernel Github docs.
Delta Connectors: welcome to the Delta repository!
All previous connectors from https://github.com/delta-io/connectors have been moved to this repository (https://github.com/delta-io/delta) as we aim to unify our Delta connector ecosystem structure. This includes Delta-Standalone, Delta-Flink, Delta-Hive, PowerBI, and SQL-Delta-Import. The repository https://github.com/delta-io/connectors is now deprecated.
Delta Spark
Delta Spark 3.0.0 is built on top of Apache Spark™ 3.4. Similar to Apache Spark, we have released Maven artifacts for both Scala 2.12 and Scala 2.13. Note that the Delta Spark Maven artifact has been renamed from delta-core to delta-spark.
• Documentation: https://docs.delta.io/3.0.0rc1/
• Maven artifacts: delta-spark_2.12, delta-spark_2.13, delta-contribs_2.12, delta-contribs_2.13, delta-storage, delta-storage-s3-dynamodb, delta-iceberg_2.12, delta-iceberg_2.13
• Python artifacts: https://pypi.org/project/delta-spark/3.0.0rc1/
The key features of this release are
• Delta Universal Format. Write as Delta, read as Iceberg! See the highlighted section above.
• Up to 2x faster MERGE operation. MERGE now better leverages data skipping, the ability to use the insert-only code path in more cases, and an overall improved execution to achieve up to 2x better performance in various scenarios.
• Performance of DELETE using Deletion Vectors improved by more than 2x. This fix improves the file path canonicalization logic by avoiding calling expensive Path.toUri.toString
calls for each row in a table, resulting in a several hundred percent speed boost on DELETE operations (only when Deletion Vectors have been enabled on the table).
• Support streaming reads from column mapping enabled tables when DROP COLUMN
and RENAME COLUMN
have been used. This includes streaming support for Change Data Feed. See the documentation here for more details.
• Support specifying the columns for which Delta will collect file-skipping statistics via the table property delta.dataSkippingStatsColumns. Previously, Delta would only collect file-skipping statistics for the first N columns in the table schema (defaults to 32). Now, users can easily customize this.
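A minimal sketch (the table and column names, and the comma-separated value format, are illustrative assumptions):
# Hypothetical sketch: collect file-skipping statistics only for the columns
# that actually appear in query predicates.
spark.sql("""
    ALTER TABLE events
    SET TBLPROPERTIES ('delta.dataSkippingStatsColumns' = 'event_date,country')
""")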
• Support zero-copy convert to Delta from Iceberg tables on Apache Spark 3.4 using CONVERT TO DELTA
. This feature was excluded from the Delta Lake 2.4 release since Iceberg did not yet support Apache Spark 3.4. This command generates a Delta table in the same location and does not rewrite any parquet files.
Other notable changes include
• Minor fix to Delta table path URI concatenation
• Support writing parquet data files to the data subdirectory via the SQL configuration spark.databricks.delta.write.dataFilesToSubdir. This is used to add UniForm support on BigQuery.
Delta Flink
Delta-Flink 3.0.0 is built on top of Apache Flink™ 1.16.1.
• Documentation: https://github.com/delta-io/delta/blob/branch-3.0/connectors/flink/README.md
• Maven artifacts: delta-flink
The key features of this release are
• Support for Flink SQL and Catalog. You can now use the Flink/Delta connector for Flink SQL jobs. You can CREATE
Delta tables, SELECT
data from them (uses the Delta Source), and INSERT
new data into them (uses the Delta Sink). Note: for correct operations on the Delta tables, you must first configure the Delta Catalog using CREATE CATALOG
before running a SQL command on Delta tables. For more information, please see the documentation here.
• Significant performance improvement to Global Committer initialization. The last-successfully-committed delta version by a given Flink application is now loaded lazily, significantly reducing the CPU utilization in the most common scenarios.
Delta Standalone
• Documentation: https://docs.delta.io/3.0.0rc1/delta-standalone.html
• Maven artifacts: delta-standalone_2.12, delta-standalone_2.13
The key features in this release are:
• Support for disabling Delta checkpointing during commits. For very large tables with millions of files, performing Delta checkpoints can become an expensive overhead during writes. Users can now disable this checkpointing by setting the Hadoop configuration property io.delta.standalone.checkpointing.enabled to false. This is safe and suggested only if another job periodically performs the checkpointing.
• Performance improvement to snapshot initialization. When a delta table is loaded at a particular version, the snapshot must contain, at a minimum, the latest protocol and metadata. This PR improves the snapshot load performance for repeated table changes.
• Support adding absolute paths to the Delta log. This now enables users to manually perform `SHALLOW CLONE`s and create Delta tables with external files.
• Fix in schema evolution to prevent adding non-nullable columns to existing Delta tables
• Dropped support for Scala 2.11. Due to lack of community demand and a very low number of downloads, we have dropped Scala 2.11 support.
Liquid Partitioning
Liquid Clustering is a new effort to revamp how clustering works in Delta, addressing the shortcomings of Hive-style partitioning and the current ZORDER clustering. This feature will be available to preview soon; meanwhile, for more information, please refer to Liquid Clustering #1874.
Credits
Ahir Reddy, Ala Luszczak, Alex, Allen Reese, Allison Portis, Antoine Amend, Bart Samwel, Boyang Jerry Peng, CabbageCollector, Carmen Kwan, Christos Stavrakakis, Denny Lee, Desmond Cheong, Eric Ogren, Felipe Pessoto, Fred Liu, Fredrik Klauss, Gerhard Brueckl, Gopi Krishna Madabhushi, Grzegorz Kołakowski, Herivelton Andreassa, Jackie Zhang, Jiaheng Tang, Johan Lasperas, Junyong Lee, K.I. (Dennis) Jung, Kam Cheung Ting, Krzysztof Chmielewski, Lars Kroll, Lin Ma, Luca Menichetti, Lukas Rupprecht, Ming DAI, Mohamed Zait, Ole Sasse, Olivier Nouguier, Pablo Flores, Paddy Xu, Patrick Pichler, Paweł Kubit, Prakhar Jain, Ryan Johnson, Sabir Akhadov, Satya Valluri, Scott Sandre, Shixiong Zhu, Siying Dong, Son, Tathagata Das, Terry Kim, Tom van Bussel, Venki Korukanti, Wenchen Fan, Yann Byron, Yaohua Zhao, Yuhong Chen, Yuming Wang, Yuya Ebihara, aokolnychyi, gurunath, jintao shen, maryannxue, noelo, panbingkun, windpiger, wwang-talend