kumarpritam863 opened a new pull request, #13756: URL: https://github.com/apache/iceberg/pull/13756
[PR #12372](https://github.com/apache/iceberg/pull/12372) fixed a number of issues in ICR mode:

1. The coordinator being closed and never re-elected, due to restricted partition movement in ICR mode.
2. Data loss, because we were not seeking to the last committed offset.

To keep the interface consistent, we call the `close` method with `context.assignment()`, and `close` only closes the coordinator if partition 0 is among the partitions being lost. In the task-stop scenario, however, the partitions move right after the task is closed, because Kafka Connect also closes the task's consumer immediately after stopping the task. Here is the relevant Kafka code:

```java
@Override
protected void close() {
    // FIXME Kafka needs to add a timeout parameter here for us to properly obey the timeout
    // passed in
    try {
        task.stop();
    } catch (Throwable t) {
        log.warn("Could not stop task", t);
    }
    taskStopped = true;
    Utils.closeQuietly(consumer, "consumer");
    Utils.closeQuietly(headerConverterPlugin, "header converter");
    Utils.closeQuietly(keyConverterPlugin, "key converter");
    Utils.closeQuietly(valueConverterPlugin, "value converter");
    Utils.closeQuietly(pluginMetrics, "plugin metrics");
    /*
      Setting partition count explicitly to 0 to handle the case,
      when the task fails, which would cause its consumer to leave the group.
      This would cause onPartitionsRevoked to be invoked in the rebalance listener, but not onPartitionsAssigned,
      so the metrics for the task (which are still available for failed tasks until they are explicitly revoked
      from the worker) would become inaccurate.
    */
    sinkTaskMetricsGroup.recordPartitionCount(0);
}
```

In some cases, partition 0 may already have moved as part of the rebalance, so we closed the catalog without closing the coordinator. The coordinator then failed with the following exception while committing:

```
java.lang.IllegalStateException: Connection pool shut down
	at org.apache.http.util.Asserts.check(Asserts.java:34)
	at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.requestConnection(PoolingHttpClientConnectionManager.java:269)
	at software.amazon.awssdk.http.apache.internal.conn.ClientConnectionManagerFactory$DelegatingHttpClientConnectionManager.requestConnection(ClientConnectionManagerFactory.java:75)
	at software.amazon.awssdk.http.apache.internal.conn.ClientConnectionManagerFactory$InstrumentedHttpClientConnectionManager.requestConnection(ClientConnectionManagerFactory.java:57)
	at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:176)
	at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
	at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
	at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
	at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
	at software.amazon.awssdk.http.apache.internal.impl.ApacheSdkHttpClient.execute(ApacheSdkHttpClient.java:72)
	at software.amazon.awssdk.http.apache.ApacheHttpClient.execute(ApacheHttpClient.java:254)
	at software.amazon.awssdk.http.apache.ApacheHttpClient.access$500(ApacheHttpClient.java:104)
	at software.amazon.awssdk.http.apache.ApacheHttpClient$1.call(ApacheHttpClient.java:231)
	at software.amazon.awssdk.http.apache.ApacheHttpClient$1.call(ApacheHttpClient.java:228)
	at software.amazon.awssdk.core.internal.util.MetricUtils.measureDurationUnsafe(MetricUtils.java:102)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.executeHttpRequest(MakeHttpRequestStage.java:79)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.execute(MakeHttpRequestStage.java:57)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.execute(MakeHttpRequestStage.java:40)
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:74)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:43)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:79)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:41)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:55)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:39)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage2.executeRequest(RetryableStage2.java:93)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage2.execute(RetryableStage2.java:56)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage2.execute(RetryableStage2.java:36)
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
	at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:53)
	at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:35)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.executeWithTimer(ApiCallTimeoutTrackingStage.java:82)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:62)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:43)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:50)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:32)
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:37)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:26)
	at software.amazon.awssdk.core.internal.http.AmazonSyncHttpClient$RequestExecutionBuilderImpl.execute(AmazonSyncHttpClient.java:210)
	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.invoke(BaseSyncClientHandler.java:103)
	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.doExecute(BaseSyncClientHandler.java:173)
	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$1(BaseSyncClientHandler.java:80)
	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:182)
	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:74)
	at software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:45)
	at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:53)
	at software.amazon.awssdk.services.glue.DefaultGlueClient.getTable(DefaultGlueClient.java:10007)
	at org.apache.iceberg.aws.glue.GlueTableOperations.getGlueTable(GlueTableOperations.java:279)
	at org.apache.iceberg.aws.glue.GlueTableOperations.doRefresh(GlueTableOperations.java:128)
	at org.apache.iceberg.BaseMetastoreTableOperations.refresh(BaseMetastoreTableOperations.java:88)
	at org.apache.iceberg.BaseMetastoreTableOperations.current(BaseMetastoreTableOperations.java:71)
	at org.apache.iceberg.BaseMetastoreCatalog.loadTable(BaseMetastoreCatalog.java:49)
	at org.apache.iceberg.connect.channel.Coordinator.commitToTable(Coordinator.java:188)
	at org.apache.iceberg.connect.channel.Coordinator.lambda$doCommit$1(Coordinator.java:152)
	at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413)
	at org.apache.iceberg.util.Tasks$Builder$1.run(Tasks.java:315)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
```

To handle this scenario and close the coordinator permanently, this PR introduces a `taskStopped` flag: when it is set to `true`, the coordinator is closed even if partition 0 has moved.
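The close decision described above can be sketched in plain Java. This is a simplified, hypothetical model, not the actual Iceberg Kafka Connect code: the class and method names (`CoordinatorCloseSketch`, `close`, `stop`, `coordinatorOutlivesCatalog`) are illustrative, partitions are modeled as plain integers of a single topic, and the only name taken from this PR is the `taskStopped` flag. It contrasts the previous rule (close the coordinator only when partition 0 is among the partitions being closed) with the proposed one (also close it once the task is stopping), assuming the assignment seen at stop time no longer contains partition 0 while this task still hosts a running coordinator.

```java
import java.util.Set;

/**
 * Hypothetical, simplified model of the coordinator close decision.
 * Not the real Iceberg Kafka Connect implementation; partitions are ints of one topic.
 */
public class CoordinatorCloseSketch {

  /** Hypothetical stand-in for the coordinator hosted by the task that owned partition 0. */
  static final class Coordinator {
    private boolean running = true;

    void terminate() {
      running = false;
    }

    boolean isRunning() {
      return running;
    }
  }

  private final boolean applyFix; // false = previous rule, true = rule proposed in this PR
  private Coordinator coordinator = new Coordinator();
  private boolean catalogOpen = true;
  private boolean taskStopped = false; // mirrors the "taskStopped" flag from this PR

  public CoordinatorCloseSketch(boolean applyFix) {
    this.applyFix = applyFix;
  }

  /** Decides whether closing these partitions should also shut down the coordinator. */
  boolean shouldCloseCoordinator(Set<Integer> closedPartitions) {
    boolean losingPartitionZero = closedPartitions.contains(0);
    return applyFix ? (taskStopped || losingPartitionZero) : losingPartitionZero;
  }

  /** Called with the partitions being closed, either on revocation or on task stop. */
  public void close(Set<Integer> closedPartitions) {
    if (coordinator != null && shouldCloseCoordinator(closedPartitions)) {
      coordinator.terminate();
      coordinator = null;
    }
  }

  /** Stop path: mark the task stopped, close with the current assignment, then close the catalog. */
  public void stop(Set<Integer> currentAssignment) {
    taskStopped = true;
    close(currentAssignment);
    catalogOpen = false;
  }

  /** True if the coordinator is still running after the catalog has been closed. */
  public boolean coordinatorOutlivesCatalog() {
    return coordinator != null && coordinator.isRunning() && !catalogOpen;
  }

  public static void main(String[] args) {
    // Assumed scenario: partition 0 is not in the assignment seen at stop time,
    // but this task still hosts a running coordinator.
    Set<Integer> assignmentAtStop = Set.of(1, 2);

    CoordinatorCloseSketch before = new CoordinatorCloseSketch(false);
    before.stop(assignmentAtStop);
    // prints "previous rule, coordinator outlives catalog: true"
    System.out.println("previous rule, coordinator outlives catalog: " + before.coordinatorOutlivesCatalog());

    CoordinatorCloseSketch after = new CoordinatorCloseSketch(true);
    after.stop(assignmentAtStop);
    // prints "with taskStopped, coordinator outlives catalog: false"
    System.out.println("with taskStopped, coordinator outlives catalog: " + after.coordinatorOutlivesCatalog());
  }
}
```

In this sketch, keying the stop path on `taskStopped` rather than on the current assignment makes the outcome independent of how partitions were shuffled during the final rebalance, while the partition-0 check still governs ordinary revocations as long as the task keeps running.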
