Ahmad Dorri
06/19/2023, 6:42 AM
Sukumar Nataraj
06/19/2023, 9:13 AM
org.apache.spark.SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.
at org.apache.spark.sql.errors.QueryExecutionErrors$.malformedRecordsDetectedInRecordParsingError(QueryExecutionErrors.scala:1417) ~[spark-catalyst_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:68) ~[spark-catalyst_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$.$anonfun$readFile$9(JsonDataSource.scala:144) ~[spark-sql_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486) ~[scala-library-2.12.15.jar:?]
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492) ~[scala-library-2.12.15.jar:?]
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) ~[scala-library-2.12.15.jar:?]
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:227) ~[spark-sql_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) ~[scala-library-2.12.15.jar:?]
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) ~[?:?]
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:35) ~[spark-sql_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hasNext(Unknown Source) ~[?:?]
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:955) ~[spark-sql_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) ~[scala-library-2.12.15.jar:?]
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:168) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at org.apache.spark.scheduler.Task.run(Task.scala:138) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_372]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_372]
at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_372]
Caused by: org.apache.spark.sql.catalyst.util.BadRecordException: com.fasterxml.jackson.core.io.JsonEOFException: Unexpected end-of-input: expected close marker for Object (start marker at [Source: (byte[])"{"add":{"path":"site_id=76492/part-00190-15100589-5cb2-42a9-ba1d-46ca7f6fc84c.c000.snappy.parquet","partitionValues":{"site_id":"76492"},"size":26061,"modificationTime":1687106147000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":10375265,\"business_entity_id\":1,\"handle\":\"SOME_VALUE\",\"initial_plan_handle\":\"free\",\"current_plan_handle\":\"free\",\"currency_code\":\"USD\",\"created_at\":\"2018-05-18T15:29:17.000Z\",\"activated_at\":\"2018-05-18T15:29:17.000Z\",\"[truncated 1733 bytes]; line: 1, column: 8])
at [Source: (byte[])"{"add":{"path":"site_id=76492/part-00190-15100589-5cb2-42a9-ba1d-46ca7f6fc84c.c000.snappy.parquet","partitionValues":{"site_id":"76492"},"size":26061,"modificationTime":1687106147000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":10375265,\"business_entity_id\":1,\"handle\":\"SOME_VALUE\",\"initial_plan_handle\":\"free\",\"current_plan_handle\":\"free\",\"currency_code\":\"USD\",\"created_at\":\"2018-05-18T15:29:17.000Z\",\"activated_at\":\"2018-05-18T15:29:17.000Z\",\"[truncated 1733 bytes]; line: 1, column: 2234]
at org.apache.spark.sql.catalyst.json.JacksonParser.parse(JacksonParser.scala:513) ~[spark-catalyst_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$.$anonfun$readFile$7(JsonDataSource.scala:140) ~[spark-sql_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:60) ~[spark-catalyst_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
... 22 more
Caused by: com.fasterxml.jackson.core.io.JsonEOFException: Unexpected end-of-input: expected close marker for Object (start marker at [Source: (byte[])"{"add":{"path":"site_id=76492/part-00190-15100589-5cb2-42a9-ba1d-46ca7f6fc84c.c000.snappy.parquet","partitionValues":{"site_id":"76492"},"size":26061,"modificationTime":1687106147000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":10375265,\"business_entity_id\":1,\"handle\":\"SOME_VALUE\",\"initial_plan_handle\":\"free\",\"current_plan_handle\":\"free\",\"currency_code\":\"USD\",\"created_at\":\"2018-05-18T15:29:17.000Z\",\"activated_at\":\"2018-05-18T15:29:17.000Z\",\"[truncated 1733 bytes]; line: 1, column: 8])
at [Source: (byte[])"{"add":{"path":"site_id=76492/part-00190-15100589-5cb2-42a9-ba1d-46ca7f6fc84c.c000.snappy.parquet","partitionValues":{"site_id":"76492"},"size":26061,"modificationTime":1687106147000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":10375265,\"business_entity_id\":1,\"handle\":\"SOME_VALUE\",\"initial_plan_handle\":\"free\",\"current_plan_handle\":\"free\",\"currency_code\":\"USD\",\"created_at\":\"2018-05-18T15:29:17.000Z\",\"activated_at\":\"2018-05-18T15:29:17.000Z\",\"[truncated 1733 bytes]; line: 1, column: 2234]
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportInvalidEOF(ParserMinimalBase.java:682) ~[jackson-core-2.13.4.jar:2.13.4]
at com.fasterxml.jackson.core.base.ParserBase._handleEOF(ParserBase.java:494) ~[jackson-core-2.13.4.jar:2.13.4]
at com.fasterxml.jackson.core.base.ParserBase._eofAsNextChar(ParserBase.java:511) ~[jackson-core-2.13.4.jar:2.13.4]
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._skipWSOrEnd(UTF8StreamJsonParser.java:3039) ~[jackson-core-2.13.4.jar:2.13.4]
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:756) ~[jackson-core-2.13.4.jar:2.13.4]
at org.apache.spark.sql.catalyst.json.JacksonUtils$.nextUntil(JacksonUtils.scala:30) ~[spark-catalyst_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at org.apache.spark.sql.catalyst.json.JacksonParser.org$apache$spark$sql$catalyst$json$JacksonParser$$convertObject(JacksonParser.scala:424) ~[spark-catalyst_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at org.apache.spark.sql.catalyst.json.JacksonParser$$anonfun$$nestedInanonfun$makeStructRootConverter$3$1.applyOrElse(JacksonParser.scala:102) ~[spark-catalyst_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at org.apache.spark.sql.catalyst.json.JacksonParser$$anonfun$$nestedInanonfun$makeStructRootConverter$3$1.applyOrElse(JacksonParser.scala:101) ~[spark-catalyst_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at org.apache.spark.sql.catalyst.json.JacksonParser.parseJsonToken(JacksonParser.scala:377) ~[spark-catalyst_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at org.apache.spark.sql.catalyst.json.JacksonParser.$anonfun$makeStructRootConverter$3(JacksonParser.scala:101) ~[spark-catalyst_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at org.apache.spark.sql.catalyst.json.JacksonParser.$anonfun$parse$2(JacksonParser.scala:501) ~[spark-catalyst_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2860) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at org.apache.spark.sql.catalyst.json.JacksonParser.parse(JacksonParser.scala:496) ~[spark-catalyst_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$.$anonfun$readFile$7(JsonDataSource.scala:140) ~[spark-sql_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:60) ~[spark-catalyst_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
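For what it's worth, the PERMISSIVE mode the error message suggests only applies if you control the JSON read yourself; here the failure is while parsing a truncated Delta "add" action, which usually points at a corrupted or partially written log file rather than user data. A minimal PySpark sketch with a hypothetical path, in case the failing read is one you own:

df = (
    spark.read
    .option("mode", "PERMISSIVE")                             # malformed records become null rows instead of failing the job
    .option("columnNameOfCorruptRecord", "_corrupt_record")   # keep the raw bad record for inspection
    .json("/path/to/files.json")                              # hypothetical path
)
df.show(truncate=False)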
Samir Vyas
06/19/2023, 9:17 AM
val table = DeltaTable.forPath(spark, <path>)
// table schema could be like this
// id: int, ...some other columns, p1: string, p2: string
// here p1 and p2 are partition columns with order p1, p2
val df = <data frame with ids to delete, could be around 10k ids>
table
  .merge(
    broadcast(df),
    col("p1").isin(<some p1 values from which I want to delete ids>) && col("id") === df("id")
  )
  .whenMatched()
  .delete()
  .execute()
Now I want to preserve the partitioning structure after the above operation, so I set spark.databricks.delta.merge.repartitionBeforeWrite.enabled=true.
When I run the above, I notice a significant amount of shuffle. Apparently, Spark reads data from all partitions of p1 that match the passed values, removes the matching rows using a broadcast join, and then performs a repartition + partitionBy before writing in order to preserve the partitioning structure (p1 -> p2). I feel the shuffle could be avoided here because the merge operation does not change the partitioning structure, i.e. Spark could go to each partition, replace the old file with a new file from which the ids have been deleted, and add entries for the new files to the Delta log.
Is there any method or config I can use to avoid the shuffle in the above operation? I currently use Delta Lake 0.8.0. Thanks in advance!
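Not sure there is a config that skips that shuffle on 0.8.0, but one pattern sometimes used to keep a delete aligned with the existing partition layout is a selective overwrite with replaceWhere instead of a merge: read only the affected p1 partitions, anti-join away the ids, and overwrite just those partitions. A rough PySpark sketch; the path, the p1 values and ids_df are placeholders, replaceWhere on 0.8.0 must reference partition columns only, and whether this shuffles less than the merge depends on the data:

from pyspark.sql import functions as F

affected_p1 = ["a1", "a2"]                                   # hypothetical p1 values to touch
ids_df = spark.createDataFrame([(1,), (2,)], ["id"])         # stand-in for the ~10k ids to delete

remaining = (
    spark.read.format("delta").load("<path>")
    .where(F.col("p1").isin(affected_p1))                    # only the affected partitions are scanned
    .join(F.broadcast(ids_df), on="id", how="left_anti")     # drop rows whose id is in ids_df
)

(remaining.write.format("delta")
    .mode("overwrite")
    .option("replaceWhere", "p1 in ('a1', 'a2')")            # rewrite only these partitions
    .partitionBy("p1", "p2")                                 # same layout as the existing table
    .save("<path>"))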
sabari dass
06/19/2023, 6:58 PM
shareef shaik
06/20/2023, 7:56 AM
shareef shaik
06/20/2023, 7:56 AM
shareef shaik
06/20/2023, 7:57 AM
shareef shaik
06/20/2023, 7:58 AM
cprieto
06/20/2023, 7:59 AM
version parameter in open_table_with_version, but my question is a little around this: how can I get the changes made in that version? For example, just the rows inserted in that version instead of all the tuples up to that version?
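If Spark with the Delta connector is also available and change data feed is enabled on the table (delta.enableChangeDataFeed=true), the rows changed in a single version can be read directly; a sketch with a hypothetical path and version number (this is on the Spark side rather than delta-rs):

changes = (
    spark.read.format("delta")
    .option("readChangeFeed", "true")    # requires delta.enableChangeDataFeed=true on the table
    .option("startingVersion", 5)        # hypothetical version of interest
    .option("endingVersion", 5)
    .load("/path/to/table")              # hypothetical path
)
inserted = changes.where("_change_type = 'insert'")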
Ujjwal Kumar Gupta
06/20/2023, 10:58 AM
vinu thomas
06/20/2023, 12:14 PM
Matt Moen
06/20/2023, 5:03 PM
Matt Moen
06/20/2023, 10:24 PM
spark.conf.set("spark.databricks.delta.optimize.maxFileSize", 256*1024*1024)
Is there an equivalent setting for the target file size when writing an initial dataset, for example from df.write?
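Not a byte-size setting, but one knob that does exist for plain DataFrame writes is maxRecordsPerFile, which caps output files by row count; a sketch with a hypothetical cap and path, where the cap would have to be derived from your observed bytes per row:

(df.write.format("delta")
    .option("maxRecordsPerFile", 1000000)   # hypothetical cap; tune from observed bytes per row
    .save("/path/to/table"))                # hypothetical path

The session-level equivalent is the spark.sql.files.maxRecordsPerFile configuration.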
Heet Bhimani
06/21/2023, 6:18 AM
Sai Allu
06/21/2023, 9:01 PM
Heet Bhimani
06/22/2023, 9:52 AM
Laurens Janssen
06/22/2023, 11:22 AM
Sadiq Kavungal
06/22/2023, 11:33 AM
Rudhra Raveendran
06/22/2023, 10:55 PM
col-04ee4877-ee53-4cb9-b1fb-1a4eb74b508c. The underlying parquet files still have the right column names (which I think is the whole point of column mapping, right?), but is there a way to force Delta table reads to use the logical/parquet column names?
bharat chaudhury
06/23/2023, 11:13 AM
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

builder = (
    SparkSession.builder.appName("DeltaLakeFundamentals")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()
spark.sql("create table default.np_delta_table (id int , name string) using delta")
I am getting the below error; can someone please help?
Traceback (most recent call last):
File "/home/bchaudhu/delta/delta-create.py", line 13, in <module>
spark.sql("create table default.np_delta_table (id int , name string) using delta")
File "/opt/cloudera/parcels/SPARK3-3.3.0.3.3.7180.0-274-1.p0.31212967/lib/spark3/python/lib/pyspark.zip/pyspark/sql/session.py", line 1034, in sql
File "/opt/cloudera/parcels/SPARK3-3.3.0.3.3.7180.0-274-1.p0.31212967/lib/spark3/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
File "/opt/cloudera/parcels/SPARK3-3.3.0.3.3.7180.0-274-1.p0.31212967/lib/spark3/python/lib/pyspark.zip/pyspark/sql/utils.py", line 190, in deco
File "/opt/cloudera/parcels/SPARK3-3.3.0.3.3.7180.0-274-1.p0.31212967/lib/spark3/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o72.sql.
: java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.catalog.CatalogTable.<init>(Lorg/apache/spark/sql/catalyst/TableIdentifier;Lorg/apache/spark/sql/catalyst/catalog/CatalogTableType;Lorg/apache/spark/sql/catalyst/catalog/CatalogStorageFormat;Lorg/apache/spark/sql/types/StructType;Lscala/Option;Lscala/collection/Seq;Lscala/Option;Ljava/lang/String;JJLjava/lang/String;Lscala/collection/immutable/Map;Lscala/Option;Lscala/Option;Lscala/Option;Lscala/collection/Seq;ZZLscala/collection/immutable/Map;Lscala/Option;)V
at org.apache.spark.sql.delta.catalog.DeltaCatalog.$anonfun$createDeltaTable$1(DeltaCatalog.scala:134)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:140)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:138)
at org.apache.spark.sql.delta.catalog.DeltaCatalog.recordFrameProfile(DeltaCatalog.scala:57)
at org.apache.spark.sql.delta.catalog.DeltaCatalog.org$apache$spark$sql$delta$catalog$DeltaCatalog$$createDeltaTable(DeltaCatalog.scala:86)
at org.apache.spark.sql.delta.catalog.DeltaCatalog.$anonfun$createTable$1(DeltaCatalog.scala:271)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:140)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:138)
at org.apache.spark.sql.delta.catalog.DeltaCatalog.recordFrameProfile(DeltaCatalog.scala:57)
at org.apache.spark.sql.delta.catalog.DeltaCatalog.createTable(DeltaCatalog.scala:263)
at org.apache.spark.sql.execution.datasources.v2.CreateTableExec.run(CreateTableExec.scala:45)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:99)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:99)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:220)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:622)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:617)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.lang.Thread.run(Thread.java:748)
Where am I going wrong? Can someone please guide?
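A NoSuchMethodError on a Spark internal constructor like CatalogTable.<init> usually means the Delta jar on the classpath was built against a different Spark than the one actually running (here a Cloudera Spark 3.3 build). A sketch of pinning a Delta release built for Spark 3.3; the exact version is an assumption and has to match your distribution, and with configure_spark_with_delta_pip the version instead comes from the installed delta-spark pip package:

from pyspark.sql import SparkSession

builder = (
    SparkSession.builder.appName("DeltaLakeFundamentals")
    # delta-core 2.3.x targets Spark 3.3.x; adjust if your distribution patches Spark internals
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.3.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)
spark = builder.getOrCreate()
spark.sql("create table default.np_delta_table (id int, name string) using delta")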
Heitor Augusto
06/23/2023, 1:15 PM
bharat chaudhury
06/23/2023, 2:15 PM
Sai Allu
06/23/2023, 5:25 PM
Dadepo Aderemi
06/24/2023, 4:52 AM
Abolfazl karimian
06/24/2023, 10:03 AM
Matt Moen
06/24/2023, 4:40 PM
Noah Prince
06/25/2023, 4:40 AM
delta-rs to automatically write checkpoint files? I've got around 300 JSON files in my _delta_log but no checkpoints. Is it something you have to do manually?
Code here for context: https://github.com/helium/helium-data/blob/main/protobuf-delta-lake-sink/src/main.rs#L304
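In case it helps, checkpoints can be written explicitly. A sketch with the deltalake Python bindings (same Rust core), assuming a release that exposes create_checkpoint and a hypothetical table URI; the Rust crate has a checkpoints module with an equivalent function:

from deltalake import DeltaTable

dt = DeltaTable("s3://bucket/path/to/table")   # hypothetical table URI
dt.create_checkpoint()                         # writes a checkpoint for the current version under _delta_log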
Renganathan Mutthaiah
06/25/2023, 10:00 AM
Noah Prince
06/25/2023, 10:59 PM
Error: Failed to convert into Arrow schema: Json error: Binary is not supported by JSON
Any idea how to fix?
guru moorthy
06/26/2023, 8:40 AM