vkaff opened a new issue, #43387:
URL: https://github.com/apache/arrow/issues/43387

   ### Describe the bug, including details regarding any error messages, version, and platform.
   
   Let's say we use the following code to create some test parquet files
   ```python
   
   import pandas as pd
   from itertools import repeat
   from collections import defaultdict
   
   import pyarrow as pa
   import pyarrow.parquet as pq
   import pyarrow.dataset as ds
   import pyarrow.fs  # needed for pa.fs.LocalFileSystem() below
   
   
   schema: pa.Schema = pa.schema(
       [
           ("id", pa.int64()),
           ("tag", pa.string()),
       ]
   )
   
   fast_write_options: dict = dict(
       compression="lz4",
       version="2.6",
       write_statistics=True,
   )
   
   for i, suffix in enumerate(["", "_10", "_20", "_30", "_40", "_50", "_60", "_70", "_80", "_90", "_100"]):
       outputs = []

       for tag in ["first", "second", "third", "fourth", "fifth", "sixth", "seventh", "eighth", "ninth", "tenth"]:
           outputs.append(
               pd.DataFrame(
                   {
                       "id": list(range((i + 1) * int(1e6))),
                       "tag": list(repeat(tag, (i + 1) * int(1e6))),
                   }
               )
           )
   
       df = pd.concat(outputs, ignore_index=True)
   
       writer = pq.ParquetWriter(
           f"./test{suffix}.parquet",
           schema=schema,
           filesystem=pa.fs.LocalFileSystem(),
           **fast_write_options,
       )
   
       table = pa.Table.from_pandas(df, schema=schema, preserve_index=False)
       writer.write_table(table)
   
       writer.close()
   ```
   
   Then, I partition them based on the `tag` field
   
   ```python
   dataset_files = []
   for f in ["", "_10", "_20", "_30", "_40", "_50", "_60", "_70", "_80", "_90", "_100"]:
       dataset_files.append(f"./test{f}.parquet")
   
   input_datasets = ds.dataset(
       dataset_files,
       schema=schema,
       format="parquet",
       filesystem=pa.fs.LocalFileSystem(),
   )
   
   scanner = input_datasets.scanner(
       batch_readahead=4,
       fragment_readahead=1,
       use_threads=True,
   )
   ds.write_dataset(
       scanner,
       base_dir="base_part_dir",
       basename_template="part-{i}.parquet",
       format="parquet",
       filesystem=pa.fs.LocalFileSystem(),
       partitioning=ds.partitioning(pa.schema([("tag", pa.string())]), flavor="hive"),
       max_rows_per_group=1_024,
       min_rows_per_group=1_024,
       max_rows_per_file=20 * 10_480,
       existing_data_behavior="overwrite_or_ignore",
       file_options=ds.ParquetFileFormat().make_write_options(**fast_write_options),
       use_threads=False,
   )
   ```
   
   And, try to read the row_groups of each fragment
   
   ```python
   import time
   
   import pyarrow as pa
   import pyarrow.dataset as ds
   
   schema: pa.Schema = pa.schema(
       [
           ("id", pa.int64()),
           ("tag", pa.string()),
       ]
   )
   
   
   def main():
       intermediate_dataset = ds.dataset(
           "base_part_dir",
           schema=schema,
           format="parquet",
           partitioning=ds.partitioning(pa.schema([("tag", pa.string())]), flavor="hive"),
       )
   
       keys = []
       for f in intermediate_dataset.get_fragments():
           keys.append(ds.get_partition_keys(f.partition_expression)["tag"])
           f.row_groups  # touching row_groups forces the fragment's row-group metadata to be read
   
   
   if __name__ == "__main__":
       time.sleep(10)
   
       print("started")
       while True:
           main()
   
           print("Sleeping...")
           time.sleep(10)
   ```
   
   On the initial execution the process uses approximately 128 MB of memory; after `main()` has run once, usage goes up to about 900 MB and stays there across successive iterations.
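
   A minimal sketch of one way to track these numbers from inside the script (this assumes the third-party `psutil` package is available; `pa.total_allocated_bytes()` reports only what Arrow's default memory pool currently holds):

   ```python
   import os

   import psutil  # third-party, assumed available; used only to read the process RSS
   import pyarrow as pa


   def report_memory(label: str) -> None:
       # Resident set size of the whole process, as seen by the OS.
       rss_mb = psutil.Process(os.getpid()).memory_info().rss / 1024**2
       # Bytes currently allocated through Arrow's default memory pool.
       pool_mb = pa.total_allocated_bytes() / 1024**2
       print(f"{label}: rss={rss_mb:.1f} MB, arrow pool={pool_mb:.1f} MB")
   ```

   Comparing the two values before and after `main()` would show whether the growth is data that Arrow still considers allocated, or memory the allocator keeps around after it has been freed.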
   
   If I change **max_rows_per_group / min_rows_per_group** to 65_536, memory usage does not increase at all, or only very slightly.
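
   For context, a rough calculation of how many row groups each output file ends up with under the two settings (assuming files are filled up to `max_rows_per_file`):

   ```python
   max_rows_per_file = 20 * 10_480  # 209_600, as passed to write_dataset above

   # Approximate row groups per output file for the two group sizes:
   print(max_rows_per_file // 1_024)   # 204 full groups, plus a partial one
   print(max_rows_per_file // 65_536)  # 3 full groups, plus a partial one
   ```

   With 1_024-row groups there are 64x more row groups per file, so presumably far more row-group metadata for `f.row_groups` to materialize.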
   
   Therefore, my question is: why is this happening? Firstly, I would expect that reading only metadata, i.e. `f.row_groups`, would cause minimal memory usage. Secondly, I would expect the memory to be released once `main()` has finished.
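
   What I cannot tell from the outside is whether the retained memory is still held by Arrow's memory pool or cached by the underlying allocator. A minimal sketch of checks that might narrow this down (assuming a reasonably recent pyarrow; `release_unused()` only asks the pool to hand unused memory back to the OS):

   ```python
   import pyarrow as pa

   pool = pa.default_memory_pool()
   print(pool.backend_name)       # e.g. "jemalloc", "mimalloc" or "system"
   print(pool.bytes_allocated())  # bytes Arrow believes are still in use
   print(pool.max_memory())       # high-water mark of the pool

   # Ask the pool to return any cached but unused memory to the OS.
   pool.release_unused()
   ```

   If `bytes_allocated()` drops back to near zero while the RSS stays around 900 MB, that would suggest the memory is retained by the allocator rather than by live Arrow data.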
   
   Is this a bug, expected behavior, or am I using the library incorrectly? Is there something I can do to mitigate it?
   
   ### Component(s)
   
   Python

