jqin61 commented on issue #208:
URL: https://github.com/apache/iceberg-python/issues/208#issuecomment-1901345425

   Based on the existing discussion, there are 3 major possible directions for detecting partitions and writing each partition in a multi-threaded way to maximize I/O. It seems there isn't any approach simple enough that we could implement purely with the existing PyArrow APIs in PyIceberg. I list and compare these approaches below for discussion purposes:
   
   **Filter Out Partitions**
   As Fokko suggested, we could filter the table to get the partitions before writing, but we would need an API on the Arrow side to get the unique partition values (e.g. extend compute.unique() from arrays/scalars to tables).
   ```
   partitions: list[dict] = pyarrow.compute.unique(arrow_table)
   ```
   
   With such an API, we could filter the table by each partition key and provide the partitions as inputs to concurrent jobs.
   ```
   import pyarrow as pa
   import pyarrow.compute as pc
   import pyarrow.parquet as pq
   from concurrent.futures import ThreadPoolExecutor
   from functools import reduce

   arrow_table = pa.table({
       'column1': ['A', 'B', 'A', 'C', 'B'],
       'column2': [1, 2, 1, 3, 2],
       'column3': ['X', 'Y', 'X', 'Z', 'Y']
   })
   partition_keys = arrow_table.select(['column1', 'column2'])
   # The existing unique() does not support tables; we would need to add this API on the Arrow side.
   partitions: list[dict] = pc.unique(partition_keys)
   # = [
   #     {'column1': 'A', 'column2': 1},
   #     {'column1': 'B', 'column2': 2},
   #     {'column1': 'C', 'column2': 3}
   # ]


   def filter_and_write_table(partition, index):
       # Create a boolean mask for rows that match the partition values
       mask = reduce(pc.and_, (pc.equal(arrow_table[col], val) for col, val in partition.items()))

       # Filter the table
       filtered_table = arrow_table.filter(mask)

       # Write the filtered table to a Parquet file
       parquet_file = f'output_partition_{index}.parquet'
       pq.write_table(filtered_table, parquet_file)


   with ThreadPoolExecutor() as executor:
       for i, partition in enumerate(partitions):
           executor.submit(filter_and_write_table, partition, i)
   ```
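
   As a side note, the "get unique partition values" step can already be approximated with today's PyArrow, if I understand `Table.group_by` correctly: grouping by the partition columns with an empty aggregation list returns one row per distinct key combination. A minimal sketch, with the per-partition filtering above left unchanged:
   ```
   import pyarrow as pa

   arrow_table = pa.table({
       'column1': ['A', 'B', 'A', 'C', 'B'],
       'column2': [1, 2, 1, 3, 2],
       'column3': ['X', 'Y', 'X', 'Z', 'Y']
   })

   # group_by() with no aggregations keeps only the distinct key rows
   unique_keys = arrow_table.group_by(['column1', 'column2']).aggregate([])
   partitions: list[dict] = unique_keys.to_pylist()
   # = [{'column1': 'A', 'column2': 1}, {'column1': 'B', 'column2': 2}, {'column1': 'C', 'column2': 3}]
   ```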
   
   **Sort and Single-direction Writing**
   As Fokko suggested, we could sort the table first, then slice it and do a one-direction scan over each slice to write out the partitioned files.
   Suppose we had an Arrow API that takes a sorted table, scans through it, creates a new file whenever it encounters a row with a new partition value, and raises an error if it encounters a row with a partition value it has already passed, just like how Spark writes to an Iceberg table.
   ```
   def write_table_partitions(sorted_table, partition_columns, directory_path): ...
   ```
   
   Then we could do
   ```
   partition_columns = ['column1', 'column2']
   sorted_table = arrow_table.sort_by([('column1', 'ascending'), ('column2', 'ascending')])
   directory_path = '/path/to/output/directory'

   # Break down the sorted table into slices with zero-copy
   slices = slice_table(sorted_table, slice_options)

   # Call the API on each slice
   with ThreadPoolExecutor() as executor:
       # Submit one task per slice to the executor
       for table_slice in slices:
           executor.submit(write_table_partitions, table_slice, partition_columns, directory_path)
   ```
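
   To make the intended semantics of the hypothetical `write_table_partitions` concrete, here is a rough pure-Python sketch of the single-direction scan (the helper body and its file naming are only illustrative, not a proposed Arrow implementation):
   ```
   import os

   import pyarrow.parquet as pq


   def write_table_partitions(sorted_table, partition_columns, directory_path):
       # Single-direction scan: walk the sorted rows once, cut a new slice every time
       # the partition key changes, and error out if a key shows up again later.
       keys = sorted_table.select(partition_columns).to_pylist()
       seen, start, current = set(), 0, None
       for row_id, key in enumerate(keys + [None]):  # trailing sentinel flushes the last run
           key_tuple = tuple(key.values()) if key is not None else None
           if key_tuple == current:
               continue
           if current is not None:
               # Flush the run [start, row_id) as one partition file
               file_path = os.path.join(directory_path, f'partition_{len(seen)}.parquet')
               pq.write_table(sorted_table.slice(start, row_id - start), file_path)
           if key_tuple is None:
               break
           if key_tuple in seen:
               raise ValueError(f"partition {key_tuple} was already passed; the table is not sorted")
           seen.add(key_tuple)
           start, current = row_id, key_tuple
   ```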
   
   **Bucketing**
   We could create an Arrow API that returns the partitioned tables/record batches from the inputs of a table and a list of partition columns, where the algorithm does a full scan of the Arrow table in O(table_length) time, bucket-sorts it, and creates a table/record batch for each bucket:
   ```
   table_partitions = pyarrow.compute.partition(arrow_table, partition_columns)
   ```
   
   We could write each batch:
   ```
   import os
   import pyarrow.parquet as pq
   from concurrent.futures import ThreadPoolExecutor


   def write_table_to_parquet(table, directory_path, file_suffix):
       # Construct the full path for the Parquet file
       file_path = os.path.join(directory_path, f'record_batch_{file_suffix}.parquet')

       # Write the table to a Parquet file
       pq.write_table(table, file_path)


   directory_path = '/path/to/output/directory'

   with ThreadPoolExecutor() as executor:
       for i, partition_as_table in enumerate(table_partitions):
           executor.submit(write_table_to_parquet, partition_as_table, directory_path, i)
   ```
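
   Until such a kernel exists, the intended behavior of the hypothetical pyarrow.compute.partition() can be approximated in plain Python in O(table_length) by bucketing row ids per partition key and calling Table.take() per bucket; the partition_table helper below is only an illustration and would be much slower than a native kernel:
   ```
   from collections import defaultdict

   import pyarrow as pa


   def partition_table(arrow_table: pa.Table, partition_columns: list[str]) -> list[pa.Table]:
       # One pass over the key columns: bucket the row ids by partition key
       buckets: dict[tuple, list[int]] = defaultdict(list)
       for row_id, key in enumerate(arrow_table.select(partition_columns).to_pylist()):
           buckets[tuple(key.values())].append(row_id)

       # Materialize one table per bucket via take()
       return [arrow_table.take(row_ids) for row_ids in buckets.values()]


   table_partitions = partition_table(arrow_table, ['column1', 'column2'])
   ```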
   
   As Fokko pointed out, the filter method will not be efficient if there are many partitions: each filter takes O(table_length) time, and although each thread can filter on its own, on a single node the total work across all the jobs is O(table_length * number_of_partitions), even though a single scan is technically enough to build all the buckets.
   
   The sort method also seems less efficient than the bucketing method: the relative order of the partitions does not matter, so a general sort algorithm on the partition columns is overkill compared with bucketing.
   
   I feel like all 3 directions require some implementation on the Arrow side (I did not find any approach simple enough that we could implement any of these directions purely with the existing PyArrow APIs). And **I want to get opinions on whether pursuing Arrow API level utilities sounds good**. Thank you!
   
   Specifically, for the third direction of bucketing and returning materialized tables/batches: since Arrow has dataset.write_dataset(), which supports partition-aware writing, I did some reading to see how it partitions the data and whether we could leverage anything from it.
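
   For reference, this is roughly how the existing dataset writer is invoked with a partitioning scheme (hive-style here); it writes one directory per partition value itself rather than handing the per-partition tables back to us, which is what PyIceberg would need for its own data file and metadata writing:
   ```
   import pyarrow as pa
   import pyarrow.dataset as ds

   ds.write_dataset(
       arrow_table,
       base_dir='/path/to/output/directory',
       format='parquet',
       partitioning=ds.partitioning(
           pa.schema([('column1', pa.string()), ('column2', pa.int64())]),
           flavor='hive',
       ),
   )
   ```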
   
   
   https://github.com/apache/arrow/blob/main/cpp/src/arrow/dataset/partition.cc#L118 is where the partitioning happens. The partition algorithm is a full scan with a bucket sort, leveraging the Grouper class utilities in Arrow's compute component. Specifically:
   1. Grouper.consume() initializes the groups based on the partition column values that are present:
   https://github.com/apache/arrow/blob/55afcf0450aa2b611e78335bdbfd77e55ae3bc9f/cpp/src/arrow/compute/row/grouper.cc#L422
   2. Grouper.MakeGroupings() builds a ListArray where each list represents a partition and each element in a list is a row_id of the original table:
   https://github.com/apache/arrow/blob/55afcf0450aa2b611e78335bdbfd77e55ae3bc9f/cpp/src/arrow/compute/row/grouper.cc#L886C45-L886C58
   3. Grouper.ApplyGroupings() efficiently converts the grouped representation of row_ids into the actual rows:
   https://github.com/apache/arrow/blob/55afcf0450aa2b611e78335bdbfd77e55ae3bc9f/cpp/src/arrow/compute/row/grouper.cc#L875
   
   Other than being used in dataset writing, the Grouper in Arrow's compute component is used to support other exposed compute APIs such as the aggregation functions. At the end of the day, what we want (in order to support PyIceberg's partitioned write) is an API that returns record batches/tables based on an input table and an input partition scheme, so maybe we could expose such a new API under compute, leveraging Grouper.
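
   To sketch what that new compute API would do, here is a pure-Python mock that mirrors the three Grouper stages above (consume, MakeGroupings, ApplyGroupings); the function name and return shape are assumptions for illustration only, with the real work intended to happen inside the C++ Grouper:
   ```
   import numpy as np
   import pyarrow as pa


   def partition_like_grouper(arrow_table: pa.Table, partition_columns: list[str]) -> list[pa.Table]:
       if arrow_table.num_rows == 0:
           return []
       keys = [tuple(row.values()) for row in arrow_table.select(partition_columns).to_pylist()]

       # "consume": assign a dense group id to every row, in first-seen order
       key_to_group: dict[tuple, int] = {}
       group_ids = np.array([key_to_group.setdefault(key, len(key_to_group)) for key in keys])

       # "MakeGroupings": collect the row ids belonging to each group
       order = np.argsort(group_ids, kind='stable')
       boundaries = np.flatnonzero(np.diff(group_ids[order])) + 1
       groupings = np.split(order, boundaries)

       # "ApplyGroupings": materialize the rows of each group with take()
       return [arrow_table.take(row_ids) for row_ids in groupings]
   ```
   This is only meant to show the shape of the result; the point of the proposal is that these three steps would run inside Arrow's compute layer instead of in Python.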
   

