kumarpritam863 opened a new pull request, #13756:
URL: https://github.com/apache/iceberg/pull/13756

The earlier [PR](https://github.com/apache/iceberg/pull/12372) fixed a number of issues in ICR (incremental cooperative rebalancing) mode:
   
1. The coordinator was closed and not re-elected, because ICR restricts partition movement.
2. Data loss, because we were not seeking to the last committed offset.
   
To keep the interface consistent, that PR calls the close method with `context.assignment()`, and the close method closes the coordinator only if partition 0 is among the partitions being lost. When a task is stopped, however, its partitions move only after the task has been stopped, because Kafka Connect closes the task's consumer right after calling `task.stop()`. Here is the reference Kafka code (`WorkerSinkTask.close()`):
```java
@Override
protected void close() {
    // FIXME Kafka needs to add a timeout parameter here for us to properly obey the timeout
    // passed in
    try {
        task.stop();
    } catch (Throwable t) {
        log.warn("Could not stop task", t);
    }
    taskStopped = true;
    Utils.closeQuietly(consumer, "consumer");
    Utils.closeQuietly(headerConverterPlugin, "header converter");
    Utils.closeQuietly(keyConverterPlugin, "key converter");
    Utils.closeQuietly(valueConverterPlugin, "value converter");
    Utils.closeQuietly(pluginMetrics, "plugin metrics");
    /*
        Setting partition count explicitly to 0 to handle the case,
        when the task fails, which would cause its consumer to leave the group.
        This would cause onPartitionsRevoked to be invoked in the rebalance listener, but not onPartitionsAssigned,
        so the metrics for the task (which are still available for failed tasks until they are explicitly revoked
        from the worker) would become inaccurate.
    */
    sinkTaskMetricsGroup.recordPartitionCount(0);
}
```
       
In some cases, partition 0 has already moved to another task as part of a rebalance, so the catalog is closed without the coordinator being closed, which leads to the following exception:
```
java.lang.IllegalStateException: Connection pool shut down
    at org.apache.http.util.Asserts.check(Asserts.java:34)
    at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.requestConnection(PoolingHttpClientConnectionManager.java:269)
    at software.amazon.awssdk.http.apache.internal.conn.ClientConnectionManagerFactory$DelegatingHttpClientConnectionManager.requestConnection(ClientConnectionManagerFactory.java:75)
    at software.amazon.awssdk.http.apache.internal.conn.ClientConnectionManagerFactory$InstrumentedHttpClientConnectionManager.requestConnection(ClientConnectionManagerFactory.java:57)
    at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:176)
    at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
    at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
    at software.amazon.awssdk.http.apache.internal.impl.ApacheSdkHttpClient.execute(ApacheSdkHttpClient.java:72)
    at software.amazon.awssdk.http.apache.ApacheHttpClient.execute(ApacheHttpClient.java:254)
    at software.amazon.awssdk.http.apache.ApacheHttpClient.access$500(ApacheHttpClient.java:104)
    at software.amazon.awssdk.http.apache.ApacheHttpClient$1.call(ApacheHttpClient.java:231)
    at software.amazon.awssdk.http.apache.ApacheHttpClient$1.call(ApacheHttpClient.java:228)
    at software.amazon.awssdk.core.internal.util.MetricUtils.measureDurationUnsafe(MetricUtils.java:102)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.executeHttpRequest(MakeHttpRequestStage.java:79)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.execute(MakeHttpRequestStage.java:57)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.execute(MakeHttpRequestStage.java:40)
    at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:74)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:43)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:79)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:41)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:55)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:39)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage2.executeRequest(RetryableStage2.java:93)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage2.execute(RetryableStage2.java:56)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage2.execute(RetryableStage2.java:36)
    at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:53)
    at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:35)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.executeWithTimer(ApiCallTimeoutTrackingStage.java:82)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:62)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:43)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:50)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:32)
    at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:37)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:26)
    at software.amazon.awssdk.core.internal.http.AmazonSyncHttpClient$RequestExecutionBuilderImpl.execute(AmazonSyncHttpClient.java:210)
    at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.invoke(BaseSyncClientHandler.java:103)
    at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.doExecute(BaseSyncClientHandler.java:173)
    at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$1(BaseSyncClientHandler.java:80)
    at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:182)
    at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:74)
    at software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:45)
    at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:53)
    at software.amazon.awssdk.services.glue.DefaultGlueClient.getTable(DefaultGlueClient.java:10007)
    at org.apache.iceberg.aws.glue.GlueTableOperations.getGlueTable(GlueTableOperations.java:279)
    at org.apache.iceberg.aws.glue.GlueTableOperations.doRefresh(GlueTableOperations.java:128)
    at org.apache.iceberg.BaseMetastoreTableOperations.refresh(BaseMetastoreTableOperations.java:88)
    at org.apache.iceberg.BaseMetastoreTableOperations.current(BaseMetastoreTableOperations.java:71)
    at org.apache.iceberg.BaseMetastoreCatalog.loadTable(BaseMetastoreCatalog.java:49)
    at org.apache.iceberg.connect.channel.Coordinator.commitToTable(Coordinator.java:188)
    at org.apache.iceberg.connect.channel.Coordinator.lambda$doCommit$1(Coordinator.java:152)
    at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413)
    at org.apache.iceberg.util.Tasks$Builder$1.run(Tasks.java:315)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)
```
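The trace boils down to a use-after-close: the coordinator's commit path (`Coordinator.commitToTable` calling `loadTable`) runs after the catalog's HTTP connection pool has been shut down. The sketch below is a hypothetical model of that ordering, not Iceberg or AWS SDK code: `FakeCatalog` stands in for the Glue catalog and its pooled HTTP client, and the static `commitToTable` helper only imitates the coordinator's commit step.

```java
public class UseAfterCloseSketch {

  // Hypothetical stand-in for a catalog backed by a pooled HTTP client.
  static class FakeCatalog implements AutoCloseable {
    private boolean closed = false;

    String loadTable(String name) {
      if (closed) {
        // Mirrors the message raised by the shut-down connection pool in the trace above.
        throw new IllegalStateException("Connection pool shut down");
      }
      return "table:" + name;
    }

    @Override
    public void close() {
      closed = true;
    }
  }

  // Hypothetical coordinator commit step that loads the table before committing to it.
  static String commitToTable(FakeCatalog catalog, String name) {
    return catalog.loadTable(name);
  }

  public static void main(String[] args) {
    FakeCatalog catalog = new FakeCatalog();
    System.out.println(commitToTable(catalog, "db.events"));

    // The task releases the catalog, but the coordinator was never closed...
    catalog.close();

    // ...so the coordinator's next commit runs against the closed resource.
    try {
      commitToTable(catalog, "db.events");
    } catch (IllegalStateException e) {
      System.out.println("commit failed: " + e.getMessage());
    }
  }
}
```

Running `main` prints `table:db.events` followed by `commit failed: Connection pool shut down`.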
   
To handle this scenario and make sure the coordinator is always closed, this PR introduces a `taskStopped` flag: when it is set to `true`, the coordinator is closed even if partition 0 has already moved.
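A minimal sketch of the idea, reusing the same kind of simplified model: every name here (`TopicPartition`, `Committer`, `stop(...)`, `close(...)`, `catalogOpen`) is hypothetical and only approximates the connector's structure. The one element taken from this PR is the `taskStopped` flag, which forces the coordinator to close on task stop regardless of whether partition 0 is in the closed set. The ordering in `stop` (coordinator first, catalog second) is an assumption of the sketch.

```java
import java.util.Collection;
import java.util.List;

public class TaskStoppedFlagSketch {

  // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
  record TopicPartition(String topic, int partition) {}

  static class Committer {
    boolean coordinatorOpen = true;
    boolean catalogOpen = true;
    // Set only when the task itself is stopping, not on an ordinary rebalance.
    private boolean taskStopped = false;

    void close(Collection<TopicPartition> closedPartitions) {
      boolean losingPartitionZero =
          closedPartitions.stream().anyMatch(tp -> tp.partition() == 0);
      // On task stop, close the coordinator even if partition 0 has already moved.
      if (taskStopped || losingPartitionZero) {
        coordinatorOpen = false;
      }
    }

    // Hypothetical stop path: mark the task as stopped, close the coordinator,
    // and only then release the catalog it depends on.
    void stop(Collection<TopicPartition> currentAssignment) {
      taskStopped = true;
      close(currentAssignment);
      catalogOpen = false;
    }
  }

  public static void main(String[] args) {
    Committer committer = new Committer();
    // Partition 0 was already moved to another task by a rebalance.
    committer.stop(List.of(new TopicPartition("events", 1)));
    System.out.println("coordinator open: " + committer.coordinatorOpen);
    System.out.println("catalog open: " + committer.catalogOpen);
  }
}
```

Running `main` prints `coordinator open: false` and `catalog open: false`. An ordinary rebalance still goes through `close(...)` with `taskStopped` left `false`, so cooperative rebalances that do not touch partition 0 leave the coordinator where it is.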


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.