https://delta.io logo
u

邓凯

05/09/2023, 3:31 AM
Does delta-rs support mergeSchema now?
d

David Schenk

05/09/2023, 6:54 PM
I was able to achieve something similar by adding an updated `DeltaTableMetaData` to a transaction together with the write operation:
Copy code
// Schema evolution ("mergeSchema"-style) for an append to a Delta table.
//
// Assumes the surrounding async fn already has in scope: `table` (the opened Delta
// table), `arrow_schema` (the Arrow schema of `record_batch`), `batch_writer` (the
// record-batch writer for `table`), a mutable `actions` vector and `partition_columns`.
// TODO confirm against the caller.
//
// If the incoming schema differs from the table's, the two are merged. A `metaData`
// action carrying the merged schema is then pushed into the same `actions` list as
// the new `add` actions, so both go out in a single `commit` call.

// Check whether the table schema needs to be updated. Convert the current Delta
// schema to Arrow once, and reuse it for both the comparison and the merge.
let table_metadata = table
    .get_metadata()
    .expect("failed to read Delta table metadata");
let current_arrow_schema = ArrowSchema::try_from(&table_metadata.schema)
    .expect("failed to convert Delta table schema to Arrow");

// NOTE(review): Arrow `Schema` equality presumably also compares field order and
// schema-level metadata. If so, a reordered but compatible batch still takes the
// merge path. Verify that this is acceptable.
if !arrow_schema.eq(&current_arrow_schema) {
    println!("Schema seems to be unequal and will be merged and updated.");
    // Merge the table schema with the batch schema. Panics if the two are
    // incompatible (e.g. the same field with conflicting types).
    let merged_schema = ArrowSchema::try_merge(vec![current_arrow_schema, arrow_schema.clone()])
        .expect("incoming batch schema is incompatible with the table schema");
    // Same table identity and settings as before; only the schema changes.
    let updated_metadata = DeltaTableMetaData::new(
        table_metadata.name.clone(),
        table_metadata.description.clone(),
        Some(table_metadata.format.clone()),
        Schema::from(
            SchemaTypeStruct::try_from(&merged_schema)
                .expect("failed to convert merged Arrow schema to a Delta schema"),
        ),
        table_metadata.partition_columns.clone(),
        table_metadata.configuration.clone(),
    );
    // Record the schema change in this transaction...
    actions.push(action::Action::metaData(
        action::MetaData::try_from(updated_metadata.clone())
            .expect("failed to build metaData action from merged table metadata"),
    ));
    // ...and make the writer use the merged schema for the files it produces.
    batch_writer
        .update_schema(&updated_metadata)
        .expect("failed to update batch writer with merged table schema");
}

batch_writer
    .write(record_batch)
    .await
    .expect("Failed write record batch operation");

// Turn every file the writer flushed into an `add` action for the same commit.
// The flushed values are moved, not cloned.
let add_actions = batch_writer
    .flush()
    .await
    .expect("failed to flush record batch writer")
    .into_iter()
    .map(|add| action::Action::add(add.into()));
actions.extend(add_actions);

// Commit the (optional) metaData action and the add actions together as one append.
operations::transaction::commit(
    table.object_store().storage_backend().clone().deref(),
    &actions,
    action::DeltaOperation::Write {
        mode: action::SaveMode::Append,
        partition_by: Some(partition_columns.clone()),
        predicate: None,
    },
    &table.get_state(),
    None,
)
.await
.expect("failed to commit write transaction");
u

邓凯

05/10/2023, 2:12 AM
👍