Anmol Rastogi
05/05/2023, 7:55 AM
"""
full_data_load.py
~~~~~~~~~~~
This Python module is used to load data from RDS into data lake storage (i.e. S3 in the case of AWS)
"""
import os
import deltalake
env = 'dev'
region = 'ap-south-1'
os.environ["ENV"] = env
os.environ["REGION"] = region
bucket = 'analytics-pipeline-ap-south-1'
def main():
    data_read = deltalake.table.DeltaTable(f's3://{bucket}/DeltaLake_convert/Anmol', storage_options={'region': region})
    print(data_read)

# entry point for ELT application
if __name__ == '__main__':
    main()
While doing this I am getting the following error:
deltalake.PyDeltaTableError: Not a Delta table: No snapshot or version 0 found, perhaps s3://{bucket}/DeltaLake_convert/Anmol is an empty dir?
But as you can see in the image below, the _delta_log directory and the parquet files do exist there.
Am I doing something wrong?
Note: I am not looking for something with Spark. 🙂
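A quick way to sanity-check what delta-rs sees at that prefix is to list it with boto3 (bucket and prefix are copied from the snippet above; this is only a debugging sketch, not a fix):

import boto3

# Debugging sketch: confirm the _delta_log prefix is actually visible under this key.
s3 = boto3.client('s3', region_name='ap-south-1')
resp = s3.list_objects_v2(
    Bucket='analytics-pipeline-ap-south-1',
    Prefix='DeltaLake_convert/Anmol/_delta_log/',
)
# If nothing prints here, delta-rs will also see an "empty dir" at that path.
for obj in resp.get('Contents', []):
    print(obj['Key'])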
Rahul Sharma
05/05/2023, 8:06 AM
Joydeep Banik Roy
05/05/2023, 9:59 AM
Joydeep Banik Roy
05/05/2023, 10:00 AM
sharath
05/05/2023, 11:10 AM
Abidi Gassen
05/05/2023, 6:05 PM
kamal kaur
05/05/2023, 7:48 PM
We are partitioning a table by id, which has around 500k unique values. We are doing merge operations on that table using id, and we will be querying it through Trino by that id. I know those are too many partitions, but if the use case is just to query/merge that table by id, will this work?
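For reference, the merge being described, keyed on id, usually looks like the sketch below in PySpark; the table path and the updates_df source DataFrame are placeholders, and an existing SparkSession named spark is assumed.

from delta.tables import DeltaTable

# Placeholder path and source DataFrame, for illustration only.
target = DeltaTable.forPath(spark, "s3://bucket/tables/my_table")
(target.alias("t")
    .merge(updates_df.alias("s"), "t.id = s.id")  # merge keyed on id, as in the question
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute())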
Rambabu Posa
05/05/2023, 8:12 PM
I am converting a parquet file to delta using the below code snippet:
import io.delta.tables.DeltaTable;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

public static void main(String[] args) {
    SparkConf conf = new SparkConf();
    conf.setAppName("Parquet to Delta Converter");
    conf.setMaster("local[*]");
    // Enable the Delta SQL extension and catalog on the session
    SparkSession session = SparkSession.builder()
            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
            .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
            .config(conf)
            .getOrCreate();
    // Convert the parquet directory in place to a Delta table
    DeltaTable.convertToDelta(session, "parquet.`table/betting`");
    session.stop();
}
All my parquet files are available at table/betting, but I am getting this error:
java.lang.NoSuchMethodError: org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter$.$lessinit$greater$default$4()Z
at org.apache.spark.sql.delta.commands.ParquetTable.$anonfun$mergeSchemasInParallel$2(ConvertToDeltaCommand.scala:595)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:855)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:855)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:136)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Many thanks
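For comparison, the same conversion as it is commonly written in PySpark (path copied from the snippet above; a SparkSession named spark with the Delta extensions configured is assumed):

from delta.tables import DeltaTable

# Convert the existing parquet directory in place to a Delta table.
DeltaTable.convertToDelta(spark, "parquet.`table/betting`")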
Ujjwal Kumar Gupta
05/07/2023, 2:31 AM
Yuya Ebihara
05/08/2023, 9:45 AM
A question about storageType in deletion vectors: when I tried this feature in Databricks, it always returns u. When do they use the other types (i & p)?
cc: @Marius Grama @Vikash Kumar @Slawomir Pajak
Slackbot
05/08/2023, 11:33 AM
Lucas Zago
05/08/2023, 3:03 PM
What is the difference between mergeSchema and overwriteSchema? When should I use one instead of the other?
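For reference, the two options as they commonly appear on Delta writes (df and the path are placeholders): mergeSchema lets an append add new columns to the existing table schema, while overwriteSchema replaces the table schema as part of an overwrite.

# Append with schema evolution: new columns in df are added to the table schema.
df.write.format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .save("/tmp/delta/my_table")

# Full overwrite that also replaces the table schema with df's schema.
df.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .save("/tmp/delta/my_table")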
Shreyas Venkat
05/08/2023, 3:55 PM
Kashyap Bhatt
05/08/2023, 4:19 PM
# "custom_oracle" below is my custom format
streaming_oracle_df = spark.readStream \
    .format("custom_oracle") \
    .option("oracle_jdbc_str", "jdbc:...") \
    .option("custom_option1", 123) \
    .load()

streaming_oracle_df.writeStream \
    .trigger(availableNow=True) \
    .format('delta') \
    .option('checkpointLocation', 's3://bucket/checkpoint/dim_customer') \
    .start('s3://bucket/tables/dim_customer')
Jordan Cuevas
05/09/2023, 2:49 PM
vijay
05/09/2023, 8:29 PM
Artsiom Yudovin
05/09/2023, 9:04 PM
Sumanth Bo3
05/10/2023, 6:35 AM
df = spark.read.format("delta").load("/Users/saiteja/Downloads/new/")
df.createOrReplaceTempView("my_city_view")
result = spark.sql("SELECT LEFT(name, 1) AS letter, COUNT(*) AS num_cities FROM my_city_view GROUP BY letter ORDER BY letter ASC")
— Is there a more efficient and faster way to query delta tables?
— Currently we are reading the entire delta table into a df and creating a temporary view on top of it.
• Are the df and the tempView stored in memory during this phase?
• If the number of rows in the delta table is very high, wouldn't it cause high memory usage to read the table and create a view on top of it?
Another thing I observed:
SELECT LEFT(name, 1) AS letter, COUNT(*) AS num_cities FROM my_city_view GROUP BY letter ORDER BY letter ASC
My first query takes a relatively long time to execute, whereas subsequent queries on the same my_city_view execute very fast.
Here is an example:
No. of records: 150710
Time taken to run query 1: 135ms
Time taken to run query 2: 13ms
Time taken to run query 3: 15ms
Is it caching the records in memory or something?
• I tried the same for different datasets, but every time only the first query takes more time, whereas queries executed after it are 4 or 5 times faster.
Another question, not related to Delta Lake:
— The query results in the above code are stored in result, which is a Spark DataFrame. When I try result.show(), it takes a good 40-50 ms. Is there a better, faster way to export the results?
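For reference, two things that are commonly tried here, sketched with the path from the snippet above and assuming the session has the Delta extensions configured: querying the Delta path directly in SQL, and explicitly caching the view when it will be queried repeatedly.

# Query the Delta table directly by path, without a separate load step.
result = spark.sql(
    "SELECT LEFT(name, 1) AS letter, COUNT(*) AS num_cities "
    "FROM delta.`/Users/saiteja/Downloads/new/` "
    "GROUP BY letter ORDER BY letter ASC"
)

# If the same view is queried many times, cache it explicitly so later queries hit memory.
df = spark.read.format("delta").load("/Users/saiteja/Downloads/new/")
df.createOrReplaceTempView("my_city_view")
spark.sql("CACHE TABLE my_city_view")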
Rahul Sharma
05/10/2023, 7:32 AM
Amit Panwar
05/10/2023, 2:26 PM
John Darrington
05/10/2023, 9:12 PM
Patrik Ekman
05/11/2023, 7:20 AM
Alfonso
05/11/2023, 9:34 AM
Alfonso
05/11/2023, 9:35 AM
Rudhra Raveendran
05/11/2023, 6:43 PM
Sukumar Nataraj
05/12/2023, 5:04 AM
23/05/12 05:01:16 INFO DataSourceStrategy: Pruning directories with: isnotnull(site_id#36183L),(site_id#36183L = 1007443),dynamicpruning#142232 [site_id#36183L]
23/05/12 05:01:16 INFO FileSourceStrategy: Pushed Filters: IsNull(deleted),IsNotNull(id)
23/05/12 05:01:16 INFO FileSourceStrategy: Post-Scan Filters: isnull(deleted#36235),isnotnull(id#36182L)
23/05/12 05:01:16 INFO FileSourceStrategy: Output Data Schema: struct<id: bigint, code: string, deleted: string ... 1 more fields>
23/05/12 05:01:16 INFO DataSourceStrategy: Pruning directories with: isnotnull(site_id#36173L),dynamicpruning#142231 [site_id#36173L]
23/05/12 05:01:16 INFO FileSourceStrategy: Pushed Filters: IsNotNull(id)
23/05/12 05:01:16 INFO FileSourceStrategy: Post-Scan Filters: isnotnull(id#36172L)
23/05/12 05:01:16 INFO FileSourceStrategy: Output Data Schema: struct<id: bigint>
Rahul Sharma
05/12/2023, 6:04 AM
Amit Panwar
05/12/2023, 12:07 PM
Amit Panwar
05/12/2023, 12:07 PM
Amit Panwar
05/12/2023, 12:07 PM
from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession

builder = SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
spark = configure_spark_with_delta_pip(builder).getOrCreate()

data = spark.range(0, 5)
deltaTable = spark.read.format("delta").load("/tmep/delta-table2")
deltaTable.show()
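The range created above is never written out in this snippet, so a write step presumably sits between the two parts; a sketch of that assumed step, keeping the path exactly as posted (including the /tmep spelling), would be:

# Assumed intermediate step: persist the range as a Delta table before reading it back.
data.write.format("delta").mode("overwrite").save("/tmep/delta-table2")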