Szymon Sikora
06/28/2023, 12:07 PM
kv = {c: f"source.{c}" for c in df2.columns}
target_df.alias("target").merge(
    df2.alias("source"), "source.Name = target.Name"
).whenNotMatchedInsert(
    values={
        "uuid": "uuid()",
        **kv
    }
).whenNotMatchedBySourceDelete().execute()
Is there any way to do that, or is such an action not supported?
Martin
06/28/2023, 2:05 PM
Spark SQL has a uuid() function: https://spark.apache.org/docs/latest/api/sql/index.html#uuid
I think it is not (yet) exposed in the PySpark API.
You could work around this by using PySpark's expr() function: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.expr.html#pyspark.sql.functions.expr
from pyspark.sql import functions as f
kv = {c: f"source.{c}" for c in df2.columns}
target_df.alias("target").merge(
    df2.alias("source"), "source.Name = target.Name"
).whenNotMatchedInsert(
    values={
        "uuid": f.expr("uuid()"),
        **kv
    }
).whenNotMatchedBySourceDelete().execute()
Szymon Sikora
06/28/2023, 2:54 PM
AnalysisException: nondeterministic expressions are only allowed in Project, Filter, Aggregate or Window
Martin
06/28/2023, 5:31 PM
Could you add the uuid column to df2 upfront (before the merge)?
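from pyspark.sql import functions as f
# Generate the uuid on the source DataFrame before the merge,
# so the merge clause only references an existing column.
df2 = df2.withColumn("uuid", f.expr("uuid()"))
# df2.columns now includes "uuid", so kv maps it like any other column.
kv = {c: f"source.{c}" for c in df2.columns}
target_df.alias("target").merge(
    df2.alias("source"), "source.Name = target.Name"
).whenNotMatchedInsert(
    values={
        **kv
    }
).whenNotMatchedBySourceDelete().execute()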
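A rough sketch of why the explicit "uuid" entry can be dropped from values once the column lives on df2: the dict comprehension picks it up like any other source column. This runs without Spark. The helper name build_insert_values and the Value column are made up for illustration; only Name and uuid appear in this thread.

```python
def build_insert_values(columns, source_alias="source"):
    """Map each target column name to the matching source column expression."""
    return {c: f"{source_alias}.{c}" for c in columns}


# Hypothetical stand-in for df2.columns after
# df2.withColumn("uuid", ...) has been applied.
columns_after_with_column = ["Name", "Value", "uuid"]

insert_values = build_insert_values(columns_after_with_column)

# The mapping that would be passed as values=... to whenNotMatchedInsert.
print(insert_values)
```

The merge then only refers to source.uuid, an ordinary column reference, instead of calling uuid() inside the merge clause.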
Szymon Sikora
06/29/2023, 7:06 AM