邓凯
05/09/2023, 3:31 AMDavid Schenk
05/09/2023, 6:54 PM// check if schema needs to be updated
let table_metadata = table.get_metadata().unwrap();
let equal = arrow_schema.eq(&ArrowSchema::try_from(&table_metadata.schema.clone()).unwrap());
if !equal {
println!("Schema seems to be unequal and will be merged and updated.");
let merged_schema = ArrowSchema::try_merge(vec![ArrowSchema::try_from(&table_metadata.schema).unwrap(), arrow_schema.clone()]).unwrap();
let table_metadata = &DeltaTableMetaData::new(
table_metadata.name.clone(),
table_metadata.description.clone(),
Some(table_metadata.format.clone()),
Schema::from(SchemaTypeStruct::try_from(&merged_schema).unwrap()),
table_metadata.partition_columns.clone(),
table_metadata.configuration.clone()
);
actions.push(action::Action::metaData(action::MetaData::try_from(table_metadata.clone()).unwrap()));
batch_writer.update_schema(table_metadata).expect("TODO: panic message");
}
batch_writer.write(record_batch).await.expect("Failed write record batch operation");
let mut add_actions = batch_writer.flush().await.unwrap().iter().map(
|add| action::Action::add(add.clone().into())
).collect::<Vec<action::Action>>();
actions.append(&mut add_actions);
// commit write operation
operations::transaction::commit(
table.object_store().storage_backend().clone().deref(),
&actions,
action::DeltaOperation::Write {
mode: action::SaveMode::Append,
partition_by: Some(partition_columns.clone()),
predicate: None,
},
&table.get_state(),
None
).await.unwrap();
邓凯
05/10/2023, 2:12 AM