sabari dass  02/02/2023, 8:05 AM
sabari dass  02/02/2023, 8:06 AM
Roberto  02/02/2023, 2:58 PM
Vishal Kadam  02/03/2023, 6:58 PM
Vivek B  02/04/2023, 5:44 AM
Lennart Skogmo  02/05/2023, 9:33 AM
Rahul Sharma  02/05/2023, 1:39 PM
Sanjeeb Dey  02/06/2023, 5:41 AM
Sanjeeb Dey  02/06/2023, 5:43 AM
Sanjeeb Dey  02/06/2023, 5:43 AM
Alex  02/06/2023, 9:08 AM
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
How can a file name be cached in Spark? We are getting a strange error:
java.io.FileNotFoundException: No such file or directory: s3a://.....snappy.parquet
I restarted the app, but the issue persists.
I checked the Delta logs and the file is not present in the _delta_log. How is Spark able to remember this file when it is not even present in the Delta log?
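A minimal sketch of the explicit invalidation the error message points at, assuming a path-based Delta table (the s3a path below is a placeholder, not the real one):
spark.catalog.refreshByPath("s3a://bucket/path/to/table")   # drop cached file listings and plans for this path
spark.catalog.clearCache()                                  # or clear every cached Dataset/DataFrame in the session
df = spark.read.format("delta").load("s3a://bucket/path/to/table")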
Alex  02/06/2023, 9:34 AM
Morgan  02/06/2023, 10:59 AM
sabari dass  02/06/2023, 11:07 AM
Álvaro José Baranoski  02/06/2023, 2:33 PM
I'm trying to create a Delta table from a job launched with spark-submit.
The table that I'm trying to create is the very first one in the getting started section, like so:
data = spark.range(0, 5)
data.write.format("delta").save("/tmp/delta-table")
However, when executing the command, the log shows the following error:
[2023-02-06, 11:09:36 -03] {spark_submit.py:495} INFO - 23/02/06 11:09:36 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 2) (172.19.0.3 executor 2): java.io.FileNotFoundException:
[2023-02-06, 11:09:36 -03] {spark_submit.py:495} INFO - File file:/home/alvaro/airflow/tmp/delta-table/_delta_log/00000000000000000000.json does not exist
[2023-02-06, 11:09:36 -03] {spark_submit.py:495} INFO -
[2023-02-06, 11:09:36 -03] {spark_submit.py:495} INFO - It is possible the underlying files have been updated. You can explicitly invalidate
[2023-02-06, 11:09:36 -03] {spark_submit.py:495} INFO - the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by
[2023-02-06, 11:09:36 -03] {spark_submit.py:495} INFO - recreating the Dataset/DataFrame involved.
What should I do to get the Delta table created on my local machine? Is it possible to do so using this kind of Spark cluster? If not, what is the best way to run Spark + Delta Lake + Airflow on my local machine?
Thanks in advance!!
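For comparison, a minimal local-mode sketch of the same getting-started write, assuming the Delta Lake jars are on the classpath (for example via --packages io.delta:delta-core_2.12) and using an explicit absolute URI so the driver and executors resolve the same location:
from pyspark.sql import SparkSession

# Standard Delta session settings for a plain local SparkSession.
spark = (
    SparkSession.builder
    .master("local[*]")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

data = spark.range(0, 5)
# An absolute file:// URI avoids ambiguity about where the table lands.
data.write.format("delta").save("file:///tmp/delta-table")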
Edmondo Porcu  02/06/2023, 6:22 PM
Kenny Ma  02/06/2023, 10:34 PM
2023-02-06 14:28:14,286 INFO io.delta.standalone.internal.DeltaLogImpl [] - Loading version 34082 starting from checkpoint 34080.
2023-02-06 14:28:14,305 INFO io.delta.standalone.internal.SnapshotImpl [] - [tableId=6ee75af7-8e09-4d40-b92a-2cb10a844f20] Created snapshot io.delta.standalone.internal.SnapshotImpl@4202bfe8
2023-02-06 14:28:15,756 INFO org.apache.hadoop.fs.s3a.S3AInputStream [] - Switching to Random IO seek policy
2023-02-06 14:28:16,117 INFO shadedelta.org.apache.parquet.hadoop.InternalParquetRecordReader [] - RecordReader initialized will read a total of 3094160 records.
2023-02-06 14:28:16,117 INFO shadedelta.org.apache.parquet.hadoop.InternalParquetRecordReader [] - at row 0. reading next block
2023-02-06 14:28:24,143 INFO org.apache.hadoop.io.compress.CodecPool [] - Got brand-new decompressor [.snappy]
2023-02-06 14:28:24,210 INFO shadedelta.org.apache.parquet.hadoop.InternalParquetRecordReader [] - block read in memory in 8093 ms. row count = 639939
2023-02-06 14:28:28,277 INFO shadedelta.org.apache.parquet.hadoop.InternalParquetRecordReader [] - Assembled and processed 639939 records from 38 columns in 3861 ms: 165.74437 rec/ms, 6298.286 cell/ms
2023-02-06 14:28:28,278 INFO shadedelta.org.apache.parquet.hadoop.InternalParquetRecordReader [] - time spent so far 67% reading (8093 ms) and 32% processing (3861 ms)
2023-02-06 14:28:28,278 INFO shadedelta.org.apache.parquet.hadoop.InternalParquetRecordReader [] - at row 639939. reading next block
2023-02-06 14:28:36,215 INFO shadedelta.org.apache.parquet.hadoop.InternalParquetRecordReader [] - block read in memory in 7935 ms. row count = 629616
2023-02-06 14:28:39,398 INFO shadedelta.org.apache.parquet.hadoop.InternalParquetRecordReader [] - Assembled and processed 1269555 records from 38 columns in 7018 ms: 180.89983 rec/ms, 6874.1934 cell/ms
2023-02-06 14:28:39,399 INFO shadedelta.org.apache.parquet.hadoop.InternalParquetRecordReader [] - time spent so far 69% reading (16028 ms) and 30% processing (7018 ms)
2023-02-06 14:28:39,400 INFO shadedelta.org.apache.parquet.hadoop.InternalParquetRecordReader [] - at row 1269555. reading next block
2023-02-06 14:28:47,303 INFO shadedelta.org.apache.parquet.hadoop.InternalParquetRecordReader [] - block read in memory in 7902 ms. row count = 638410
Aaron Kub  02/07/2023, 2:27 AM
We have the delta.deletedFileRetentionDuration table property set to "interval 21 days". When we run the vacuum command with a 504 hour retention period (21 days) we end up with corrupted snapshots that were created within the 21 day period. By "corrupted" I mean that when a version that is, say, 10 days old is read, there is a FileNotFoundException for certain part files that have now been deleted unexpectedly.
After digging deeper by running vacuum commands in dry-run mode, I've concluded that the deleted files were (1) "removed" in a snapshot version greater than 7 days ago and (2) created over 21 days ago.
The ultimate culprit seems to be that the snapshot Delta uses to generate the validFiles object in the VacuumCommand.gc method is filtering out files that were removed more than 7 days ago, because it falls back to the default retention period.
I've confirmed that the snapshot only has 7 days of removed files by doing something like this:
val deltaLog = DeltaLog.forTable(spark, "s3://path-to-table")
val snapshot = deltaLog.update()
val removedFiles = snapshot.state
  .filter(_.remove != null)
  .collect()
  .map(_.unwrap.asInstanceOf[RemoveFile])
  .sortBy(_.delTimestamp)
Lastly, to double check that the default TOMBSTONE_RETENTION value is in fact what is causing this behavior, I cloned the delta project and changed the default value to 21 days, then ran the above steps again. I then had 21 days of removed file history in the snapshot.
Sanity checks: I am completely sure that the retention period on the table is indeed set to 21 days by using the SHOW TBLPROPERTIES command. The checkpoint files in the _delta_log directory also contain 21 days of removed file history. It's just the in-memory snapshot that filters the removed files down to the most recent 7 days.
We are using AWS EMR, Scala 2.12, and Delta 1.0.0, although I have reproduced this behavior on delta 2.1.0.
It feels like I must be missing something. Has anyone encountered this?
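For anyone reproducing this, a dry-run sketch of the check described above (the path is a placeholder, not the exact commands from the investigation):
# Lists what VACUUM would delete at the 21-day (504 hour) retention, without deleting anything.
spark.sql("VACUUM delta.`s3://path-to-table` RETAIN 504 HOURS DRY RUN").show(truncate=False)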
Grainne B  02/07/2023, 10:45 PM
Dhruvil Shah  02/08/2023, 1:15 AM
Dhruvil Shah  02/08/2023, 1:19 AM
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
df = spark.createDataFrame([(1, "foo"), (2, "bar"), ],["id", "label"])
df.write.format("delta").mode("overwrite").save("s3://lakeformation-dhruv/cdctest/")
print( "Count ")
job.commit()
Dhruvil Shah  02/08/2023, 1:19 AM
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
df = spark.read.format("delta") \
    .option("readChangeFeed", "true") \
    .load("s3://lakeformation-dhruv/cdctest/")
print ( df )
print( df.show() )
job.commit()
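A hedged variation on the read above, using the same path: batch change-feed reads normally require delta.enableChangeDataFeed on the table plus a starting version, and the feed only covers commits made after the property is enabled; whether that matches the intended setup here is an assumption.
# Enable change data feed on the existing table, then read changes starting from a version.
spark.sql("ALTER TABLE delta.`s3://lakeformation-dhruv/cdctest/` SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")
cdf = (spark.read.format("delta")
       .option("readChangeFeed", "true")
       .option("startingVersion", 0)
       .load("s3://lakeformation-dhruv/cdctest/"))
cdf.show()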
Dhruvil Shah  02/08/2023, 1:21 AM
Rahul Sharma  02/08/2023, 5:12 AM
pyspark.sql.utils.StreamingQueryException: The stream from your Delta table was expecting process data from version 142,
but the earliest available version in the _delta_log directory is 159. The files
in the transaction log may have been deleted due to log cleanup. In order to avoid losing
data, we recommend that you restart your stream with a new checkpoint location and to
increase your delta.logRetentionDuration setting, if you have explicitly set it below 30
days.
If you would like to ignore the missed data and continue your stream from where it left
off, you can set the .option("failOnDataLoss", "false") as part
of your readStream statement.
=== Streaming Query ===
Identifier: JWR-SEC-
Aaron Kub  02/08/2023, 5:46 PM
Sekhar Sahu  02/08/2023, 9:58 PM
23/02/08 21:53:36 WARN HiveExternalCatalog: Couldn't find corresponding Hive SerDe for data source provider delta. Persisting data source table `default`.`delta_table` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.
Traceback (most recent call last):
File "<stdin>", line 4, in <module>
File "/usr/lib/spark/python/pyspark/sql/session.py", line 1034, in sql
return DataFrame(self._jsparkSession.sql(sqlQuery), self)
File "/usr/lib/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1322, in __call__
File "/usr/lib/spark/python/pyspark/sql/utils.py", line 196, in deco
raise converted from None
pyspark.sql.utils.IllegalArgumentException: Can not create a Path from an empty string
Code:
## Create a DataFrame
data = spark.createDataFrame([("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
("103", "2015-01-01", "2015-01-01T13:51:40.519832Z")],
["id", "creation_date", "last_update_time"])
## Write a DataFrame as a Delta Lake dataset to the S3 location
spark.sql("""CREATE TABLE IF NOT EXISTS delta_table (id string, creation_date string,
last_update_time string)
USING delta location
'<s3://DOC-EXAMPLE-BUCKET/example-prefix/db/delta_table>'""");
data.writeTo("delta_table").append()
pyspark command used
pyspark --master yarn --deploy-mode client --repositories http://repo.hortonworks.com/content/groups/public/,https://repos.spark-packages.org/,https://oss.sonatype.org/content/repositories/snapshots --conf spark.sql.adaptive.coalescePartitions.initialPartitionNum=5000 --conf spark.databricks.delta.optimize.maxFileSize=250000 --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.executor.extraJavaOptions=-XX:+UseG1GC --conf spark.driver.maxResultSize=0 --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog --conf spark.databricks.delta.optimize.repartition.enabled=true --conf spark.databricks.delta.autoOptimize=true --packages io.delta:delta-core_2.12:2.1.0
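Not a diagnosis of the empty-path error, just a hedged variation that appends by the table's location instead of the v2 writeTo call (same placeholder bucket as above):
# Append to the Delta table by path rather than by table name.
data.write.format("delta").mode("append").save("s3://DOC-EXAMPLE-BUCKET/example-prefix/db/delta_table")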
Leandro Rouberte  02/09/2023, 12:35 AM
gopinath s  02/09/2023, 11:39 AM
Roberto  02/09/2023, 12:06 PM
Nate Kuhl  02/09/2023, 5:13 PM
I had mergeSchema=True set on the DataFrameWriter and was surprised to see that the application wasn't progressively merging in new columns. I looked up some documentation and noticed that for merges I need to set spark.databricks.delta.schema.autoMerge.enabled=true to enable schema evolution.
I want to make sure I'm understanding the two configurations correctly (see the sketch after this list):
• mergeSchema=true enables schema evolution for write operations
• spark.databricks.delta.schema.autoMerge.enabled=true enables schema evolution for merge operations (insertAll and updateAll)
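A short sketch of how the two settings are typically applied, matching the understanding above (the DataFrame and path are placeholders):
# Per-write schema evolution: new columns are added during this append.
new_df.write.format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .save("/path/to/table")

# Session-level schema evolution for MERGE INTO (insertAll / updateAll clauses).
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")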