Ian

09/15/2023, 2:29 AM
Hello team, we are facing an issue with our Delta tables: we run vacuum and optimize scripts, but recently the code has started throwing a FileNotFoundException:
No such file or directory: <s3a://bucket/delta-table/part-00000-57b118c1-a1a6-4ea1-aaa6-e2069601e1b6.c000.snappy.parquet>
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
To note, we store our Delta tables on MinIO.
Dominique Brezinski

09/15/2023, 3:15 AM
Chances are you are vacuuming with overly aggressive retention parameters. Can you post the exact commands you run? And when are you hitting that exception: when generally trying to read the table, or when streaming from it?
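For reference, the retention settings each vacuum actually ran with should be recorded in the table history; a minimal sketch of checking them (the table path here is hypothetical):

from delta.tables import DeltaTable

# Sketch: recent VACUUM operations appear in the Delta history, and
# operationParameters should record the retention settings they ran with.
dt = DeltaTable.forPath(spark, "s3a://bucket/delta-table")  # hypothetical path
(dt.history()
   .filter("operation LIKE 'VACUUM%'")
   .select("version", "timestamp", "operation", "operationParameters")
   .show(truncate=False))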
Ian

09/15/2023, 3:16 AM
Sure, give me a moment.
import os

from pyspark.sql import SparkSession

delta_table_path = os.getenv("DELTA_TABLE_PATH")
s3endPointLoc = os.getenv("HOST")
s3accessKeyAws = os.getenv("AWS_S3_ACCESSKEY")
s3secretKeyAws = os.getenv("AWS_S3_SECRETKEY")

# Spark session configured for Delta Lake 2.0 and S3A access to MinIO
spark = SparkSession.builder \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.0.0") \
    .config("spark.driver.memory", "5g") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.jars",
            '/app/jars/adal4j-1.6.4.jar,/app/jars/accessors-smart-1.2.jar,/app/jars/asm-5.0.4.jar,/app/jars/jsr305-1.3.9.jar,/app/jars/commons-lang3-3.4.jar,/app/jars/azure-annotations-1.10.0.jar,/app/jars/scala-java8-compat_2.12-0.9.0.jar,/app/jars/j2objc-annotations-1.1.jar,/app/jars/okhttp-3.12.6.jar,/app/jars/okio-1.15.0.jar,/app/jars/oauth2-oidc-sdk-6.5.jar,/app/jars/activation-1.1.jar,/app/jars/azure-eventhubs-spark_2.12-2.3.17.jar,/app/jars/azure-client-authentication-1.7.3.jar,/app/jars/jcip-annotations-1.0-1.jar,/app/jars/slf4j-api-1.7.28.jar,/app/jars/json-smart-2.3.jar,/app/jars/proton-j-0.33.4.jar,/app/jars/guava-24.1.1-jre.jar,/app/jars/adapter-rxjava-2.7.2.jar,/app/jars/azure-client-runtime-1.7.3.jar,/app/jars/javax.mail-1.6.1.jar,/app/jars/gson-2.8.0.jar,/app/jars/jackson-databind-2.10.1.jar,/app/jars/joda-time-2.9.9.jar,/app/jars/commons-codec-1.11.jar,/app/jars/15bd62bd_bc93_4ce1_ada0_1b7e9d7c9a8e-postgresql_42_3_5-6f056.jar,/app/jars/lang-tag-1.7.jar,/app/jars/checker-compat-qual-2.0.0.jar,/app/jars/rxjava-1.3.8.jar,/app/jars/qpid-proton-j-extensions-1.2.3.jar,/app/jars/error_prone_annotations-2.1.3.jar,/app/jars/jackson-datatype-joda-2.10.1.jar,/app/jars/jackson-annotations-2.10.1.jar,/app/jars/azure-eventhubs-3.2.0.jar,/app/jars/animal-sniffer-annotations-1.14.jar,/app/jars/jackson-core-2.10.1.jar,/app/jars/converter-jackson-2.7.2.jar,/app/jars/retrofit-2.7.2.jar,/app/jars/client-runtime-1.7.3.jar,/app/jars/okhttp-urlconnection-3.12.2.jar,/app/jars/logging-interceptor-3.12.2.jar,/app/jars/aws-java-sdk-1.11.901.jar,/app/jars/aws-java-sdk-bundle-1.11.874.jar,/app/jars/hadoop-aws-3.2.3.jar') \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.memory.fraction", "0.5") \
    .config("spark.cleaner.periodicGC.interval", "10") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.fast.upload", "true") \
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "true") \
    .config("spark.hadoop.fs.s3a.endpoint", s3endPointLoc) \
    .config("spark.hadoop.fs.s3a.access.key", s3accessKeyAws) \
    .config("spark.hadoop.fs.s3a.secret.key", s3secretKeyAws) \
    .config("spark.hadoop.com.amazonaws.services.s3.enableV2", "true") \
    .config('spark.hadoop.fs.s3a.committer.magic.enabled', 'true') \
    .config('spark.hadoop.fs.s3a.committer.name', 'magic') \
    .config('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider') \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog").master(
    "local").getOrCreate()
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")

from delta.tables import DeltaTable
import time
from datetime import datetime

try:
    print("************************ OPTIMIZING ************************")
    sql_statement = "OPTIMIZE '{}'".format(delta_table_path)
    st_time = time.time()

    print("Running command= {} time= {}".format(sql_statement, datetime.utcnow()))
    spark.sql(sql_statement)

    end_time = time.time()
    time_taken = end_time - st_time
    print("Completed running command= {} time= {}".format(sql_statement, datetime.utcnow()))
    print("Total time taken= {} Hours".format(float(time_taken) / 3600))
    print("Fetching optimization metrics")

    deltaTable = DeltaTable.forPath(spark, delta_table_path)
    history_df = deltaTable.history()
    last_optimized = (
        history_df.filter(history_df["operation"] == "OPTIMIZE")
        .select("version", "timestamp", "operation", "operationMetrics")
        .orderBy(history_df["version"].desc())
        .limit(3)
    )
    print(last_optimized.toJSON().collect())

    print("************************ VACUUMING ************************")
    sql_statement = f"VACUUM delta.`{delta_table_path}` retain 0 hours "
    st_time = time.time()

    print("Running command= {} time= {}".format(sql_statement, datetime.utcnow()))
    spark.sql(sql_statement)

    end_time = time.time()
    time_taken = end_time - st_time
    print("Completed running command= {} time= {}".format(sql_statement, datetime.utcnow()))
    print("Total time taken= {} Hours".format(float(time_taken) / 3600))
except Exception as e:
    print(e)

history_df = deltaTable.history()
last_optimized = (
    history_df.filter(history_df["operation"] == "OPTIMIZE")
    .select("version", "timestamp", "operation", "operationMetrics")
    .orderBy(history_df["version"].desc())
    .limit(3)
)
last_optimized.toJSON().collect()
Here is the code we use. If it matters, we use Apache Spark 3.3.2 under the hood.
We have cron jobs that run this script every night; we started facing this issue just recently. We do not manually interfere with these scripts or the Delta table, as everything is automated: NiFi handles ingestion and we use Spark to read the tables for analytics.
Antara

09/15/2023, 9:22 AM
sql_statement = f"VACUUM delta.`{delta_table_path}` retain 0 hours "
This will remove, starting right now, every file the current table version no longer references, including files that older versions and in-flight readers may still need. You should set more hours.
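For illustration, a safer variant of that command keeps Delta's default 7-day window (a sketch, reusing delta_table_path from the script above):

# Sketch: retaining 168 hours (7 days, the Delta default) keeps files that
# time travel and concurrent readers may still reference, so the safety
# check no longer needs to be disabled.
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "true")
spark.sql(f"VACUUM delta.`{delta_table_path}` RETAIN 168 HOURS")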
Ian

09/15/2023, 9:22 AM
Can you elaborate or point to documentation, please? That would be helpful, thanks.
Antara

09/15/2023, 11:35 AM
You will find the section on vacuum here in the documentation: https://docs.delta.io/latest/delta-utility.html
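That page also documents a dry-run mode; a quick sketch of previewing what a vacuum would delete before committing to a retention window:

# Sketch: DRY RUN returns a list of up to 1000 files that would be
# deleted, without actually removing anything.
spark.sql(f"VACUUM delta.`{delta_table_path}` RETAIN 168 HOURS DRY RUN").show(truncate=False)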
Ian

09/15/2023, 11:39 AM
thanks