
Martin

07/07/2023, 4:28 PM
PySpark df.cache() produces dirty reads on Delta
I noticed some (to me) strange behavior and I'd like to ask whether this is by design or a bug:
location = 'some arbitrary location'

# create delta table at location containing *10* rows
spark.range(10).toDF("id").write.format("delta").mode("overwrite").option("path", location).save()

# reading from delta table and caching the dataframe
df_init = spark.read.format('delta').load(location)
df_init.cache()
df_init.count()
>>> 10

# overwriting the delta table with *100* rows
spark.range(100).toDF("id").write.format("delta").mode("overwrite").option("path", location).save()

# now checking the number of rows again
spark.read.format('delta').load(location).count()
>>> 10
Is this by design? Is this because Spark "remembers" that the result of the execution plan for spark.read.format('delta').load(location) has been cached and uses it instead of re-executing the plan, and therefore does not notice that the underlying Delta table has changed?

Dominique Brezinski

07/07/2023, 4:42 PM
Correct. It is because .cache() is insidious and should rarely be used: it keys the cache by plan, and the value is the cached result. This means anybody on the same cluster who invokes the same query plan will receive the cached results, likely unaware that that is what they are getting. The cache lookup happens before the source is evaluated in any other way. Most users are not aware of this, and more often than not it causes more problems than it solves. DataFrame caching is working as designed, the behavior is not specific to Delta, and this is a brilliant example of why many of us would prefer our users never use .cache()/.persist(). I have heard Michael Armbrust say his goal is that users never have to think about caching, that the system just performs optimally without it. There are a few cases where it is still warranted, but only experts should be making that decision.
👍 3
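A minimal sketch of the behaviour described above, continuing Martin's example (the name df_other is illustrative): a second DataFrame built independently from the identical plan is answered from the cache, and only unpersisting, or clearing the cache, makes the next read see the overwritten table.

# the table at `location` now holds 100 rows, but df_init's plan is cached
# with the 10-row result

# an independently built DataFrame with the *same* logical plan is resolved
# against the cache, so it reports the stale count as well
df_other = spark.read.format('delta').load(location)
df_other.count()
>>> 10

# dropping the cache entry sends the next scan back to the Delta table
df_init.unpersist()
# spark.catalog.clearCache()   # alternative: drop every cached plan

spark.read.format('delta').load(location).count()
>>> 100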

Martin

07/07/2023, 4:50 PM
Thanks a lot @Dominique Brezinski

JosephK (exDatabricks)

07/07/2023, 7:07 PM
Cache was basically a mistake made in the RDD API. I usually say you should only use cache if your data is static, you're isolated on a cluster, you know the data fits in memory, and you're going to use the data 3-4 times. Even then, it can be incorrect to use it.
👍 1
👆 1
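Roughly the narrow case Joseph describes, as a sketch (the path and column names here are made up): static data that fits in memory, a cluster you have to yourself, the DataFrame reused a handful of times, and an explicit unpersist when done.

from pyspark import StorageLevel

# static input, small enough to fit in executor memory (hypothetical path/columns)
features = spark.read.format('delta').load('/data/features')

features.persist(StorageLevel.MEMORY_ONLY)   # explicit level instead of bare cache()
features.count()                             # materialize the cache once

# reuse the cached data several times
by_label = features.groupBy('label').count().collect()
avg_score = features.agg({'score': 'avg'}).collect()
positives = features.where('score > 0.5').count()

features.unpersist()   # release executor memory as soon as you are done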

Jacek

07/22/2023, 7:33 PM
“You know the data fits in memory” is a slight exaggeration. Why does `cache` require the data to fit in memory? Disk is the secondary storage when the system runs out of memory (RAM), isn't it?
This “you're isolated on a cluster” also concerns me. Isolation is based on SparkContext: RDDs are tracked by ID, and no other user can access blocks of your own SparkContext. I must be missing something obvious.

Dominique Brezinski

07/22/2023, 7:44 PM
The caching of the query plan to blocks isn't context isolated, I believe. It is global on the driver, so other users on the same cluster will get the cached results if they execute the same plan. It is insidious.
I could be wrong, but we have definitely seen cases where one user of a cluster cached data and other users unexpectedly got stale data from the same queries later in time.
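A sketch of one way to observe this, assuming the cache manager is shared by every session created from the same SparkContext (my reading of the point above; the row counts are illustrative):

# session A caches a plan over the Delta table
df_a = spark.read.format('delta').load(location)
df_a.cache()
df_a.count()
>>> 10

# ...the table is overwritten out of band with 100 rows...

# session B is a different SparkSession on the same driver/SparkContext
spark_b = spark.newSession()
spark_b.read.format('delta').load(location).count()
>>> 10   # still served from session A's cached plan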

JosephK (exDatabricks)

07/24/2023, 11:14 AM
Cache should fit in memory because otherwise it either spills, which is bad, or doesn't cache at all. I can't recall which one happens, but both are bad either way. Being isolated on a cluster is important because someone else on the cluster can call clearCache() or just use RAM and evict the cache.
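For what it's worth, a small sketch of the two storage levels being discussed (defaults can vary by Spark version, so treat the comments as my understanding rather than gospel):

from pyspark import StorageLevel

df = spark.range(1000)

df.cache()             # DataFrame.cache() is persist(MEMORY_AND_DISK):
df.count()             # partitions that don't fit in memory spill to disk
print(df.storageLevel) # disk=True, memory=True

df.unpersist()
df.persist(StorageLevel.MEMORY_ONLY)   # here, partitions that don't fit are
df.count()                             # simply not cached and get recomputed

# and nothing stops someone else on the same cluster from doing this:
spark.catalog.clearCache()             # evicts every cached plan, not just yours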

Jacek

07/25/2023, 11:00 AM
Oh, c’mon. We might be talking about different things here yet call ’em ‘caching’. Caching is SparkContext-bound, so unless the context is shared there is an isolated temporary directory for data blocks. I'll have to refresh my memory on this and be back with more concrete examples next time I'm asked. /me Back to studying MERGE INTO 😉