SHuixo opened a new issue, #6307:
URL: https://github.com/apache/iceberg/issues/6307

   ### Query engine
   
   Flink 1.13.5
   
   ### Question
   
   First, data from MySQL is synchronized into an Iceberg table using Flink CDC; the data files written to the table are shown below:
   
   <img width="1069" alt="iceberg_data_file_20221117" 
src="https://user-images.githubusercontent.com/20868410/204496836-b56825fd-c888-416f-b6fd-a6a40cbce8d7.PNG";>
   
   After pausing the writer, I started a job that reads the Iceberg v2 table and writes to another Hive table. I tried both streaming and batch read/write modes, and in both cases hit the following **OOM exception** in the **source** stage.
   
   <img width="866" alt="iceberg_20221129_02" 
src="https://user-images.githubusercontent.com/20868410/204501344-104b09d8-2d20-42e9-aff3-134cccdfd219.PNG";>
   
   
   ```
   2022-11-29 12:01:16,846 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: 
Custom Source -> Calc(select=[...]) -> Sort(orderBy=[EXPR$27 ASC]) (1/29) 
(dd729717b17ca8ac020e1601bd785cd7) switched from RUNNING to FAILED on 
container_e37_1661772503533_1146093_01_000003 @ hadoop90 (dataPort=13756).
   java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.nio.HeapByteBuffer.duplicate(HeapByteBuffer.java:107) 
~[?:1.8.0_74]
        at 
org.apache.iceberg.shaded.org.apache.parquet.bytes.SingleBufferInputStream.slice(SingleBufferInputStream.java:120)
 ~[DevIcebergDemo-0.14.1-sink.jar:?]
        at 
org.apache.iceberg.shaded.org.apache.parquet.column.values.plain.BinaryPlainValuesReader.readBytes(BinaryPlainValuesReader.java:40)
 ~[DevIcebergDemo-0.14.1-sink.jar:?]
        at 
org.apache.iceberg.parquet.PageIterator.nextBinary(PageIterator.java:173) 
~[DevIcebergDemo-0.14.1-sink.jar:?]
        at 
org.apache.iceberg.parquet.ColumnIterator.nextBinary(ColumnIterator.java:143) 
~[DevIcebergDemo-0.14.1-sink.jar:?]
        at 
org.apache.iceberg.parquet.ParquetValueReaders$StringReader.read(ParquetValueReaders.java:242)
 ~[DevIcebergDemo-0.14.1-sink.jar:?]
        at 
org.apache.iceberg.parquet.ParquetValueReaders$StringReader.read(ParquetValueReaders.java:235)
 ~[DevIcebergDemo-0.14.1-sink.jar:?]
        at 
org.apache.iceberg.parquet.ParquetValueReaders$StructReader.read(ParquetValueReaders.java:699)
 ~[DevIcebergDemo-0.14.1-sink.jar:?]
        at 
org.apache.iceberg.parquet.ParquetReader$FileIterator.next(ParquetReader.java:116)
 ~[DevIcebergDemo-0.14.1-sink.jar:?]
        at 
org.apache.iceberg.io.CloseableIterable$ConcatCloseableIterable$ConcatCloseableIterator.next(CloseableIterable.java:206)
 ~[DevIcebergDemo-0.14.1-sink.jar:?]
        at 
org.apache.iceberg.io.CloseableIterable$4$1.next(CloseableIterable.java:113) 
~[DevIcebergDemo-0.14.1-sink.jar:?]
        at 
org.apache.iceberg.io.CloseableIterable$4$1.next(CloseableIterable.java:113) 
~[DevIcebergDemo-0.14.1-sink.jar:?]
        at 
org.apache.iceberg.relocated.com.google.common.collect.Iterators.addAll(Iterators.java:367)
 ~[DevIcebergDemo-0.14.1-sink.jar:?]
        at 
org.apache.iceberg.relocated.com.google.common.collect.Iterables.addAll(Iterables.java:337)
 ~[DevIcebergDemo-0.14.1-sink.jar:?]
        at org.apache.iceberg.deletes.Deletes.toEqualitySet(Deletes.java:85) 
~[DevIcebergDemo-0.14.1-sink.jar:?]
        at 
org.apache.iceberg.data.DeleteFilter.applyEqDeletes(DeleteFilter.java:167) 
~[DevIcebergDemo-0.14.1-sink.jar:?]
        at 
org.apache.iceberg.data.DeleteFilter.applyEqDeletes(DeleteFilter.java:187) 
~[DevIcebergDemo-0.14.1-sink.jar:?]
        at org.apache.iceberg.data.DeleteFilter.filter(DeleteFilter.java:132) 
~[DevIcebergDemo-0.14.1-sink.jar:?]
        at 
org.apache.iceberg.flink.source.RowDataFileScanTaskReader.open(RowDataFileScanTaskReader.java:75)
 ~[DevIcebergDemo-0.14.1-sink.jar:?]
        at 
org.apache.iceberg.flink.source.DataIterator.openTaskIterator(DataIterator.java:84)
 ~[DevIcebergDemo-0.14.1-sink.jar:?]
        at 
org.apache.iceberg.flink.source.DataIterator.updateCurrentIterator(DataIterator.java:76)
 ~[DevIcebergDemo-0.14.1-sink.jar:?]
        at 
org.apache.iceberg.flink.source.DataIterator.hasNext(DataIterator.java:58) 
~[DevIcebergDemo-0.14.1-sink.jar:?]
        at 
org.apache.iceberg.flink.source.FlinkInputFormat.reachedEnd(FlinkInputFormat.java:106)
 ~[DevIcebergDemo-0.14.1-sink.jar:?]
        at 
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:89)
 ~[flink-dist_2.11-1.13.5.jar:1.13.5]
        at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
 ~[flink-dist_2.11-1.13.5.jar:1.13.5]
        at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) 
~[flink-dist_2.11-1.13.5.jar:1.13.5]
        at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
 ~[flink-dist_2.11-1.13.5.jar:1.13.5]
   2022-11-29 12:01:16,885 INFO  
org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] 
- Received resource requirements from job f86172c96f044200243654a000edd270: 
[ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, 
numberOfRequiredSlots=7}]
   2022-11-29 12:01:16,917 INFO  
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
 [] - Calculating tasks to restart to recover the failed task 
cbc357ccb763df2852fee8c4fc7d55f2_0.
   2022-11-29 12:01:16,952 INFO  
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
 [] - 1 tasks should be restarted to recover the failed task 
cbc357ccb763df2852fee8c4fc7d55f2_0. 
   2022-11-29 12:01:17,000 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job 
insert-into_myHive.dhome_db.ods_t_di_gateway_deviceinfo_d_tmp2 
(f86172c96f044200243654a000edd270) switched from state RUNNING to FAILING.
   org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
        at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
 ~[flink-dist_2.11-1.13.5.jar:1.13.5]
        at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
 ~[flink-dist_2.11-1.13.5.jar:1.13.5]
        at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:216)
 ~[flink-dist_2.11-1.13.5.jar:1.13.5]
        at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:206)
 ~[flink-dist_2.11-1.13.5.jar:1.13.5]
        at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:197)
 ~[flink-dist_2.11-1.13.5.jar:1.13.5]
        at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:682)
 ~[flink-dist_2.11-1.13.5.jar:1.13.5]
        at 
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
 ~[flink-dist_2.11-1.13.5.jar:1.13.5]
        at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
 ~[flink-dist_2.11-1.13.5.jar:1.13.5]
        at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source) ~[?:?]
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_74]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_74]
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
 ~[flink-dist_2.11-1.13.5.jar:1.13.5]
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
 ~[flink-dist_2.11-1.13.5.jar:1.13.5]
        at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
 ~[flink-dist_2.11-1.13.5.jar:1.13.5]
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
 ~[flink-dist_2.11-1.13.5.jar:1.13.5]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) 
[flink-dist_2.11-1.13.5.jar:1.13.5]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) 
[flink-dist_2.11-1.13.5.jar:1.13.5]
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) 
[flink-dist_2.11-1.13.5.jar:1.13.5]
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) 
[flink-dist_2.11-1.13.5.jar:1.13.5]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) 
[flink-dist_2.11-1.13.5.jar:1.13.5]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
[flink-dist_2.11-1.13.5.jar:1.13.5]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
[flink-dist_2.11-1.13.5.jar:1.13.5]
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517) 
[flink-dist_2.11-1.13.5.jar:1.13.5]
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) 
[flink-dist_2.11-1.13.5.jar:1.13.5]
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
[flink-dist_2.11-1.13.5.jar:1.13.5]
        at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
[flink-dist_2.11-1.13.5.jar:1.13.5]
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
[flink-dist_2.11-1.13.5.jar:1.13.5]
        at akka.dispatch.Mailbox.run(Mailbox.scala:225) 
[flink-dist_2.11-1.13.5.jar:1.13.5]
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
[flink-dist_2.11-1.13.5.jar:1.13.5]
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
[flink-dist_2.11-1.13.5.jar:1.13.5]
        at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
[flink-dist_2.11-1.13.5.jar:1.13.5]
        at 
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
[flink-dist_2.11-1.13.5.jar:1.13.5]
        at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
[flink-dist_2.11-1.13.5.jar:1.13.5]
   Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.nio.HeapByteBuffer.duplicate(HeapByteBuffer.java:107) 
~[?:1.8.0_74]
        at 
org.apache.iceberg.shaded.org.apache.parquet.bytes.SingleBufferInputStream.slice(SingleBufferInputStream.java:120)
 ~[DevIcebergDemo-0.14.1-sink.jar:?]
        at 
org.apache.iceberg.shaded.org.apache.parquet.column.values.plain.BinaryPlainValuesReader.readBytes(BinaryPlainValuesReader.java:40)
 ~[DevIcebergDemo-0.14.1-sink.jar:?]
        at 
org.apache.iceberg.parquet.PageIterator.nextBinary(PageIterator.java:173) 
~[DevIcebergDemo-0.14.1-sink.jar:?]
        at 
org.apache.iceberg.parquet.ColumnIterator.nextBinary(ColumnIterator.java:143) 
~[DevIcebergDemo-0.14.1-sink.jar:?]
        at 
org.apache.iceberg.parquet.ParquetValueReaders$StringReader.read(ParquetValueReaders.java:242)
 ~[DevIcebergDemo-0.14.1-sink.jar:?]
        at 
org.apache.iceberg.parquet.ParquetValueReaders$StringReader.read(ParquetValueReaders.java:235)
 ~[DevIcebergDemo-0.14.1-sink.jar:?]
        at 
org.apache.iceberg.parquet.ParquetValueReaders$StructReader.read(ParquetValueReaders.java:699)
 ~[DevIcebergDemo-0.14.1-sink.jar:?]
        at 
org.apache.iceberg.parquet.ParquetReader$FileIterator.next(ParquetReader.java:116)
 ~[DevIcebergDemo-0.14.1-sink.jar:?]
        at 
org.apache.iceberg.io.CloseableIterable$ConcatCloseableIterable$ConcatCloseableIterator.next(CloseableIterable.java:206)
 ~[DevIcebergDemo-0.14.1-sink.jar:?]
        at 
org.apache.iceberg.io.CloseableIterable$4$1.next(CloseableIterable.java:113) 
~[DevIcebergDemo-0.14.1-sink.jar:?]
        at 
org.apache.iceberg.io.CloseableIterable$4$1.next(CloseableIterable.java:113) 
~[DevIcebergDemo-0.14.1-sink.jar:?]
        at 
org.apache.iceberg.relocated.com.google.common.collect.Iterators.addAll(Iterators.java:367)
 ~[DevIcebergDemo-0.14.1-sink.jar:?]
        at 
org.apache.iceberg.relocated.com.google.common.collect.Iterables.addAll(Iterables.java:337)
 ~[DevIcebergDemo-0.14.1-sink.jar:?]
        at org.apache.iceberg.deletes.Deletes.toEqualitySet(Deletes.java:85) 
~[DevIcebergDemo-0.14.1-sink.jar:?]
        at 
org.apache.iceberg.data.DeleteFilter.applyEqDeletes(DeleteFilter.java:167) 
~[DevIcebergDemo-0.14.1-sink.jar:?]
        at 
org.apache.iceberg.data.DeleteFilter.applyEqDeletes(DeleteFilter.java:187) 
~[DevIcebergDemo-0.14.1-sink.jar:?]
        at org.apache.iceberg.data.DeleteFilter.filter(DeleteFilter.java:132) 
~[DevIcebergDemo-0.14.1-sink.jar:?]
        at 
org.apache.iceberg.flink.source.RowDataFileScanTaskReader.open(RowDataFileScanTaskReader.java:75)
 ~[DevIcebergDemo-0.14.1-sink.jar:?]
        at 
org.apache.iceberg.flink.source.DataIterator.openTaskIterator(DataIterator.java:84)
 ~[DevIcebergDemo-0.14.1-sink.jar:?]
        at 
org.apache.iceberg.flink.source.DataIterator.updateCurrentIterator(DataIterator.java:76)
 ~[DevIcebergDemo-0.14.1-sink.jar:?]
        at 
org.apache.iceberg.flink.source.DataIterator.hasNext(DataIterator.java:58) 
~[DevIcebergDemo-0.14.1-sink.jar:?]
        at 
org.apache.iceberg.flink.source.FlinkInputFormat.reachedEnd(FlinkInputFormat.java:106)
 ~[DevIcebergDemo-0.14.1-sink.jar:?]
        at 
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:89)
 ~[flink-dist_2.11-1.13.5.jar:1.13.5]
        at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
 ~[flink-dist_2.11-1.13.5.jar:1.13.5]
        at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) 
~[flink-dist_2.11-1.13.5.jar:1.13.5]
        at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
 ~[flink-dist_2.11-1.13.5.jar:1.13.5]
   ```
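   The failing frames show where the memory goes: `DeleteFilter.applyEqDeletes` calls `Deletes.toEqualitySet`, which reads every equality-delete record for the scanned data file into an in-memory set before a single data row is emitted. Conceptually (a simplified sketch, not the actual Iceberg source):
   
   ```
   import java.util.HashSet;
   import java.util.List;
   import java.util.Set;
   
   // Simplified illustration (NOT the actual Iceberg code) of the failing frame
   // Deletes.toEqualitySet: all equality-delete keys are materialized on the
   // heap before the data file is scanned.
   public class EqualityDeleteSetSketch {
       static Set<List<Object>> toEqualitySet(Iterable<List<Object>> equalityDeleteKeys) {
           Set<List<Object>> deleteKeys = new HashSet<>();
           for (List<Object> key : equalityDeleteKeys) {
               // One entry per CDC update/delete; with many accumulated
               // equality-delete files this set can outgrow the task heap.
               deleteKeys.add(key);
           }
           return deleteKeys;
       }
   }
   ```
   
   With the volume of equality deletes a long-running CDC upsert job accumulates, this set appears to be what exhausts the source tasks' heap.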
   
   > The Maven dependency and program code are as follows:
   
   ```
   <dependency>
       <groupId>org.apache.iceberg</groupId>
       <artifactId>iceberg-flink-runtime-1.13</artifactId>
       <version>0.14.1</version>
   </dependency>
   ```
   
   ```
   import org.apache.flink.table.api.EnvironmentSettings;
   import org.apache.flink.table.api.TableEnvironment;
   import org.apache.flink.table.catalog.hive.HiveCatalog;
   
   public class FlinkWriteIceberg2Hive {
       public static void main(String[] args) {
   
           EnvironmentSettings env =
                   EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
           TableEnvironment tEnv = TableEnvironment.create(env);
   
           HiveCatalog hiveCatalog = new HiveCatalog("myHive", "dhome_db", "/etc/hive/conf/");
           tEnv.registerCatalog("myHive", hiveCatalog);
   
           // 1. Create the Iceberg catalog.
           tEnv.executeSql("CREATE CATALOG hive_iceberg WITH (" +
                   "  'type'='iceberg'," +
                   "  'catalog-type'='hive'," +
                   "  'uri'='thrift://xxx.xxx.xxx:9083'," +
                   "  'clients'='5'," +
                   "  'property-version'='1'," +
                   "  'warehouse'='hdfs://nameservice1/user/hive/warehouse'" +
                   ")");
   
           // 2. Write the data out to Hive.
           tEnv.executeSql("insert into myHive.dhome_db.ods_t_di_gateway_deviceinfo_d_tmp2 " +
                   "select " +
                   " ... " +
                   "from hive_iceberg.dhome_db.ods_d_base_inf_229_iceberg");
       }
   }
   ```
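   The streaming variant mentioned above would use Iceberg's Flink read options; a sketch of what step 2 looks like in that mode follows (the exact options the real job used are not shown in this issue, and the environment must then be built with `inStreamingMode()` instead of `inBatchMode()`):
   
   ```
   // Hypothetical streaming-read variant of step 2 above.
   tEnv.executeSql("insert into myHive.dhome_db.ods_t_di_gateway_deviceinfo_d_tmp2 " +
           "select " +
           " ... " +
           "from hive_iceberg.dhome_db.ods_d_base_inf_229_iceberg " +
           "/*+ OPTIONS('streaming'='true', 'monitor-interval'='10s') */");
   ```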
   
   > Job submission command:
   
   ```
   ./bin/flink run-application -t yarn-application \
   -p 100 \
   -Djobmanager.memory.process.size=4096m \
   -Dtaskmanager.memory.process.size=20480m \
   -Dtaskmanager.memory.managed.fraction=0.15 \
   -Dtaskmanager.numberOfTaskSlots=1 \
   -Dyarn.application.name="FlinkSinkIceberg2Hive" \
   -Dheartbeat.timeout=3600000 \
   -Dakka.ask.timeout=60min \
   -c org.example.FlinkWriteIceberg2Hive ~/DevIcebergDemo-0.14.1-sink.jar
   ```
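   For reference, under Flink 1.13's default memory model these flags leave roughly the following task heap per TaskManager (approximate figures, assuming default fractions and caps):
   
   ```
     process size                        20480m
   - JVM overhead (10%, capped at 1g)    -1024m
   - JVM metaspace (default)              -256m
   = total Flink memory                  19200m
   - managed (0.15 * 19200m)             -2880m
   - network (10%, capped at 1g)         -1024m
   - framework heap + off-heap (2*128m)   -256m
   = task heap                          ~15040m  (~14.7 GiB)
   ```
   
   So each source subtask has on the order of 15 GiB of heap available and still hits `GC overhead limit exceeded` while materializing the equality-delete set.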
   
   

