thijsheijden opened a new issue, #13313:
URL: https://github.com/apache/iceberg/issues/13313

   ### Query engine
   
   Spark
   
   ### Question
   
   I am adding around 1 million files to an Iceberg table using the `add_files` procedure. After adding these files, a script updates the manifest files, adding bloom filters so that set-membership queries can be used for data skipping. My problem arises from Spark compacting my roughly one thousand small manifests into a handful of massive manifests.
   
   I am inserting files in batches of around 1000 files, which leads to around 1000 manifest files of around 2.6 MB each. When compacted, these 1000 manifest files are merged into about 10 manifest files of around 2.6 GB. These files are too large and cause IO errors in DuckDB's Iceberg extension. How can I disable this compaction, or at least limit how aggressively manifests are merged? I have tried disabling `commit.manifest-merge.enabled` and lowering `commit.manifest.target-size-bytes`, but neither change seems to have any effect.
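
   For reference, I set both of these through the Spark session configuration, as shown in the code further below. If they are instead meant to be set as Iceberg table properties, I assume the equivalent would look roughly like this sketch (`default.my_table` is a placeholder for my actual table name):

   ```
   # Sketch: setting the manifest-merge settings as Iceberg table properties
   # via Spark SQL. 'default.my_table' is a placeholder table name.
   spark.sql("""
       ALTER TABLE iceberg_catalog.default.my_table SET TBLPROPERTIES (
         'commit.manifest-merge.enabled' = 'false',
         'commit.manifest.target-size-bytes' = '4194304'
       )
   """)
   ```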
   
   Here is the Spark code I am using to add my files:
   
   ```
   import os
   import pyspark

   # args.file_dir (directory containing the batch_* subdirectories of Parquet
   # files) and args.table (target table name) are CLI arguments parsed
   # elsewhere (not shown).

   conf = pyspark.SparkConf()
   conf.setMaster('local[*]')
   conf.set('spark.sql.catalog.iceberg_catalog', 'org.apache.iceberg.spark.SparkCatalog')
   conf.set('spark.sql.catalog.iceberg_catalog.type', 'hadoop')
   conf.set('spark.sql.catalog.iceberg_catalog.warehouse', 'test_data/iceberg/data')
   conf.set('spark.sql.parquet.outputTimestampType', 'TIMESTAMP_MICROS')
   conf.set('spark.driver.memory', '10g')
   conf.set('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.9.1')
   conf.set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
   conf.set('spark.sql.iceberg.commit.manifest-merge.enabled', 'false')
   # Halve the default manifest target size (8 MB) so manifests stay small when bloom filters are added
   conf.set('spark.sql.iceberg.commit.manifest.target-size-bytes', '4194304')

   spark = pyspark.sql.SparkSession.builder.config(conf=conf).getOrCreate()
   sc = spark.sparkContext
   sc.setLogLevel("ERROR")
   spark.sql("USE iceberg_catalog")
   spark.sql("CREATE NAMESPACE IF NOT EXISTS default")
   spark.sql("USE NAMESPACE default")

   # Load the batches of files to import
   batches = os.listdir(args.file_dir)
   first_file = os.path.join(args.file_dir, "batch_0",
                             os.listdir(os.path.join(args.file_dir, "batch_0"))[0])

   # Create the table using the schema of the first file
   df = spark.read.parquet(first_file)
   empty_df = spark.createDataFrame([], df.schema)
   empty_df.writeTo(args.table).create()

   # Register each batch of existing Parquet files with the add_files procedure
   batch_idx = 1
   for batch_dir in batches:
       print(f"Adding batch {batch_idx}")
       spark.sql(f"""
           CALL iceberg_catalog.system.add_files(
             table => 'default.{args.table}',
             source_table => '`parquet`.`{os.path.join(args.file_dir, batch_dir)}`',
             parallelism => 10
           )
       """)
       batch_idx += 1
   ```
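
   For completeness, the size of the resulting manifests can be checked through the table's `manifests` metadata table; a minimal sketch, with `default.my_table` again standing in for the real table name:

   ```
   # Sketch: list manifest files and their sizes via the Iceberg 'manifests'
   # metadata table. 'default.my_table' is a placeholder table name.
   spark.sql("""
       SELECT path, length
       FROM iceberg_catalog.default.my_table.manifests
       ORDER BY length DESC
   """).show(truncate=False)
   ```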

