Rahul Sharma
02/15/2023, 4:05 PM
spark_session_micro_batch.sql(f"""
SELECT *
FROM (
    SELECT *,
           -- keep only the latest change event per key within this micro-batch
           ROW_NUMBER() OVER (PARTITION BY {self.pk} ORDER BY __source_ts_ms DESC) AS dedupe
    FROM {self.micro_batch_view}
)
WHERE dedupe = 1
""").drop("dedupe").createOrReplaceTempView(self.micro_batch_dedup_view)
and this is our merge query
SqlQuery = """
MERGE INTO {db_name}.{refine_table_name} v
USING dedup_temp_view u
ON v.{pk}=u.{pk}
WHEN MATCHED AND u.__op = "u"
THEN UPDATE SET *
WHEN MATCHED AND u.__op = "d"
THEN DELETE
WHEN NOT MATCHED AND (u.__op = "c" or u.__op = "r")
THEN INSERT *
""".format(refine_table_name=refine_table_name,db_name=db_name,pk=pk)
Nick Karpov
02/15/2023, 4:12 PM
Rahul Sharma
02/15/2023, 4:13 PM
Nick Karpov
02/15/2023, 4:24 PM
1. group by pk
2. apply the appropriate logic within each grouping to squash; you still need some other column (I'm guessing __source_ts_ms) to facilitate ordering within a given group... then apply a foldLeft-type operation where you accumulate/rewrite values as you traverse each row within the group depending on whether it's an INSERT/UPDATE/DELETE... you may need to go as far as a UDAF, I'm not sure if SQL is expressive enough to do all of this
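(A minimal sketch of the squash step Nick describes, using groupBy + applyInPandas in PySpark instead of a SQL UDAF. Everything here is illustrative and not from the thread: the name changes_df, the payload column list, and the assumption that update events may carry partial row images.

import pandas as pd

# changes_df: DataFrame of this micro-batch's Debezium events (hypothetical name),
# with columns pk, __op, __source_ts_ms plus the payload columns below.
PAYLOAD_COLS = ["col_a", "col_b"]  # hypothetical payload columns

def squash(pdf: pd.DataFrame) -> pd.DataFrame:
    # Replay one key's events oldest-to-newest and keep only the net effect.
    pdf = pdf.sort_values("__source_ts_ms")
    acc = None  # accumulated row image; None means the key is absent so far
    for _, row in pdf.iterrows():
        op = row["__op"]
        if op in ("c", "r"):
            acc = row.copy()                 # insert / snapshot read: take the full image
        elif op == "u":
            if acc is None:
                acc = row.copy()             # update with no earlier image in this batch
            else:
                for c in PAYLOAD_COLS:       # fold updated fields into the accumulator
                    if pd.notna(row[c]):
                        acc[c] = row[c]
                acc["__op"] = "u"
                acc["__source_ts_ms"] = row["__source_ts_ms"]
        elif op == "d":
            acc = row.copy()                 # tombstone; the MERGE's DELETE branch handles it
    if acc is None:
        return pdf.iloc[0:0]                 # events cancelled out; emit nothing for this key
    return pd.DataFrame([acc])

squashed = changes_df.groupBy("pk").applyInPandas(squash, schema=changes_df.schema)
squashed.createOrReplaceTempView("dedup_temp_view")  # the existing MERGE can then run unchanged

This keeps one net event per key, so the MERGE branches above still apply; the difference from the row_number dedup is that intermediate updates are folded together rather than discarded.)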