
Rahul Sharma

02/15/2023, 4:05 PM
Hello team, we found a bug in production. We have a CDC platform where we UPSERT into a refine table, using a ranking function to keep only the latest event per key. If a record's insert and its update arrive together in the same micro-batch, the ranking keeps only the update, and the MERGE then finds no matching row in the refine table because the record was never inserted. Can anyone guide us on how to plan this so the record is first inserted and then updated?
# Keep only the latest change event per primary key in this micro-batch
spark_session_micro_batch.sql(f"""
    SELECT *
    FROM (
        SELECT *,
               row_number() OVER (PARTITION BY {self.pk} ORDER BY __source_ts_ms DESC) AS dedupe
        FROM {self.micro_batch_view}
    )
    WHERE dedupe = 1
""").drop("dedupe").createOrReplaceTempView(self.micro_batch_dedup_view)
And this is our MERGE query:

# Apply the deduplicated CDC events to the refine table:
# "u" updates and "d" deletes existing rows; "c"/"r" inserts new ones
SqlQuery = """
    MERGE INTO {db_name}.{refine_table_name} v
    USING dedup_temp_view u
    ON v.{pk} = u.{pk}
    WHEN MATCHED AND u.__op = "u"
    THEN UPDATE SET *
    WHEN MATCHED AND u.__op = "d"
    THEN DELETE
    WHEN NOT MATCHED AND (u.__op = "c" OR u.__op = "r")
    THEN INSERT *
""".format(refine_table_name=refine_table_name, db_name=db_name, pk=pk)
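To make the failure mode concrete, here is a small pure-Python model of the two steps above. It assumes `id` stands in for the primary key, `ts` for __source_ts_ms, and a single `name` field for the row payload; `dedupe_latest` and `apply_merge` are hypothetical helpers that only mimic the SQL, not Delta Lake APIs.

```python
def dedupe_latest(events):
    """Keep only the newest event per key, like row_number() ... WHERE dedupe = 1."""
    latest = {}
    for e in events:
        if e["id"] not in latest or e["ts"] > latest[e["id"]]["ts"]:
            latest[e["id"]] = e
    return list(latest.values())


def apply_merge(target, batch):
    """Apply the three MERGE clauses to a dict of {key: name}."""
    result = dict(target)
    for u in batch:
        matched = u["id"] in result
        if matched and u["op"] == "u":
            result[u["id"]] = u["name"]  # WHEN MATCHED AND op = "u" -> UPDATE
        elif matched and u["op"] == "d":
            del result[u["id"]]  # WHEN MATCHED AND op = "d" -> DELETE
        elif not matched and u["op"] in ("c", "r"):
            result[u["id"]] = u["name"]  # WHEN NOT MATCHED AND op IN (c, r) -> INSERT
        # Any other combination (e.g. NOT MATCHED with op "u") is silently skipped.
    return result


# Insert and update for the same key arriving in one micro-batch
batch = [
    {"id": 1, "op": "c", "name": "alice", "ts": 100},
    {"id": 1, "op": "u", "name": "alicia", "ts": 200},
]
print(apply_merge({}, dedupe_latest(batch)))  # prints {}
```

With an empty target the insert+update pair disappears: only the "u" row survives deduplication, and it falls through every MERGE clause. Against a target that already holds key 1, the same batch updates it as expected.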

Nick Karpov

02/15/2023, 4:12 PM
You need to preprocess your incoming batch to handle this case. If it contains both the original INSERT and an UPDATE, add logic that squashes those two events into a single INSERT with the correct values (presumably from the UPDATE).
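For the specific insert-then-update case, the squash can be sketched in SQL by adding one window expression to the existing dedupe query: flag keys that had a "c"/"r" event anywhere in the batch, and rewrite a surviving "u" for such keys to "c" so the NOT MATCHED clause inserts it. This sketch uses Python's built-in sqlite3 purely so it runs without a Spark cluster; the table `micro_batch`, its columns, and `squash_batch` are hypothetical. ROW_NUMBER and MAX(...) OVER (PARTITION BY ...) also exist in Spark SQL, but this has not been run against Spark here. It does not handle other sequences such as delete-then-recreate of an existing key.

```python
import sqlite3

DEDUP_AND_SQUASH = """
SELECT id,
       CASE WHEN op = 'u' AND created_in_batch = 1 THEN 'c' ELSE op END AS op,
       name,
       ts
FROM (
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY id ORDER BY ts DESC) AS dedupe,
           -- 1 if this key had a create/snapshot-read event anywhere in the batch
           MAX(CASE WHEN op IN ('c', 'r') THEN 1 ELSE 0 END)
               OVER (PARTITION BY id) AS created_in_batch
    FROM micro_batch
)
WHERE dedupe = 1
ORDER BY id
"""


def squash_batch(rows):
    """Load (id, op, name, ts) rows into an in-memory table and squash them."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE micro_batch (id INTEGER, op TEXT, name TEXT, ts INTEGER)")
    conn.executemany("INSERT INTO micro_batch VALUES (?, ?, ?, ?)", rows)
    result = conn.execute(DEDUP_AND_SQUASH).fetchall()
    conn.close()
    return result


rows = [
    (1, "c", "alice", 100), (1, "u", "alicia", 200),  # insert + update in one batch
    (2, "u", "bob2", 300),                            # update of an existing row
    (3, "c", "carol", 400), (3, "d", None, 500),      # created and deleted in one batch
]
print(squash_batch(rows))
```

Key 1 comes out as a single "c" row carrying the updated values, key 2 stays an update, and key 3 stays a "d" that the MERGE ignores because it never matches.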

Rahul Sharma

02/15/2023, 4:13 PM
Is there any documentation, or a hint, that would help?

Nick Karpov

02/15/2023, 4:24 PM
I'm not aware of anything, but I'm making a note to write about it on the delta.io blog. In general you should try:
1. Group by your pk.
2. Apply the appropriate logic within each group to squash the events. You still need some other column (I'm guessing __source_ts_ms) to order rows within a given group. Then apply a foldLeft-type operation where you accumulate/rewrite values as you traverse each row in the group, depending on whether it's an INSERT, UPDATE, or DELETE. You may need to go as far as a UDAF; I'm not sure SQL is expressive enough to do all of this.
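The two steps above can be sketched in plain Python as a group-by on the key followed by a left fold (functools.reduce) over each group's ordered events. This is an illustrative sketch, not a Delta Lake or Spark API, and the function names are hypothetical. It assumes each event carries the full row image (as UPDATE SET * and INSERT * imply), that `ts` strictly orders events for the same key (ties would need an extra tiebreaker column), and that a first event of "c"/"r" means the key was absent from the target before the batch.

```python
from functools import reduce
from itertools import groupby
from operator import itemgetter


def _step(state, event):
    """One foldLeft step; state is (existed_before, latest_event)."""
    existed_before, _ = state
    if existed_before is None:
        # First event for this key: "c"/"r" implies the key was absent from
        # the target before the batch; "u"/"d" implies it already existed.
        existed_before = event["op"] not in ("c", "r")
    return (existed_before, event)


def squash(events):
    """Collapse each key's events into at most one net change event."""
    out = []
    ordered = sorted(events, key=itemgetter("id", "ts"))
    for _, group in groupby(ordered, key=itemgetter("id")):
        existed_before, last = reduce(_step, group, (None, None))
        exists_after = last["op"] != "d"
        if not existed_before and exists_after:
            out.append({**last, "op": "c"})  # net INSERT with the latest values
        elif existed_before and exists_after:
            out.append({**last, "op": "u"})  # net UPDATE (also covers delete-then-recreate)
        elif existed_before and not exists_after:
            out.append({**last, "op": "d"})  # net DELETE
        # Created and deleted within the same batch: nothing to write.
    return out


events = [
    {"id": 1, "op": "c", "name": "alice", "ts": 100},
    {"id": 1, "op": "u", "name": "alicia", "ts": 200},
    {"id": 2, "op": "u", "name": "bob2", "ts": 300},
    {"id": 3, "op": "c", "name": "carol", "ts": 400},
    {"id": 3, "op": "d", "name": None, "ts": 500},
    {"id": 4, "op": "d", "name": None, "ts": 600},
    {"id": 4, "op": "c", "name": "dan", "ts": 700},
]
for e in squash(events):
    print(e["id"], e["op"], e["name"])
```

Because each key collapses to at most one row whose op hits exactly one MERGE clause, the existing MERGE could consume the output unchanged. In Spark, the same per-key function could run inside a UDAF as suggested, or via GroupedData.applyInPandas; that wiring is not shown.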