#random

Zohaa Qamar

01/29/2023, 3:57 AM
Hi fellows, my question is not about Delta Lake but about Spark on EMR in general - sorry about that, so if it does not fit here, please feel free to delete it. I have a Spark job running on EMR that has to process some 2 TB of data, and I am getting this error:
# java.lang.OutOfMemoryError: Java heap space
# -XX:OnOutOfMemoryError="kill -9 %p"
#   Executing /bin/sh -c "kill -9 27571"...
I have tried various instance types, e.g. 20-25 core instances of m5.16xlarge and r5.12xlarge. I also tried playing around with Spark configurations like spark.driver.memory and spark.executor.memory, from 30g to 300g, but nothing helped. The job does not do any major computation; it is simply a coalesce().write().partitionBy().parquet(). I also tried setting HADOOP_HEAPSIZE in the configuration to 100g and 200g. Please ask if more information is required. Thanks. Here is a screenshot of the EMR executors:

JosephK (exDatabricks)

01/29/2023, 10:47 AM
Coalesce is trying to reduce all your data to fewer in-memory partitions, and those are too big to fit into memory, which gives you the OOM. You should use repartition before a write operation. You can also just use Delta Lake to do optimization/compaction.
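Roughly, the change would look like this (a sketch only; df, the "event_date" partition column, and outputPath are placeholders for your own dataset and paths):
// Sketch: repartition() does a full shuffle into many right-sized partitions,
// instead of coalesce() squeezing ~2 TB into a handful of huge in-memory partitions.
// df, "event_date" and outputPath are placeholders.
df.repartition(1000)
  .write()
  .mode("overwrite")
  .partitionBy("event_date")
  .parquet(outputPath);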

Zohaa Qamar

01/29/2023, 11:04 PM
I will try to increase the number of partitions in coalesce to maybe 100, as currently it is set to 20.
@JosephK (exDatabricks), I replaced coalesce with repartition and also increased the number of partitions from 20 to 1000. Below are my other configurations:
spark.driver.memory=100g
spark.executor.memory=100g
spark.yarn.executor.memoryOverhead=2g
spark.yarn.driver.memoryOverhead=2g
spark.executor.cores=5
spark.driver.cores=5
spark.executor.instances=60
spark.network.timeout=50000s
spark.executor.heartbeatInterval=5000s
But it is still failing on EMR. I now have the errors below:
ERROR AsyncEventQueue: Dropping event from queue eventLog. This likely means one of the listeners is too slow and cannot keep up with the rate at which tasks are being started by the scheduler.
and also this in stderr:
java.lang.NullPointerException: null value for spark.executor.cores
I have tried several variations of the Spark configurations, coalesce/repartition, etc., but it just won't succeed. Any help, please?

Mike M

01/30/2023, 3:49 AM
What is the source data format and how are you reading it? If CSV, are you defining the schema manually?

Zohaa Qamar

01/30/2023, 3:59 AM
The source is JSON with gzip compression:
sparkSession.read().schema(getInputSchema()).json(sourceFilesPath)

Mike M

01/30/2023, 4:00 AM
How many files are you reading from?

Zohaa Qamar

01/30/2023, 4:01 AM
There are lots... really too many. The total size is around 2 TB.

Mike M

01/30/2023, 4:03 AM
Are they similarly sized?

Zohaa Qamar

01/30/2023, 4:03 AM
Yeah, more or less equally sized files.

Mike M

01/30/2023, 4:04 AM
How large compressed / uncompressed? JSON can be a pain in the rear to parse in large sizes, and it wouldn't surprise me if that's a part of the issue. Also, do you know if it's pure JSON or more of a JSON-lines style?
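(For context - the stock Spark reader treats the two styles very differently. A rough sketch, reusing the names from the snippet you posted above; whether you actually need multiLine depends on how your files are laid out:)
// JSON Lines (one object per line) is what spark.read().json() expects by default;
// records can be split and parsed independently.
Dataset<Row> jsonLines = sparkSession.read()
    .schema(getInputSchema())
    .json(sourceFilesPath);

// "Pure" JSON (one big object or array spanning the file) usually needs multiLine=true,
// which makes Spark buffer each whole file as a single record - much heavier on memory.
Dataset<Row> pureJson = sparkSession.read()
    .option("multiLine", true)
    .schema(getInputSchema())
    .json(sourceFilesPath);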

Zohaa Qamar

01/30/2023, 4:06 AM
Pure JSON. The files are 2.0 to 2.3 MB each. Previously I had around 1.5 TB of data and that worked fine with the same code; this is a sudden increase in data for one day, and it is failing even after several configuration tweaks.

Mike M

01/30/2023, 4:12 AM
Ah - one of those deals. Usually to work around them, I end up splitting steps / writing out intermediate steps to parquet / bumping node size / etc.

Zohaa Qamar

01/30/2023, 4:13 AM
I tried bumping node size and instance count: from m5.12xlarge to m5.16xlarge, and also, as suggested in the blog above, r5.12xlarge with 20-25 instances, but no luck at all.

Mike M

01/30/2023, 4:17 AM
What is your spark.sql.shuffle.partitions value?

Zohaa Qamar

01/30/2023, 4:24 AM
It would be the default, as I am not setting it explicitly.
Any suggestions for what value I should set it to?

Mike M

01/30/2023, 4:30 AM
I believe the default is 200. I've almost never seen it make a difference, but you can try setting it to 400 or 800; I'm not sure it would help here.
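If you want to try it, it's just a config change (800 here is only a guess to experiment with):
// Raise the shuffle parallelism; spark.sql.shuffle.partitions is runtime-configurable.
sparkSession.conf().set("spark.sql.shuffle.partitions", "800");
// or at submit time: spark-submit --conf spark.sql.shuffle.partitions=800 ...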

Zohaa Qamar

01/30/2023, 4:33 AM
Right... any other suggestions? It seems like nothing is working here!

Mike M

01/30/2023, 4:37 AM
Other than the partitions thing, which in theory could fit (I believe that setting is how many partitions it sets up by default to process into - usually I don't hit it on read due to larger/fewer files, but it might make a difference in your case), I'd try looking at the Spark UI to see if anything seems particularly skewed. Beyond that, possibly try a Parquet conversion on a subset of the files, then merge the two afterwards.
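Roughly what I mean by the subset-conversion idea (a sketch; firstHalfPath, secondHalfPath, stagingPath, outputPath and the "event_date" partition column are placeholders for your own layout):
// Convert each half of the JSON input to Parquet in a staging area, then merge.
Dataset<Row> firstHalf = sparkSession.read().schema(getInputSchema()).json(firstHalfPath);
firstHalf.repartition(500).write().mode("overwrite").parquet(stagingPath + "/part1");

Dataset<Row> secondHalf = sparkSession.read().schema(getInputSchema()).json(secondHalfPath);
secondHalf.repartition(500).write().mode("overwrite").parquet(stagingPath + "/part2");

// Parquet is far cheaper to re-read, so the final pass just unions and writes the real output.
sparkSession.read().parquet(stagingPath + "/part1")
    .union(sparkSession.read().parquet(stagingPath + "/part2"))
    .write().mode("overwrite").partitionBy("event_date").parquet(outputPath);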
✔️ 1

Zohaa Qamar

01/30/2023, 4:40 AM
Yeah, I have increased the partitions and also used repartition instead of coalesce.
I am not very knowledgeable about the Spark UI and identifying issues with it. If you would be happy to spare some time for a screen share to help me out tomorrow, I would be really thankful!

JosephK (exDatabricks)

01/30/2023, 12:25 PM
Definitely a case of too many files. What appears to be happening is that you have hundreds of thousands of files and it's causing scheduling problems. As mentioned above, breaking it up into smaller jobs may help. The driver is trying to get the list of files to read, and that list is so massive it can't be held in memory. I also hate to shill for my own company, but many of these kinds of problems don't exist on Databricks. It may cost more, but it saves you money in the end.
👍 1

Zohaa Qamar

01/30/2023, 3:38 PM
"breaking it up into smaller jobs may help"
Does this mean I should process my job on chunks of the data, multiple times?

JosephK (exDatabricks)

01/30/2023, 3:56 PM
Yeah, read in like 20 GB at a time.

Zohaa Qamar

01/30/2023, 5:35 PM
Sorry, but can you guide me on how this can be done? By restricting the data that is read? Thanks!

JosephK (exDatabricks)

01/30/2023, 5:42 PM
It depends on how your data is laid out. If there are any partitions, that will make it easier. You can also do streaming with one file per trigger.
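For the streaming route, something like this (a sketch; the maxFilesPerTrigger value, checkpointPath and outputPath are placeholders to tune, and Trigger.AvailableNow() needs Spark 3.3+):
import org.apache.spark.sql.streaming.Trigger;

// Stream the same source in bounded micro-batches instead of one giant batch read.
Dataset<Row> stream = sparkSession.readStream()
    .schema(getInputSchema())
    .option("maxFilesPerTrigger", 10000)   // roughly 20 GB per batch at ~2 MB per file
    .json(sourceFilesPath);

stream.writeStream()
    .format("parquet")
    .option("path", outputPath)
    .option("checkpointLocation", checkpointPath)
    .trigger(Trigger.AvailableNow())       // drain all available input in batches, then stop
    .start()
    .awaitTermination();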

Zohaa Qamar

01/30/2023, 5:43 PM
It's year/month/day/hour, and my process is hourly, so it's already reading from the lowest-level partition, i.e. hour.

JosephK (exDatabricks)

01/30/2023, 5:45 PM
Yeah, you can do it in parts, maybe a day at a time.

Zohaa Qamar

01/30/2023, 5:47 PM
It's already at the hour level.