SHuixo opened a new issue, #6307: URL: https://github.com/apache/iceberg/issues/6307
### Query engine

Flink 1.13.5

### Question

First, data in MySQL is synchronized into an Iceberg v2 table using Flink CDC. The table's data files are shown below:

<img width="1069" alt="iceberg_data_file_20221117" src="https://user-images.githubusercontent.com/20868410/204496836-b56825fd-c888-416f-b6fd-a6a40cbce8d7.PNG">

After pausing the writer, I started reading the Iceberg v2 table and writing it to another table in Hive. I tried both the streaming and batch read/write modes, and both hit the following **OOM exception** in the **source** stage:

<img width="866" alt="iceberg_20221129_02" src="https://user-images.githubusercontent.com/20868410/204501344-104b09d8-2d20-42e9-aff3-134cccdfd219.PNG">

```
2022-11-29 12:01:16,846 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom Source -> Calc(select=[...]) -> Sort(orderBy=[EXPR$27 ASC]) (1/29) (dd729717b17ca8ac020e1601bd785cd7) switched from RUNNING to FAILED on container_e37_1661772503533_1146093_01_000003 @ hadoop90 (dataPort=13756).
java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.nio.HeapByteBuffer.duplicate(HeapByteBuffer.java:107) ~[?:1.8.0_74]
    at org.apache.iceberg.shaded.org.apache.parquet.bytes.SingleBufferInputStream.slice(SingleBufferInputStream.java:120) ~[DevIcebergDemo-0.14.1-sink.jar:?]
    at org.apache.iceberg.shaded.org.apache.parquet.column.values.plain.BinaryPlainValuesReader.readBytes(BinaryPlainValuesReader.java:40) ~[DevIcebergDemo-0.14.1-sink.jar:?]
    at org.apache.iceberg.parquet.PageIterator.nextBinary(PageIterator.java:173) ~[DevIcebergDemo-0.14.1-sink.jar:?]
    at org.apache.iceberg.parquet.ColumnIterator.nextBinary(ColumnIterator.java:143) ~[DevIcebergDemo-0.14.1-sink.jar:?]
    at org.apache.iceberg.parquet.ParquetValueReaders$StringReader.read(ParquetValueReaders.java:242) ~[DevIcebergDemo-0.14.1-sink.jar:?]
    at org.apache.iceberg.parquet.ParquetValueReaders$StringReader.read(ParquetValueReaders.java:235) ~[DevIcebergDemo-0.14.1-sink.jar:?]
    at org.apache.iceberg.parquet.ParquetValueReaders$StructReader.read(ParquetValueReaders.java:699) ~[DevIcebergDemo-0.14.1-sink.jar:?]
    at org.apache.iceberg.parquet.ParquetReader$FileIterator.next(ParquetReader.java:116) ~[DevIcebergDemo-0.14.1-sink.jar:?]
    at org.apache.iceberg.io.CloseableIterable$ConcatCloseableIterable$ConcatCloseableIterator.next(CloseableIterable.java:206) ~[DevIcebergDemo-0.14.1-sink.jar:?]
    at org.apache.iceberg.io.CloseableIterable$4$1.next(CloseableIterable.java:113) ~[DevIcebergDemo-0.14.1-sink.jar:?]
    at org.apache.iceberg.io.CloseableIterable$4$1.next(CloseableIterable.java:113) ~[DevIcebergDemo-0.14.1-sink.jar:?]
    at org.apache.iceberg.relocated.com.google.common.collect.Iterators.addAll(Iterators.java:367) ~[DevIcebergDemo-0.14.1-sink.jar:?]
    at org.apache.iceberg.relocated.com.google.common.collect.Iterables.addAll(Iterables.java:337) ~[DevIcebergDemo-0.14.1-sink.jar:?]
    at org.apache.iceberg.deletes.Deletes.toEqualitySet(Deletes.java:85) ~[DevIcebergDemo-0.14.1-sink.jar:?]
    at org.apache.iceberg.data.DeleteFilter.applyEqDeletes(DeleteFilter.java:167) ~[DevIcebergDemo-0.14.1-sink.jar:?]
    at org.apache.iceberg.data.DeleteFilter.applyEqDeletes(DeleteFilter.java:187) ~[DevIcebergDemo-0.14.1-sink.jar:?]
    at org.apache.iceberg.data.DeleteFilter.filter(DeleteFilter.java:132) ~[DevIcebergDemo-0.14.1-sink.jar:?]
    at org.apache.iceberg.flink.source.RowDataFileScanTaskReader.open(RowDataFileScanTaskReader.java:75) ~[DevIcebergDemo-0.14.1-sink.jar:?]
    at org.apache.iceberg.flink.source.DataIterator.openTaskIterator(DataIterator.java:84) ~[DevIcebergDemo-0.14.1-sink.jar:?]
    at org.apache.iceberg.flink.source.DataIterator.updateCurrentIterator(DataIterator.java:76) ~[DevIcebergDemo-0.14.1-sink.jar:?]
    at org.apache.iceberg.flink.source.DataIterator.hasNext(DataIterator.java:58) ~[DevIcebergDemo-0.14.1-sink.jar:?]
    at org.apache.iceberg.flink.source.FlinkInputFormat.reachedEnd(FlinkInputFormat.java:106) ~[DevIcebergDemo-0.14.1-sink.jar:?]
    at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:89) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
2022-11-29 12:01:16,885 INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Received resource requirements from job f86172c96f044200243654a000edd270: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=7}]
2022-11-29 12:01:16,917 INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - Calculating tasks to restart to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_0.
2022-11-29 12:01:16,952 INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - 1 tasks should be restarted to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_0.
2022-11-29 12:01:17,000 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job insert-into_myHive.dhome_db.ods_t_di_gateway_deviceinfo_d_tmp2 (f86172c96f044200243654a000edd270) switched from state RUNNING to FAILING.
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:216) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:206) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:197) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:682) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
    at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source) ~[?:?]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_74]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_74]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.13.5.jar:1.13.5]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.13.5.jar:1.13.5]
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.13.5.jar:1.13.5]
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.13.5.jar:1.13.5]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.13.5.jar:1.13.5]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.5.jar:1.13.5]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.5.jar:1.13.5]
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.13.5.jar:1.13.5]
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.13.5.jar:1.13.5]
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.13.5.jar:1.13.5]
    at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.13.5.jar:1.13.5]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.13.5.jar:1.13.5]
    at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.13.5.jar:1.13.5]
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.13.5.jar:1.13.5]
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.13.5.jar:1.13.5]
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.13.5.jar:1.13.5]
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.13.5.jar:1.13.5]
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.13.5.jar:1.13.5]
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.nio.HeapByteBuffer.duplicate(HeapByteBuffer.java:107) ~[?:1.8.0_74]
    at org.apache.iceberg.shaded.org.apache.parquet.bytes.SingleBufferInputStream.slice(SingleBufferInputStream.java:120) ~[DevIcebergDemo-0.14.1-sink.jar:?]
    at org.apache.iceberg.shaded.org.apache.parquet.column.values.plain.BinaryPlainValuesReader.readBytes(BinaryPlainValuesReader.java:40) ~[DevIcebergDemo-0.14.1-sink.jar:?]
    at org.apache.iceberg.parquet.PageIterator.nextBinary(PageIterator.java:173) ~[DevIcebergDemo-0.14.1-sink.jar:?]
    at org.apache.iceberg.parquet.ColumnIterator.nextBinary(ColumnIterator.java:143) ~[DevIcebergDemo-0.14.1-sink.jar:?]
    at org.apache.iceberg.parquet.ParquetValueReaders$StringReader.read(ParquetValueReaders.java:242) ~[DevIcebergDemo-0.14.1-sink.jar:?]
    at org.apache.iceberg.parquet.ParquetValueReaders$StringReader.read(ParquetValueReaders.java:235) ~[DevIcebergDemo-0.14.1-sink.jar:?]
    at org.apache.iceberg.parquet.ParquetValueReaders$StructReader.read(ParquetValueReaders.java:699) ~[DevIcebergDemo-0.14.1-sink.jar:?]
    at org.apache.iceberg.parquet.ParquetReader$FileIterator.next(ParquetReader.java:116) ~[DevIcebergDemo-0.14.1-sink.jar:?]
    at org.apache.iceberg.io.CloseableIterable$ConcatCloseableIterable$ConcatCloseableIterator.next(CloseableIterable.java:206) ~[DevIcebergDemo-0.14.1-sink.jar:?]
    at org.apache.iceberg.io.CloseableIterable$4$1.next(CloseableIterable.java:113) ~[DevIcebergDemo-0.14.1-sink.jar:?]
    at org.apache.iceberg.io.CloseableIterable$4$1.next(CloseableIterable.java:113) ~[DevIcebergDemo-0.14.1-sink.jar:?]
    at org.apache.iceberg.relocated.com.google.common.collect.Iterators.addAll(Iterators.java:367) ~[DevIcebergDemo-0.14.1-sink.jar:?]
    at org.apache.iceberg.relocated.com.google.common.collect.Iterables.addAll(Iterables.java:337) ~[DevIcebergDemo-0.14.1-sink.jar:?]
    at org.apache.iceberg.deletes.Deletes.toEqualitySet(Deletes.java:85) ~[DevIcebergDemo-0.14.1-sink.jar:?]
    at org.apache.iceberg.data.DeleteFilter.applyEqDeletes(DeleteFilter.java:167) ~[DevIcebergDemo-0.14.1-sink.jar:?]
    at org.apache.iceberg.data.DeleteFilter.applyEqDeletes(DeleteFilter.java:187) ~[DevIcebergDemo-0.14.1-sink.jar:?]
    at org.apache.iceberg.data.DeleteFilter.filter(DeleteFilter.java:132) ~[DevIcebergDemo-0.14.1-sink.jar:?]
    at org.apache.iceberg.flink.source.RowDataFileScanTaskReader.open(RowDataFileScanTaskReader.java:75) ~[DevIcebergDemo-0.14.1-sink.jar:?]
    at org.apache.iceberg.flink.source.DataIterator.openTaskIterator(DataIterator.java:84) ~[DevIcebergDemo-0.14.1-sink.jar:?]
    at org.apache.iceberg.flink.source.DataIterator.updateCurrentIterator(DataIterator.java:76) ~[DevIcebergDemo-0.14.1-sink.jar:?]
    at org.apache.iceberg.flink.source.DataIterator.hasNext(DataIterator.java:58) ~[DevIcebergDemo-0.14.1-sink.jar:?]
    at org.apache.iceberg.flink.source.FlinkInputFormat.reachedEnd(FlinkInputFormat.java:106) ~[DevIcebergDemo-0.14.1-sink.jar:?]
    at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:89) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
```

> The program code is as follows:

```
<dependency>
    <groupId>org.apache.iceberg</groupId>
    <artifactId>iceberg-flink-runtime-1.13</artifactId>
    <version>0.14.1</version>
</dependency>
```

```
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class FlinkWriteIceberg2Hive {
    public static void main(String[] args) {
        EnvironmentSettings env = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
        TableEnvironment tEnv = TableEnvironment.create(env);

        HiveCatalog hiveCatalog = new HiveCatalog("myHive", "dhome_db", "/etc/hive/conf/");
        tEnv.registerCatalog("myHive", hiveCatalog);

        // 1. Create the Iceberg catalog.
        tEnv.executeSql("CREATE CATALOG hive_iceberg WITH ("
                + " 'type'='iceberg',"
                + " 'catalog-type'='hive',"
                + " 'uri'='thrift://xxx.xxx.xxx:9083',"
                + " 'clients'='5',"
                + " 'property-version'='1',"
                + " 'warehouse'='hdfs://nameservice1/user/hive/warehouse'"
                + ")");

        // 2. Write the data out to Hive.
        tEnv.executeSql("insert into myHive.dhome_db.ods_t_di_gateway_deviceinfo_d_tmp2 "
                + "select "
                + " ... "
                + "from hive_iceberg.dhome_db.ods_d_base_inf_229_iceberg");
    }
}
```
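Judging from the stack trace, the source fails inside `DeleteFilter.applyEqDeletes`, where `Deletes.toEqualitySet` loads every equality-delete row that applies to a data file into an in-memory set before the file can be read, so heap use grows with the volume of equality deletes the table has accumulated. Below is a minimal sketch to measure that volume before running the job; the class name `CountEqualityDeletes` and the standalone-program form are illustrative, not part of the job, and the catalog properties just mirror the `CREATE CATALOG` statement above:

```
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.io.CloseableIterable;

public class CountEqualityDeletes {
    public static void main(String[] args) throws Exception {
        // Same Hive catalog settings as the CREATE CATALOG statement above.
        Map<String, String> props = new HashMap<>();
        props.put("uri", "thrift://xxx.xxx.xxx:9083");
        props.put("warehouse", "hdfs://nameservice1/user/hive/warehouse");
        CatalogLoader catalogLoader = CatalogLoader.hive("hive_iceberg", new Configuration(), props);

        try (TableLoader loader = TableLoader.fromCatalog(
                catalogLoader, TableIdentifier.of("dhome_db", "ods_d_base_inf_229_iceberg"))) {
            loader.open();
            Table table = loader.loadTable();

            Set<String> seen = new HashSet<>();
            long totalBytes = 0L;
            try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
                for (FileScanTask task : tasks) {
                    for (DeleteFile delete : task.deletes()) {
                        // Count each equality-delete file once, even when several
                        // scan tasks reference it.
                        if (delete.content() == FileContent.EQUALITY_DELETES
                                && seen.add(delete.path().toString())) {
                            totalBytes += delete.fileSizeInBytes();
                        }
                    }
                }
            }
            System.out.printf("equality delete files: %d, total size: %d bytes%n",
                    seen.size(), totalBytes);
        }
    }
}
```

If the reported total is large relative to the task heap, that matches the "GC overhead limit exceeded" failure above.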
" + "from hive_iceberg.dhome_db.ods_d_base_inf_229_iceberg"); } } --- Task release command: ./bin/flink run-application -t yarn-application \ -p 100 \ -Djobmanager.memory.process.size=4096m \ -Dtaskmanager.memory.process.size=20480m \ -Dtaskmanager.memory.managed.fraction=0.15 \ -Dtaskmanager.numberOfTaskSlots=1 \ -Dyarn.application.name="FlinkSinkIceberg2Hive" \ -Dheartbeat.timeout=3600000 \ -Dakka.ask.timeout=60min \ -c org.example.FlinkWriteIceberg2Hive ~/DevIcebergDemo-0.14.1-sink.jar ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org