mxm commented on PR #12424: URL: https://github.com/apache/iceberg/pull/12424#issuecomment-2784587046
Quick recap of a discussion Steven, Peter, and I had in a separate channel, which is only partially captured in the PR comments:

### Iceberg Schema

Dynamic Sink users currently need to provide an Iceberg Schema, which also requires them to define field ids. However, correctly defining field ids is cumbersome and error-prone. That's why the sink is designed to ignore field ids and instead compare fields by their names. This also works for nested fields. We enforce field name uniqueness on every level of the schema, just like Iceberg.

Iceberg internally uses ids because they uniquely identify a field. For example, if a field has been removed and a new field gets added with the same name, the new field would not contain data of the old field. However, in the Dynamic Sink we don't allow field deletions, as this operation would not be safe with the schema migration logic of the sink. We wouldn't want to remove a field, only to re-add it later on, e.g. in case of out-of-order data. This would only work reliably if the user supplied accurate field ids, which we found to be unrealistic.

Consequently, we never use the user-supplied schema directly. Instead, we compare the user-supplied schema with the existing table schemas. If it is identical to one of them, we write with the matching table schema. Otherwise, we migrate the current table schema by deriving a set of migration steps, e.g. add field, widen type, make field optional, change position of field. In the process, we also rewrite the user-provided PartitionSpec (if any), which refers to a field id in the user-provided schema, to refer to the field id with the same name in the table schema.

### RowType

Steven wondered whether Iceberg's `Schema` is the best schema input format for the user. He suggested Flink's `RowType` instead. Using RowType would not require generating field ids.
On the other hand, the advantage of using Iceberg Schema directly is that there are no ambiguities when it comes to types, since tables can be created and their schema modified directly by the Dynamic Sink. Users would also be able to embed comments for fields. Additionally, PartitionSpec currently requires a schema to be bound to.

### Using keyBy (hash distribution) for distributing records

The Dynamic Sink currently uses keyBy for distribution modes NONE and HASH. The idea is that the Dynamic Sink works with a large number of tables which all have an individually configured write parallelism.

In the case of NONE, we don't just hash distribute the data across the subtasks, but instead probe one key group for each of N subtasks, where N is the write parallelism. We then round-robin distribute the data across these subtasks. The whole process is offset by a salt which is derived from the table name. This ensures that the distribution mode and write parallelism can be changed dynamically while retaining a balanced amount of work across all subtasks.

For HASH, the process works similarly, except that we do a hash-based assignment to the N deterministically (pseudo-randomly) chosen subtasks.

### Catalog Access

The Dynamic Sink needs to access the Iceberg catalog to update the table metadata. The Dynamic Sink supports two update modes when an unknown table schema is detected:

1. Immediate update at every subtask which discovers an unknown table schema
2. Rerouting records to a single subtask which performs the schema update (default)

Schemas and other table data are cached in TableDataCache, which allows refreshing the table at a regular interval. The default is to allow this once per second.

We agreed that the load of option (1) on the catalog will be quite high if the number of table updates and the write parallelism of those tables are high. Option (2) will scale quite well even with many updates and high write parallelism.
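To make the keyBy-based distribution described above more concrete, here is a minimal sketch of the selection logic. It is a simplification under stated assumptions, not the sink's code: the real sink probes Flink key groups so that keyBy routes to the chosen subtasks, whereas this sketch picks subtask indices directly. The class `SubtaskSelector`, its method names, and the use of `String.hashCode()` as the salt are all hypothetical.

```java
/** Hypothetical helper for illustration only; not a class from the Dynamic Sink. */
final class SubtaskSelector {
  private final int[] targets; // the N subtasks this table writes to
  private int cursor;          // round-robin position, used for mode NONE

  SubtaskSelector(String tableName, int writeParallelism, int totalSubtasks) {
    if (writeParallelism < 1 || totalSubtasks < 1) {
      throw new IllegalArgumentException("parallelism must be positive");
    }
    // A table can never use more subtasks than exist.
    int n = Math.min(writeParallelism, totalSubtasks);
    // Assumption: the salt is a hash of the table name, so that different
    // tables start at different subtasks and the load spreads out.
    int salt = Math.floorMod(tableName.hashCode(), totalSubtasks);
    targets = new int[n];
    for (int i = 0; i < n; i++) {
      targets[i] = (salt + i) % totalSubtasks;
    }
  }

  /** Mode NONE: cycle through the table's N subtasks. */
  int nextRoundRobin() {
    int subtask = targets[cursor];
    cursor = (cursor + 1) % targets.length;
    return subtask;
  }

  /** Mode HASH: the same key always lands on the same one of the N subtasks. */
  int forKey(Object key) {
    return targets[Math.floorMod(key.hashCode(), targets.length)];
  }

  /** Returns a copy of the chosen subtask indices. */
  int[] targets() {
    return targets.clone();
  }
}
```

Because the chosen subtasks depend only on the table name, the write parallelism, and the total number of subtasks, every upstream operator computes the same set without coordination. That is the property which allows the write parallelism or distribution mode to change at runtime.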
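The name-based schema comparison from the Iceberg Schema section can be sketched as follows. This is a toy model under stated assumptions, not the sink's implementation: `NameBasedSchemaDiff` and `Field` are hypothetical types, migration steps are plain strings, only the int-to-long and float-to-double promotions are modeled, and position changes are left out. It also assumes that a required table field missing from the input is made optional rather than dropped, in line with the no-deletion rule.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified model; field ids are deliberately absent. */
final class NameBasedSchemaDiff {

  /** A primitive (no children) or a struct (children keyed by unique name). */
  static final class Field {
    final String type; // e.g. "int", "long", "string", "struct"
    final boolean required;
    final Map<String, Field> children = new LinkedHashMap<>();

    Field(String type, boolean required) {
      this.type = type;
      this.required = required;
    }

    Field child(String name, Field field) {
      children.put(name, field);
      return this;
    }
  }

  /** Returns the steps that migrate the table schema to accept the input schema. */
  static List<String> diff(Field table, Field input) {
    List<String> steps = new ArrayList<>();
    diff("", table, input, steps);
    return steps;
  }

  private static void diff(String path, Field table, Field input, List<String> steps) {
    for (Map.Entry<String, Field> e : input.children.entrySet()) {
      String name = path.isEmpty() ? e.getKey() : path + "." + e.getKey();
      Field in = e.getValue();
      Field existing = table.children.get(e.getKey()); // matched by name, not by id
      if (existing == null) {
        steps.add("ADD " + name);
        continue;
      }
      if (!existing.type.equals(in.type)) {
        if (!isWidening(existing.type, in.type)) {
          throw new IllegalArgumentException("Incompatible type change for " + name);
        }
        steps.add("WIDEN " + name + " " + existing.type + " -> " + in.type);
      }
      if (existing.required && !in.required) {
        steps.add("MAKE_OPTIONAL " + name);
      }
      diff(name, existing, in, steps); // nested fields are compared the same way
    }
    // Assumption: table fields absent from the input are kept, never deleted,
    // and only relaxed to optional so rows without them can still be written.
    for (Map.Entry<String, Field> e : table.children.entrySet()) {
      if (!input.children.containsKey(e.getKey()) && e.getValue().required) {
        steps.add("MAKE_OPTIONAL " + (path.isEmpty() ? e.getKey() : path + "." + e.getKey()));
      }
    }
  }

  private static boolean isWidening(String from, String to) {
    return (from.equals("int") && to.equals("long"))
        || (from.equals("float") && to.equals("double"));
  }
}
```

An empty result means the input matches the table schema by name and type, so the table schema (with its own field ids) can be used for writing as-is.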