jembishop opened a new issue, #1868:
URL: https://github.com/apache/iceberg-rust/issues/1868
### Apache Iceberg Rust version
None
### Describe the bug
It seems that when I use fast append on an Iceberg table backed by the Glue catalog,
optimistic concurrency doesn't work: files registered by some of my processes get
clobbered by other concurrent writers.
My setup is an Iceberg table using the Glue catalog in AWS. I create Parquet files
outside of iceberg-rust, upload them, then use iceberg-rust to register the uploaded
files with the table, wrapping the commit in a retry loop so that optimistic
concurrency conflicts are retried.
```rust
// User-side helpers such as `create_glue_catalog`, `region_to_name` and the
// `IcebergRegistrationRequest` type are omitted for brevity.
use anyhow::{bail, Context};
use chrono::NaiveDate;
use iceberg::spec::{DataContentType, DataFileBuilder, DataFileFormat, Literal, Struct};
use iceberg::transaction::Transaction;
use iceberg::{ErrorKind, NamespaceIdent, TableIdent};

async fn register_files_to_iceberg_batch(
    bucket_name: &str,
    region: &str,
    event_name: &str,
    is_test: bool,
    requests: Vec<IcebergRegistrationRequest>,
    credentials: &aws_credential_types::Credentials,
) -> anyhow::Result<()> {
    const RETRY_DURATION: std::time::Duration = std::time::Duration::from_secs(10);
    const MAX_ATTEMPTS: u32 = 100;

    for attempt in 0..MAX_ATTEMPTS {
        let catalog = create_glue_catalog(bucket_name, credentials)
            .await
            .with_context(|| format!("Failed to create Glue catalog on attempt {}", attempt))?;
        match register_files_to_iceberg_batch_once(
            bucket_name, region, event_name, is_test, &requests, &catalog,
        )
        .await
        {
            Ok(()) => return Ok(()),
            Err(err) => {
                let is_commit_conflict = err
                    .downcast_ref::<iceberg::Error>()
                    .map(|e| e.kind() == ErrorKind::CatalogCommitConflicts)
                    .unwrap_or(false);
                if attempt < MAX_ATTEMPTS - 1 {
                    if is_commit_conflict {
                        tracing::info!(
                            attempt,
                            error = &*err,
                            retry_duration = ?RETRY_DURATION,
                            "Iceberg commit conflict, retrying batch with fresh catalog"
                        );
                    } else {
                        tracing::error!(
                            attempt,
                            error = &*err,
                            retry_duration = ?RETRY_DURATION,
                            "Error registering files to Iceberg, retrying batch with fresh catalog"
                        );
                    }
                    tokio::time::sleep(RETRY_DURATION).await;
                } else {
                    bail!("Max retry attempts reached for Iceberg batch registration: {}", err);
                }
            }
        }
    }
    Ok(())
}

async fn register_files_to_iceberg_batch_once(
    bucket_name: &str,
    region: &str,
    event_name: &str,
    is_test: bool,
    requests: &[IcebergRegistrationRequest],
    catalog: &iceberg_catalog_glue::GlueCatalog,
) -> anyhow::Result<()> {
    let start = std::time::Instant::now();
    let region_name = region_to_name(region);
    let table_name = format!("{}_{}_iceberg", region_name, event_name);
    let database_name = if is_test { "test_market_data" } else { "market_data" };
    tracing::info!(
        table_name,
        file_count = requests.len(),
        "Registering files to Iceberg table"
    );

    let table_ident = TableIdent::new(
        NamespaceIdent::new(database_name.to_string()),
        table_name.clone(),
    );
    let table = catalog.load_table(&table_ident).await?;
    let partition_spec = table.metadata().default_partition_spec();

    // Build data files for all requests
    let mut data_files = Vec::new();
    for request in requests {
        let full_path = format!(
            "s3://{}/{}",
            bucket_name,
            request.s3_key.to_string_with_id(&request.file_id)
        );
        let mut partition_values = Vec::new();
        for field in partition_spec.fields() {
            let source_field = table
                .metadata()
                .current_schema()
                .field_by_id(field.source_id)
                .context("Failed to get source field")?;
            let field_name = source_field.name.as_str();
            let literal = match field_name {
                "InstrumentKind" => Some(Literal::string(
                    request.s3_key.partition_info.instrument_kind.clone(),
                )),
                "Exchange" => Some(Literal::string(
                    request.s3_key.partition_info.exchange.clone(),
                )),
                "Date" => {
                    let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
                    let days_since_epoch = request
                        .s3_key
                        .partition_info
                        .date
                        .signed_duration_since(epoch)
                        .num_days() as i32;
                    Some(Literal::date(days_since_epoch))
                }
                _ => None,
            };
            partition_values.push(literal);
        }
        let partition = Struct::from_iter(partition_values);
        let data_file = DataFileBuilder::default()
            .content(DataContentType::Data)
            .file_path(full_path.clone())
            .file_format(DataFileFormat::Parquet)
            .partition(partition)
            .record_count(request.metadata.record_count)
            .file_size_in_bytes(request.metadata.file_size)
            .partition_spec_id(table.metadata().default_partition_spec_id())
            .column_sizes(request.metadata.stats.column_sizes.clone())
            .value_counts(request.metadata.stats.value_counts.clone())
            .null_value_counts(request.metadata.stats.null_value_counts.clone())
            .lower_bounds(request.metadata.stats.lower_bounds.clone())
            .upper_bounds(request.metadata.stats.upper_bounds.clone())
            .build()?;
        data_files.push(data_file);
    }

    // Single commit with all data files
    let tx = Transaction::new(&table);
    let append_action = tx
        .fast_append()
        .with_check_duplicate(false)
        .add_data_files(data_files);
    let tx = append_action.apply(tx)?;
    tx.commit(catalog).await?;

    let committed_files: Vec<String> = requests
        .iter()
        .map(|r| r.s3_key.to_string_with_id(&r.file_id))
        .collect();
    tracing::info!(
        table = table_name,
        file_count = requests.len(),
        elapsed_ms = start.elapsed().as_millis(),
        files = ?committed_files,
        "Successfully registered files to Iceberg in single commit"
    );
    Ok(())
}
```
When I run this with concurrent writers, I never see any logs about Iceberg commit
conflicts, and I end up with a large number of orphaned, unregistered files. My
reading is that optimistic concurrency conflicts are simply never being raised.
I have a branch
[here](https://github.com/apache/iceberg-rust/compare/main...jembishop:iceberg-rust:version_fix)
that sets the Glue table version id on commit, which seems to solve the problem for me.
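For context, the core of the idea is to make the Glue `UpdateTable` call conditional on the table version the metadata was read from. Below is only a rough sketch of that idea, not the actual patch: the helper name `update_table_with_version_check` is made up, and in the real change this logic lives inside the catalog's commit path rather than as a standalone function.
```rust
use aws_sdk_glue::{types::TableInput, Client};

// Sketch only: illustrates the conditional-update idea behind my branch.
async fn update_table_with_version_check(
    client: &Client,
    database: &str,
    table_name: &str,
    table_input: TableInput, // already points at the new metadata location
) -> anyhow::Result<()> {
    // Capture the version id of the table state we are committing against.
    // (In the real commit path this should be the version id read when the
    // table metadata was loaded, not re-fetched here.)
    let current = client
        .get_table()
        .database_name(database)
        .name(table_name)
        .send()
        .await?;
    let version_id = current
        .table()
        .and_then(|t| t.version_id())
        .map(str::to_string);

    // Pass that version id to UpdateTable. If another writer committed in the
    // meantime, Glue rejects the call with ConcurrentModificationException
    // instead of silently overwriting their snapshot, so iceberg-rust can
    // surface CatalogCommitConflicts and the retry loop above kicks in.
    let mut req = client
        .update_table()
        .database_name(database)
        .table_input(table_input);
    if let Some(v) = version_id {
        req = req.version_id(v);
    }
    req.send().await?;
    Ok(())
}
```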
Is this a bug or am I misusing the library?
Thanks
### To Reproduce
Set up an Iceberg table in S3 with the AWS Glue catalog, then have multiple writers
concurrently register Parquet files using the fast append action, each with optimistic
retrying on commit conflicts (see the sketch below).
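The harness I use is roughly the following sketch, reusing `register_files_to_iceberg_batch` from the snippet above; how the per-writer batches of `IcebergRegistrationRequest` are produced is elided.
```rust
// Sketch of the repro: N writers committing to the same table concurrently,
// each going through the retry wrapper shown above.
async fn repro_concurrent_writers(
    bucket: String,
    region: String,
    event: String,
    credentials: aws_credential_types::Credentials,
    batches: Vec<Vec<IcebergRegistrationRequest>>, // one batch of files per writer
) -> anyhow::Result<()> {
    let mut handles = Vec::new();
    for requests in batches {
        let (bucket, region, event, creds) =
            (bucket.clone(), region.clone(), event.clone(), credentials.clone());
        handles.push(tokio::spawn(async move {
            register_files_to_iceberg_batch(&bucket, &region, &event, true, requests, &creds).await
        }));
    }
    for handle in handles {
        // Every writer should eventually succeed, possibly after conflict
        // retries; instead, some writers' files never appear in the table and
        // no CatalogCommitConflicts error is ever observed.
        handle.await??;
    }
    Ok(())
}
```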
### Expected behavior
Commit conflicts should be raised and retried so that every writer's files end up
registered. Instead, large numbers of orphaned files are produced because concurrent
modification errors never occur.
### Willingness to contribute
None