vkaff opened a new issue, #43387:
URL: https://github.com/apache/arrow/issues/43387
### Describe the bug, including details regarding any error messages, version, and platform.
Let's say we use the following code to create some test Parquet files:
```python
import pandas as pd
from itertools import repeat

import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.dataset as ds

schema: pa.Schema = pa.schema(
    [
        ("id", pa.int64()),
        ("tag", pa.string()),
    ]
)

fast_write_options: dict = dict(
    compression="lz4",
    version="2.6",
    write_statistics=True,
)

for i, suffix in enumerate(
    {"", "_10", "_20", "_30", "_40", "_50", "_60", "_70", "_80", "_90", "_100"}
):
    outputs = []
    for tag in {"first", "second", "third", "fourth", "fifth", "sixth",
                "seventh", "eighth", "nineth", "tenth"}:
        outputs.append(
            pd.DataFrame(
                {
                    "id": list(range((i + 1) * int(1e6))),
                    "tag": list(repeat(tag, (i + 1) * int(1e6))),
                }
            )
        )
    df = pd.concat(outputs, ignore_index=True)

    writer = pq.ParquetWriter(
        f"./test{suffix}.parquet",
        schema=schema,
        filesystem=pa.fs.LocalFileSystem(),
        **fast_write_options,
    )
    table = pa.Table.from_pandas(df, schema=schema, preserve_index=False)
    writer.write_table(table)
    writer.close()
```
Then, I partition them based on the `tag` field:
```python
dataset_files = []
for f in {"", "_10", "_20", "_30", "_40", "_50", "_60", "_70", "_80", "_90", "_100"}:
    dataset_files.append(f"./test{f}.parquet")

input_datasets = ds.dataset(
    dataset_files,
    schema=schema,
    format="parquet",
    filesystem=pa.fs.LocalFileSystem(),
)
scanner = input_datasets.scanner(
    batch_readahead=4,
    fragment_readahead=1,
    use_threads=True,
)
ds.write_dataset(
    scanner,
    base_dir="base_part_dir",
    basename_template="part-{i}.parquet",
    format="parquet",
    filesystem=pa.fs.LocalFileSystem(),
    partitioning=ds.partitioning(pa.schema([("tag", pa.string())]), flavor="hive"),
    max_rows_per_group=1_024,
    min_rows_per_group=1_024,
    max_rows_per_file=20 * 10_480,
    existing_data_behavior="overwrite_or_ignore",
    file_options=ds.ParquetFileFormat().make_write_options(**fast_write_options),
    use_threads=False,
)
```
Then, I try to read the `row_groups` of each fragment:
```python
import time

import pyarrow as pa
import pyarrow.dataset as ds

schema: pa.Schema = pa.schema(
    [
        ("id", pa.int64()),
        ("tag", pa.string()),
    ]
)


def main():
    intermediate_dataset = ds.dataset(
        "base_part_dir",
        schema=schema,
        format="parquet",
        partitioning=ds.partitioning(pa.schema([("tag", pa.string())]), flavor="hive"),
    )
    keys = []
    for f in intermediate_dataset.get_fragments():
        keys.append(ds.get_partition_keys(f.partition_expression)["tag"])
        f.row_groups


if __name__ == "__main__":
    time.sleep(10)
    print("started")
    while True:
        main()
        print("Sleeping...")
        time.sleep(10)
```
On startup the process uses approximately 128 MB of memory; after `main()` has executed once, usage climbs to about 900 MB and stays there across successive iterations.
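For reference, a minimal sketch of how the resident memory can be tracked between iterations (using `psutil` is my assumption here, not something used in the original setup; any RSS-reporting tool works):

```python
import os

import psutil  # assumed available; purely for measurement


def rss_mb() -> float:
    """Return the resident set size of the current process in MB."""
    return psutil.Process(os.getpid()).memory_info().rss / 1e6


# Example usage inside the loop above:
#     print(f"RSS after main(): {rss_mb():.0f} MB")
```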
When **max_rows_per_group / min_rows_per_group** are changed to 65_536, memory usage does not increase at all, or only very slightly.
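For clarity, that comparison only changes the row-group sizing in the `ds.write_dataset` call above; a sketch of the modified arguments (everything else unchanged):

```python
ds.write_dataset(
    scanner,
    base_dir="base_part_dir",
    basename_template="part-{i}.parquet",
    format="parquet",
    filesystem=pa.fs.LocalFileSystem(),
    partitioning=ds.partitioning(pa.schema([("tag", pa.string())]), flavor="hive"),
    # larger row groups -> far fewer row groups (and row-group metadata) per file
    max_rows_per_group=65_536,
    min_rows_per_group=65_536,
    max_rows_per_file=20 * 10_480,
    existing_data_behavior="overwrite_or_ignore",
    file_options=ds.ParquetFileFormat().make_write_options(**fast_write_options),
    use_threads=False,
)
```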
My question is: why is this happening? First, I would expect that reading metadata, i.e. `f.row_groups`, would use only minimal memory. Second, I would expect the memory to be released once `main()` has finished.
Is this a bug, is it expected behavior, or am I using the library incorrectly? Is there something I can do to mitigate it?
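As a possible diagnostic (a sketch based on pyarrow's memory-pool API, not part of the reproduction above), one could check whether the memory is still held by Arrow's pool or only retained by the allocator, and ask the pool to hand unused memory back:

```python
import pyarrow as pa

pool = pa.default_memory_pool()

# How much the Arrow pool itself still holds after main() returns.
print("pool backend:", pool.backend_name)
print("pool bytes_allocated:", pool.bytes_allocated())
print("total allocated bytes:", pa.total_allocated_bytes())

# Ask the pool to return unused memory to the OS; depending on the
# allocator backend (jemalloc/mimalloc/system) this may or may not lower RSS.
pool.release_unused()
```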
### Component(s)
Python