Ian
04/06/2023, 1:13 PM
{
"message": "Error while creating the delta table=<s3a://test/15f2a198-6302-410f-afc7-6f6aa5dd0f27/description_field_table/> Error=Failed to load checkpoint: Failed to read checkpoint content: Generic S3 error: response error \"<!DOCTYPE HTML PUBLIC \"-//W3C//DTD HTML 4.01//EN\"\"<http://www.w3.org/TR/html4/strict.dtd>\">\r\n<HTML><HEAD><TITLE>Length Required</TITLE>\r\n<META HTTP-EQUIV=\"Content-Type\" Content=\"text/html; charset=us-ascii\"></HEAD>\r\n<BODY><h2>Length Required</h2>\r\n<hr><p>HTTP Error 411. The request must be chunked or have a content length.</p>\r\n</BODY></HTML>\r\n\", after 0 retries: HTTP status client error (411 Length Required) for url (<http://169.254.169.254/latest/api/token>)."
}
Jim Hibbard
04/06/2023, 4:07 PM
Ian
04/06/2023, 4:38 PM
delta_table_schema, partition_cols = self.generate_delta_table_schema(schema)
df = spark_session.createDataFrame([], delta_table_schema)
df.write.format("delta").mode(saveMode='error').option("mergeSchema", "true").partitionBy(
partition_cols).save(delta_path)
def generate_delta_table_schema(schema: dict) -> tuple:
    """
    Build a Delta-compatible PySpark schema from a dict-based schema definition.

    :param schema: dict whose ``'schema'`` key maps each column name to a column
        spec. Keys read from each spec: ``data_type`` (one of ``string``,
        ``integer``, ``float``, ``double``, ``timestamp``, ``boolean``),
        ``partition_column`` (default ``False``), ``nullable`` (default ``True``),
        ``metadata`` (optional; assumed to be a dict - TODO confirm against
        callers) and ``description`` (optional).
    :return: tuple ``(delta_table_schema, partition_columns)``: the ``StructType``
        built from the columns, and the list of column names flagged as
        partition columns.
    :raises ValidationException: if a column declares an unsupported
        ``data_type``, or if the number of partition columns equals the number
        of columns (this also triggers when no columns are defined, since both
        counts are then zero).
    """
    data_types = {
        'string': StringType(),
        'integer': IntegerType(),
        'float': FloatType(),
        'double': DoubleType(),
        'timestamp': TimestampType(),
        'boolean': BooleanType(),
    }
    partition_columns = []
    schema_items = []

    for column_name, column_value in schema.get('schema').items():
        data_type = column_value.get('data_type')
        spark_type = data_types.get(data_type)
        if spark_type is None:
            # Fail with a clear message instead of handing dataType=None to StructField.
            raise ValidationException(
                "Unsupported data_type '{}' for column '{}'".format(data_type, column_name))

        is_nullable = str2bool(column_value.get('nullable', True))

        # Keep caller-supplied metadata instead of discarding it; the column's
        # 'description' is layered on top. With no metadata supplied, the result
        # is {'description': description}, exactly as before.
        metadata = dict(column_value.get('metadata') or {})
        description = column_value.get('description', None)
        if description is not None or 'description' not in metadata:
            metadata['description'] = description

        if str2bool(column_value.get('partition_column', False)):
            partition_columns.append(column_name)

        schema_items.append(
            StructField(name="{}".format(column_name), dataType=spark_type,
                        nullable=is_nullable,
                        metadata=metadata))

    delta_table_schema = StructType(schema_items)
    # At least one column must remain as a non-partition (data) column.
    if len(schema_items) == len(partition_columns):
        raise ValidationException(" All columns cannot be partition columns")
    return delta_table_schema, partition_columns