jqin61 commented on issue #208: URL: https://github.com/apache/iceberg-python/issues/208#issuecomment-1901345425
Based on the existing discussion, there are 3 major possible directions for detecting partitions and writing each partition in a multi-threaded way to maximize I/O. It seems there isn’t any approach simple enough that we could purely leverage the existing Pyarrow APIs in Pyiceberg. I list and compare these approaches for discussion purpose: **Filter Out Partitions** As Fokko suggested, we could filter the table to get partitions before writing but we will need an API on Arrow to get unique partition values (e.g. extend compute.unique() from array/scalar to table). ``` partitions: list[dict] = pyarrow.compute.unique(arrow_table) ``` With it, we could filter the table to get partitions and provide them as inputs to concurrent jobs. ``` arrow_table = pa.table({ 'column1': ['A', 'B', 'A', 'C', 'B'], 'column2': [1, 2, 1, 3, 2], 'column3': ['X', 'Y', 'X', 'Z', 'Y'] }) partition_keys = table.select(['column1', 'column2']) # The existing unique does not have support on table, we need to create API on Arrow side. partitions: list[dict] = pyarrow.compute.unique(partition_keys) # = [ {'column1': 'A', 'column2': 1}, {'column1': 'B', 'column2': 2}, {'column1': 'C', 'column3': 3} ] def filter_and_write_table(partition, index): # Create a boolean mask for rows that match the criteria mask = pc.and_(*(pc.equal(table[col], val) for col, val in partition.items())) # Filter the table filtered_table = table.filter(mask) # Write the filtered table to a Parquet file parquet_file = f'output_partition_{index}.parquet' pq.write_table(filtered_table, parquet_file) with ThreadPoolExecutor() as executor: for i, partition in enumerate(partitions): executor.submit(filter_and_write_table, partition, i) ``` **Sort and Single-direction Writing** As Fokko suggested, we could sort the table first. We then slice the table and do a one-direction scan for each slice to write out partitioned files. Suppose we have such an arrow API that takes a sorted table, scans through it, creates a new file whenever encountering a row with a new partition value, and raises an error if it encounters a row with a partition value it already passes, just like how spark writes to an iceberg table. 
**Sort and Single-direction Writing**

As Fokko suggested, we could sort the table first, then slice the sorted table and do a one-direction scan over each slice to write out the partitioned files. Suppose we had an Arrow API that takes a sorted table, scans through it, creates a new file whenever it encounters a row with a new partition value, and raises an error if it encounters a partition value it has already passed, just like how Spark writes to an Iceberg table:

```
def write_table_partitions(sorted_table, partition_columns, directory_path)
```

Then we could do:

```
from concurrent.futures import ThreadPoolExecutor

partition_columns = ['column1', 'column2']
sorted_table = arrow_table.sort_by([('column1', 'ascending'), ('column2', 'ascending')])
directory_path = '/path/to/output/directory'

# Break the sorted table down into slices with zero-copy
# (slice_table / slice_options are hypothetical helpers)
slices = slice_table(sorted_table, slice_options)

# Submit one write job per slice
with ThreadPoolExecutor() as executor:
    for i, table_slice in enumerate(slices):
        executor.submit(write_table_partitions, table_slice, partition_columns, directory_path)
```

**Bucketing**

We could create an Arrow API that returns partitioned tables/record batches for an input table and a list of partition columns, where the algorithm does a single full scan of the table in O(table_length) time, bucket-sorts the rows, and creates a table/record batch for each bucket:

```
table_partitions = pyarrow.compute.partition(arrow_table, partition_columns)
```

We could then write each partition table concurrently:

```
import os
from concurrent.futures import ThreadPoolExecutor

import pyarrow.parquet as pq

def write_table_to_parquet(table, directory_path, file_suffix):
    # Construct the full path for the Parquet file
    file_path = os.path.join(directory_path, f'record_batch_{file_suffix}.parquet')
    # Write the table to a Parquet file
    pq.write_table(table, file_path)

with ThreadPoolExecutor() as executor:
    for i, partition_as_table in enumerate(table_partitions):
        executor.submit(write_table_to_parquet, partition_as_table, directory_path, i)
```

As Fokko pointed out, the filter method will not be efficient when there are many partitions: each filter takes O(table_length) time, and although each thread can filter on its own, on a single node the total work across all jobs is O(table_length * number_of_partitions), even though technically a single scan is enough to build all the buckets. The sort method also seems less efficient than bucketing, because the relative order of the partitions does not matter, so a general sort on the partition columns is overkill compared with bucketing.

All 3 directions seem to require some implementation on Arrow itself (I did not find any approach simple enough that we could purely leverage the existing PyArrow APIs to implement any of them), and **I want to get opinions on whether pursuing Arrow-API-level utilities smells good**. Thank you!

Specifically, for the third direction of bucketing and returning materialized tables/batches: since Arrow has `dataset.write_dataset()`, which supports partition-respecting writes, I did some reading to see how it partitions and whether we could leverage anything from it. https://github.com/apache/arrow/blob/main/cpp/src/arrow/dataset/partition.cc#L118 is where the partitioning happens. The algorithm is a full scan with a bucket sort that leverages the `Grouper` class utilities in Arrow's compute component. Specifically:

1. `Grouper.consume()` initializes the groups based on the partition columns present: https://github.com/apache/arrow/blob/55afcf0450aa2b611e78335bdbfd77e55ae3bc9f/cpp/src/arrow/compute/row/grouper.cc#L422
2. `Grouper.MakeGroupings()` builds a `ListArray` where each list represents a partition and each element in the list is a row id of the original table: https://github.com/apache/arrow/blob/55afcf0450aa2b611e78335bdbfd77e55ae3bc9f/cpp/src/arrow/compute/row/grouper.cc#L886C45-L886C58
3. `Grouper.ApplyGroupings()` efficiently converts the grouped representation of row ids into actual rows: https://github.com/apache/arrow/blob/55afcf0450aa2b611e78335bdbfd77e55ae3bc9f/cpp/src/arrow/compute/row/grouper.cc#L875
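For what it's worth, something in the spirit of steps 1-3 can, I believe, already be approximated from Python with the public `Table.group_by` API, by collecting row ids per group and then `take()`-ing them, which is conceptually close to `MakeGroupings()` + `ApplyGroupings()`. This is only a rough sketch of the bucketing idea on top of existing APIs (the helper name `partition_table` and the `__row_idx` column are made up, and it assumes a PyArrow version where `group_by` and the `list` aggregation are available); it says nothing about matching the performance of the C++ `Grouper` path:

```
import pyarrow as pa

def partition_table(arrow_table, partition_columns):
    # Attach a row-index column so group results can be mapped back to rows
    indexed = arrow_table.append_column('__row_idx', pa.array(range(len(arrow_table))))
    # One pass over the table: collect the row ids belonging to each partition bucket
    # (roughly what Grouper.consume() + Grouper.MakeGroupings() do in C++)
    groupings = indexed.group_by(partition_columns).aggregate([('__row_idx', 'list')])
    # Materialize one table per bucket via take(), similar to Grouper.ApplyGroupings()
    return [
        arrow_table.take(pa.array(row_ids))
        for row_ids in groupings['__row_idx_list'].to_pylist()
    ]

arrow_table = pa.table({
    'column1': ['A', 'B', 'A', 'C', 'B'],
    'column2': [1, 2, 1, 3, 2],
    'column3': ['X', 'Y', 'X', 'Z', 'Y']
})
table_partitions = partition_table(arrow_table, ['column1', 'column2'])  # 3 buckets
```

The `take()` calls here copy data per bucket, whereas a dedicated compute API could presumably reuse the `Grouper` machinery directly.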
Other than being used in dataset writing, `Grouper` from Arrow's compute component also backs other exposed compute APIs such as the aggregation functions. At the end of the day, what we want (in order to support PyIceberg's partitioned write) is an API that returns record batches/tables given an input table and a partition scheme, so maybe we could expose such a new API under compute, leveraging `Grouper`.
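To make that concrete, the Python-facing surface I have in mind would look roughly like the stub below. The name `pyarrow.compute.partition` (matching the bucketing example above), the parameters, and the return type are purely illustrative, not an existing or agreed-upon Arrow API:

```
from typing import Iterator

import pyarrow as pa

def partition(table: pa.Table, partition_columns: list[str]) -> Iterator[pa.RecordBatch]:
    """Bucket `table` by the distinct values of `partition_columns` in a single
    O(table_length) pass (backed by compute's Grouper) and yield one record
    batch per partition, ready to be handed to concurrent writers.
    """
    ...

# PyIceberg's partitioned write path could then fan these batches out to
# worker threads, e.g.:
#   for i, batch in enumerate(partition(arrow_table, ['column1', 'column2'])):
#       executor.submit(write_batch, batch, i)   # write_batch is hypothetical
```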