Ian
09/15/2023, 2:29 AM
No such file or directory: s3a://bucket/delta-table/part-00000-57b118c1-a1a6-4ea1-aaa6-e2069601e1b6.c000.snappy.parquet
It is possible the underlying files have been updated. You can explicitly invalidate
the cache in Spark by running ‘REFRESH TABLE tableName’ command in SQL or by
recreating the Dataset/DataFrame involved.
To note: we store our Delta tables on MinIO.
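(For a path-based Delta table, the refresh that the error message suggests can be done by path as well; a minimal sketch, assuming the table root from the error above. This only clears Spark's cached file listing, so it will not help if the underlying file was actually deleted.)

# Sketch: invalidate Spark's cached file listing for a path-based Delta table.
spark.catalog.refreshByPath("s3a://bucket/delta-table")

# For a table registered in the catalog, the equivalent is:
# spark.sql("REFRESH TABLE tableName")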
Dominique Brezinski
09/15/2023, 3:15 AM

Ian
09/15/2023, 3:16 AM
import os
from pyspark.sql import SparkSession
delta_table_path = os.getenv("DELTA_TABLE_PATH")
s3endPointLoc = os.getenv("HOST")
s3accessKeyAws = os.getenv("AWS_S3_ACCESSKEY")
s3secretKeyAws = os.getenv("AWS_S3_SECRETKEY")
spark = SparkSession.builder \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.0.0") \
    .config("spark.driver.memory", "5g") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .config("spark.jars",
'/app/jars/adal4j-1.6.4.jar,/app/jars/accessors-smart-1.2.jar,/app/jars/asm-5.0.4.jar,/app/jars/jsr305-1.3.9.jar,/app/jars/commons-lang3-3.4.jar,/app/jars/azure-annotations-1.10.0.jar,/app/jars/scala-java8-compat_2.12-0.9.0.jar,/app/jars/j2objc-annotations-1.1.jar,/app/jars/okhttp-3.12.6.jar,/app/jars/okio-1.15.0.jar,/app/jars/oauth2-oidc-sdk-6.5.jar,/app/jars/activation-1.1.jar,/app/jars/azure-eventhubs-spark_2.12-2.3.17.jar,/app/jars/azure-client-authentication-1.7.3.jar,/app/jars/jcip-annotations-1.0-1.jar,/app/jars/slf4j-api-1.7.28.jar,/app/jars/json-smart-2.3.jar,/app/jars/proton-j-0.33.4.jar,/app/jars/guava-24.1.1-jre.jar,/app/jars/adapter-rxjava-2.7.2.jar,/app/jars/azure-client-runtime-1.7.3.jar,/app/jars/javax.mail-1.6.1.jar,/app/jars/gson-2.8.0.jar,/app/jars/jackson-databind-2.10.1.jar,/app/jars/joda-time-2.9.9.jar,/app/jars/commons-codec-1.11.jar,/app/jars/15bd62bd_bc93_4ce1_ada0_1b7e9d7c9a8e-postgresql_42_3_5-6f056.jar,/app/jars/lang-tag-1.7.jar,/app/jars/checker-compat-qual-2.0.0.jar,/app/jars/rxjava-1.3.8.jar,/app/jars/qpid-proton-j-extensions-1.2.3.jar,/app/jars/error_prone_annotations-2.1.3.jar,/app/jars/jackson-datatype-joda-2.10.1.jar,/app/jars/jackson-annotations-2.10.1.jar,/app/jars/azure-eventhubs-3.2.0.jar,/app/jars/animal-sniffer-annotations-1.14.jar,/app/jars/jackson-core-2.10.1.jar,/app/jars/converter-jackson-2.7.2.jar,/app/jars/retrofit-2.7.2.jar,/app/jars/client-runtime-1.7.3.jar,/app/jars/okhttp-urlconnection-3.12.2.jar,/app/jars/logging-interceptor-3.12.2.jar,/app/jars/aws-java-sdk-1.11.901.jar,/app/jars/aws-java-sdk-bundle-1.11.874.jar,/app/jars/hadoop-aws-3.2.3.jar') \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.config("spark.memory.fraction", "0.5") \
.config("spark.cleaner.periodicGC.interval", "10") \
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
.config("spark.hadoop.fs.s3a.fast.upload", "true") \
.config("spark.hadoop.fs.s3a.connection.ssl.enabled", "true") \
.config("spark.hadoop.fs.s3a.endpoint", s3endPointLoc) \
.config("spark.hadoop.fs.s3a.access.key", s3accessKeyAws) \
.config("spark.hadoop.fs.s3a.secret.key", s3secretKeyAws) \
.config("spark.hadoop.com.amazonaws.services.s3.enableV2", "true") \
.config('spark.hadoop.fs.s3a.committer.magic.enabled', 'true') \
.config('spark.hadoop.fs.s3a.committer.name', 'magic') \
.config('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider') \
.config("spark.hadoop.fs.s3a.path.style.access", "true") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog").master(
"local").getOrCreate()
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
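# Note: disabling retentionDurationCheck is what allows a retention shorter than the
# table's deletedFileRetentionDuration (7 days by default); it is what makes the
# "retain 0 hours" VACUUM further down possible.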
from delta.tables import *
import time
from datetime import datetime
try:
    print("************************ OPTIMIZING ************************")
    sql_statement = "OPTIMIZE '{}'".format(delta_table_path)
    st_time = time.time()
    print("Running command= {} time= {}".format(sql_statement, datetime.utcnow()))
    spark.sql(sql_statement)
    end_time = time.time()
    time_taken = end_time - st_time
    print("Completed running command= {} time= {}".format(sql_statement, datetime.utcnow()))
    print("Total time taken= {} Hours".format(float(time_taken) / 3600))

    print("Fetching optimization metrics")
    deltaTable = DeltaTable.forPath(spark, delta_table_path)
    history_df = deltaTable.history()
    last_optimized_version = (
        history_df.filter(history_df["operation"] == "OPTIMIZE")
        .select("version", "timestamp", "operation", "operationMetrics")
        .orderBy(history_df["version"].desc())
        .limit(3)
    )
    data = last_optimized_version.toJSON()
    print(data.collect())
print("************************ VACUUMING ************************")
sql_statement = f"VACUUM delta.`{delta_table_path}` retain 0 hours "
st_time = time.time()
print("Running command= {} time= {}".format(sql_statement, datetime.utcnow()))
spark.sql(sql_statement)
end_time = time.time()
time_taken = end_time - st_time
print("Completed running command= {} time= {}".format(sql_statement, datetime.utcnow()))
print("Total time taken= {} Hours".format(float(time_taken) / 3600))
except Exception as e:
    print(e)
    # deltaTable may not have been assigned yet if OPTIMIZE itself failed,
    # so re-resolve it here before reading the history.
    deltaTable = DeltaTable.forPath(spark, delta_table_path)
    history_df = deltaTable.history()
    last_optimized_version = (
        history_df.filter(history_df["operation"] == "OPTIMIZE")
        .select("version", "timestamp", "operation", "operationMetrics")
        .orderBy(history_df["version"].desc())
        .limit(3)
    )
    data = last_optimized_version.toJSON()
    data.collect()
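(One way to check whether an earlier VACUUM is what removed the missing part file is to scan the table history for VACUUM entries. A rough diagnostic sketch, not part of the script above, reusing the spark session and delta_table_path from it; the exact columns available depend on the Delta version.)

# Diagnostic sketch: list recent VACUUM operations recorded in the Delta log.
from delta.tables import DeltaTable

dt = DeltaTable.forPath(spark, delta_table_path)
(dt.history()
   .filter("operation LIKE 'VACUUM%'")  # Delta records VACUUM START / VACUUM END entries
   .select("version", "timestamp", "operation", "operationParameters", "operationMetrics")
   .orderBy("version", ascending=False)
   .show(truncate=False))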
Antara
09/15/2023, 9:22 AM
sql_statement = f"VACUUM delta.`{delta_table_path}` retain 0 hours "
This will remove all files that are no longer referenced by the current table version, starting from now, so any reader still on an older snapshot will hit missing files.
You can set more hours instead.
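(A sketch of the safer variant being described here, keeping Delta's default seven-day retention; the 168-hour value is Delta's default, not something specified in the thread.)

# Safer VACUUM sketch: keep the default retention window so concurrent readers
# and time travel can still find their files.
sql_statement = f"VACUUM delta.`{delta_table_path}` RETAIN 168 HOURS"
spark.sql(sql_statement)
# With a retention of at least 7 days there is also no need to disable
# spark.databricks.delta.retentionDurationCheck.enabled.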
Ian
09/15/2023, 9:22 AM

Antara
09/15/2023, 11:35 AM

Ian
09/15/2023, 11:39 AM