mxm commented on PR #12424:
URL: https://github.com/apache/iceberg/pull/12424#issuecomment-2784587046

   Quick recap on a discussion Steven, Peter, and I had in a separate channel, 
which is only partially captured in the PR comments:
   
   ### Iceberg Schema
   
   Dynamic Sink users currently need to provide an Iceberg Schema, which also 
requires them to define field ids. However, correctly defining field ids is 
cumbersome and error-prone. That's why the sink is designed to ignore field ids 
and instead compare fields by their names. This also works for nested fields. 
We enforce field name uniqueness on every level of the schema, just like 
Iceberg. 
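   As a rough sketch of what name-based comparison means (all class, record, and 
method names below are illustrative stand-ins, not the sink's actual API), two 
schemas can be matched by recursing over field names and types while ignoring 
ids entirely:

```java
import java.util.*;

// Hypothetical, simplified sketch: compare two schemas by field name
// instead of field id, recursing into nested (struct) fields. Any field
// ids the user supplied play no role here.
class NameBasedComparison {

    // Minimal stand-in for a nested field: a name, a type string, and
    // child fields for struct types (empty for primitives).
    record Field(String name, String type, List<Field> children) {
        static Field primitive(String name, String type) {
            return new Field(name, type, List.of());
        }
    }

    // Two schemas match if they contain the same field names with the
    // same types at every nesting level.
    static boolean sameByName(List<Field> a, List<Field> b) {
        if (a.size() != b.size()) {
            return false;
        }
        Map<String, Field> byName = new HashMap<>();
        for (Field f : b) {
            byName.put(f.name(), f);
        }
        for (Field f : a) {
            Field other = byName.get(f.name());
            if (other == null
                || !f.type().equals(other.type())
                || !sameByName(f.children(), other.children())) {
                return false;
            }
        }
        return true;
    }
}
```

   Note that this sketch matches fields regardless of order; the actual sink 
also tracks field positions, since reordering is one of its migration steps.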
   
   Iceberg internally uses field ids because they uniquely identify a field: 
for example, if a field is removed and a new field with the same name is added 
later, the new field will not contain the old field's data. In the Dynamic 
Sink, however, we don't allow field deletions, because that operation would not 
be safe with the sink's schema migration logic. We wouldn't want to remove a 
field only to re-add it later, e.g. in the case of out-of-order data. That 
would only work reliably if the user supplied accurate field ids, which we 
found to be unrealistic.
   
   Consequently, we never use the user-supplied schema directly. Instead, we 
compare the user-supplied schema with the existing table schemas. If they are 
identical, we write with the matching table schema. Otherwise, we migrate the 
current table schema by deriving a set of migration steps, e.g. add a field, 
widen a type, make a field optional, or change a field's position. In the 
process, we also rewrite the user-provided PartitionSpec (if any), which refers 
to field ids in the user-provided schema, so that it refers to the ids of the 
same-named fields in the table schema.
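   A minimal sketch of how such migration steps could be derived, assuming a 
simplified flat field model (the names, the step set, and the single 
`int` → `long` widening rule are illustrative only; the real sink handles 
nesting, positions, and more type promotions):

```java
import java.util.*;

// Hypothetical sketch: diff the current table schema against the
// user-supplied schema and emit the steps needed to migrate the table
// schema towards it. Fields are matched by name, never by id.
class MigrationSteps {

    record Field(String name, String type, boolean optional) {}

    // Only int -> long is modeled as a legal widening in this sketch.
    static boolean isWidening(String from, String to) {
        return from.equals("int") && to.equals("long");
    }

    static List<String> derive(List<Field> table, List<Field> user) {
        Map<String, Field> existing = new HashMap<>();
        for (Field f : table) {
            existing.put(f.name(), f);
        }
        List<String> steps = new ArrayList<>();
        for (Field f : user) {
            Field current = existing.get(f.name());
            if (current == null) {
                steps.add("ADD " + f.name());
                continue;
            }
            if (!current.type().equals(f.type())
                && isWidening(current.type(), f.type())) {
                steps.add("WIDEN " + f.name());
            }
            if (!current.optional() && f.optional()) {
                steps.add("MAKE_OPTIONAL " + f.name());
            }
        }
        // Field deletions are intentionally never derived.
        return steps;
    }
}
```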
   
   ### RowType
   
   Steven wondered whether Iceberg's `Schema` is the best schema input format 
for the user and suggested Flink's `RowType` instead. Using RowType would not 
require generating field ids. On the other hand, the advantage of using an 
Iceberg Schema directly is that there are no type ambiguities, since the 
Dynamic Sink both creates tables and modifies their schemas. Users are also 
able to embed comments for fields. Additionally, PartitionSpec currently 
requires an Iceberg Schema to bind to. 
   
   ### Using keyBy (hash distribution) for distributing records
   
   The Dynamic Sink currently uses keyBy for the distribution modes NONE and 
HASH. The idea is that the Dynamic Sink works with a large number of tables, 
each with an individually configured write parallelism. In the case of NONE, 
we don't simply hash-distribute the data across all subtasks. Instead, we 
probe for a key group targeting each of N subtasks, where N is the table's 
write parallelism, and then round-robin the data across those subtasks. The 
whole process is offset by a salt derived from the table name. This ensures 
that the distribution mode and write parallelism can be changed dynamically 
while retaining a balanced amount of work across all subtasks. For HASH, the 
process works similarly, except that we use a hash-based assignment to the N 
subtasks chosen deterministically via the salt.
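   A simplified sketch of this salted selection, under the assumption of plain 
subtask indices (the real implementation maps tables to Flink key groups, and 
all names here are hypothetical):

```java
import java.util.*;

// Hypothetical sketch of salted subtask selection: derive a salt from
// the table name, deterministically pick N target subtasks from it, and
// then either round-robin (NONE) or hash (HASH) records across those
// targets. Because the choice depends only on the table name, it stays
// stable even as write parallelism or distribution mode change.
class SaltedDistribution {

    // Deterministically pick N distinct subtask indices out of
    // `parallelism`, seeded by the table name.
    static int[] chooseSubtasks(String tableName, int parallelism, int n) {
        List<Integer> all = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            all.add(i);
        }
        // Deterministic shuffle: the salt is derived from the table name.
        Collections.shuffle(all, new Random(tableName.hashCode()));
        int[] chosen = new int[n];
        for (int i = 0; i < n; i++) {
            chosen[i] = all.get(i);
        }
        return chosen;
    }

    // NONE: round-robin across the chosen subtasks.
    static int noneTarget(int[] subtasks, long recordCount) {
        return subtasks[(int) (recordCount % subtasks.length)];
    }

    // HASH: stable hash-based assignment within the chosen subtasks.
    static int hashTarget(int[] subtasks, Object key) {
        return subtasks[Math.floorMod(Objects.hashCode(key), subtasks.length)];
    }
}
```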
   
   ### Catalog Access
   
   The Dynamic Sink needs to access the Iceberg catalog to update the table 
metadata. The Dynamic Sink supports two update modes when an unknown table 
schema is detected:
   
   1. Immediate update at every subtask which discovers an unknown table schema
   2. Rerouting records to a single subtask which performs the schema update 
(default)
   
   Schemas and other table data are cached in the TableDataCache, which allows 
refreshing the table at a regular interval; the default allows one refresh per 
second. We agreed that with option (1) the load on the catalog will be quite 
high if the number of table updates and the write parallelism of those tables 
are high. Option (2) will scale well even with many updates and high write 
parallelism.
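   The refresh throttling can be sketched as a small cache that consults a 
loader (standing in for a catalog call) at most once per interval per key. The 
class and method names are illustrative, not the actual TableDataCache API:

```java
import java.util.*;
import java.util.function.Function;

// Hypothetical sketch of refresh throttling: cached table metadata may
// be reloaded at most once per interval (1 second by default), so
// subtasks that keep seeing an unknown schema do not hammer the catalog.
class RefreshThrottledCache<K, V> {

    private final long refreshIntervalMs;
    private final Map<K, V> values = new HashMap<>();
    private final Map<K, Long> lastRefreshMs = new HashMap<>();

    RefreshThrottledCache(long refreshIntervalMs) {
        this.refreshIntervalMs = refreshIntervalMs;
    }

    // Return the cached value for `key`; invoke `loader` only when the
    // entry is missing or older than the refresh interval. `nowMs` is
    // passed in explicitly to keep the sketch deterministic.
    V get(K key, long nowMs, Function<K, V> loader) {
        Long last = lastRefreshMs.get(key);
        if (last == null || nowMs - last >= refreshIntervalMs) {
            values.put(key, loader.apply(key));
            lastRefreshMs.put(key, nowMs);
        }
        return values.get(key);
    }
}
```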
   
   
   

