haizhou-zhao opened a new issue, #8222:
URL: https://github.com/apache/iceberg/issues/8222

   ### Apache Iceberg version
   
   1.2.1
   
   ### Query engine
   
   Flink
   
   ### Please describe the bug 🐞
   
   ### Setup
   Running a Flink pipeline with an Iceberg Source over an Iceberg REST Catalog
   ```
   // Flink side setup
   public static void main(String[] args) throws Exception {
       ParameterTool parameters = ParameterTool.fromArgs(args);
       org.apache.flink.configuration.Configuration conf =
           new org.apache.flink.configuration.Configuration();
       final StreamExecutionEnvironment env =
           StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
       env.enableCheckpointing(60000);
       final boolean isLocalEnv =
            env instanceof LocalStreamEnvironment || parameters.has("isLocalCluster");
       if (isLocalEnv) {
         env.setParallelism(1);
       }
       env.getConfig().setGlobalJobParameters(parameters);
       // Iceberg REST Catalog client is initialized here
       CatalogLoader catalogLoader = initIcebergCatalogLoader();
   
       IcebergSource<RowData> icebergSource = IcebergSource.forRowData()
            .streaming(true)
            .streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
           .monitorInterval(Duration.ofMillis(10L))
            .tableLoader(TableLoader.fromCatalog(catalogLoader, TableIdentifier.of(DB, TBL)))
           .assignerFactory(new SimpleSplitAssignerFactory())
           .build();
   
        env.fromSource(icebergSource, WatermarkStrategy.noWatermarks(), "icebergSource", TypeInformation.of(RowData.class))
           .print();
   
       env.execute();
     }
   ```
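
   For reference, `initIcebergCatalogLoader()` is elided above ("Iceberg REST Catalog client is initialized here"). A minimal sketch of what such an initializer could look like is below; the catalog name, URI, and warehouse values are placeholders, not taken from the actual setup:
   ```
   // Hypothetical sketch (not the actual setup): build a CatalogLoader backed by RESTCatalog.
   private static CatalogLoader initIcebergCatalogLoader() {
     Map<String, String> properties = Map.of(
         CatalogProperties.URI, "http://localhost:8181",                  // placeholder REST endpoint
         CatalogProperties.WAREHOUSE_LOCATION, "s3://bucket/warehouse");  // placeholder warehouse

     return CatalogLoader.custom(
         "rest",                                       // catalog name (placeholder)
         properties,
         new org.apache.hadoop.conf.Configuration(),   // Hadoop conf passed through to the catalog
         "org.apache.iceberg.rest.RESTCatalog");       // catalog implementation class
   }
   ```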
   
   ### Issue
   ```
    14:57:42.376 [SourceCoordinator-Source: icebergSrouce] ERROR org.apache.iceberg.flink.source.enumerator.ContinuousIcebergEnumerator - Failed to discover new splits
    java.lang.IllegalStateException: Connection pool shut down
         at org.apache.hc.core5.util.Asserts.check(Asserts.java:38)
         at org.apache.hc.core5.pool.StrictConnPool.lease(StrictConnPool.java:176)
         at org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager.lease(PoolingHttpClientConnectionManager.java:298)
         at org.apache.hc.client5.http.impl.classic.InternalExecRuntime.acquireEndpoint(InternalExecRuntime.java:103)
         at org.apache.hc.client5.http.impl.classic.ConnectExec.execute(ConnectExec.java:125)
         at org.apache.hc.client5.http.impl.classic.ExecChainElement.execute(ExecChainElement.java:51)
         at org.apache.hc.client5.http.impl.classic.ProtocolExec.execute(ProtocolExec.java:192)
         at org.apache.hc.client5.http.impl.classic.ExecChainElement.execute(ExecChainElement.java:51)
         at org.apache.hc.client5.http.impl.classic.HttpRequestRetryExec.execute(HttpRequestRetryExec.java:96)
         at org.apache.hc.client5.http.impl.classic.ExecChainElement.execute(ExecChainElement.java:51)
         at org.apache.hc.client5.http.impl.classic.ContentCompressionExec.execute(ContentCompressionExec.java:152)
         at org.apache.hc.client5.http.impl.classic.ExecChainElement.execute(ExecChainElement.java:51)
         at org.apache.hc.client5.http.impl.classic.RedirectExec.execute(RedirectExec.java:115)
         at org.apache.hc.client5.http.impl.classic.ExecChainElement.execute(ExecChainElement.java:51)
         at org.apache.hc.client5.http.impl.classic.InternalHttpClient.doExecute(InternalHttpClient.java:170)
         at org.apache.hc.client5.http.impl.classic.CloseableHttpClient.execute(CloseableHttpClient.java:123)
         at org.apache.iceberg.rest.HTTPClient.execute(HTTPClient.java:268)
         at org.apache.iceberg.rest.HTTPClient.execute(HTTPClient.java:220)
         at org.apache.iceberg.rest.HTTPClient.get(HTTPClient.java:321)
         at org.apache.iceberg.rest.RESTClient.get(RESTClient.java:96)
         at org.apache.iceberg.rest.RESTClient.get(RESTClient.java:79)
         at org.apache.iceberg.rest.RESTTableOperations.refresh(RESTTableOperations.java:99)
         at org.apache.iceberg.BaseTable.refresh(BaseTable.java:68)
         at org.apache.iceberg.flink.source.enumerator.ContinuousSplitPlannerImpl.planSplits(ContinuousSplitPlannerImpl.java:74)
         at org.apache.iceberg.flink.source.enumerator.ContinuousIcebergEnumerator.discoverSplits(ContinuousIcebergEnumerator.java:118)
         at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$5(ExecutorNotifier.java:130)
         at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
         at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
         at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
         at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
         at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
         at java.base/java.lang.Thread.run(Thread.java:829)
   ```
   ### RCA
    - **Why did we see `java.lang.IllegalStateException: Connection pool shut down`?**
    a. Because HTTP requests were still being issued after the REST Catalog was closed (and closing the catalog also closes the underlying HTTP client).
    - **Where was the Catalog closed, and which component closed it?**
    a.
         1. The Source Coordinator closed the Catalog while creating the `ContinuousSplitPlanner`, specifically in the following lines:
           ```
           if (scanContext.isStreaming()) {
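                // NOTE (annotation, not in the Iceberg source): lazyTable() below also closes the TableLoader and the underlying catalog as a side effect, see step 2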
               ContinuousSplitPlanner splitPlanner =
                    new ContinuousSplitPlannerImpl(lazyTable(), scanContext, planningThreadName());
                   ...
           ```
   
           Ref: https://github.com/apache/iceberg/blob/1.2.x/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java#L190
   
        2. The `lazyTable()` call closes the TableLoader once the table has been loaded, specifically in this try block:
           ```
           private Table lazyTable() {
             if (table == null) {
               tableLoader.open();
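            // NOTE (annotation, not in the Iceberg source): this try-with-resources closes the TableLoader on exit, which also closes the underlying catalog and its HTTP client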
               try (TableLoader loader = tableLoader) {
                 this.table = loader.loadTable();
               } catch (IOException e) {
                 throw new UncheckedIOException("Failed to close table loader", 
e);
               }
             }
   
             return table;
           }
           ```
   
           Ref: https://github.com/apache/iceberg/blob/1.2.x/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java#L130
   
        3. `TableLoader#close()` closes the underlying Catalog, which in this case is a RESTCatalog, and `RESTCatalog#close()` closes the underlying HTTP client. However, the table returned by the `lazyTable()` call still holds a reference to this now-closed HTTP client.
    
    - **After that, which component continues to use this closed REST Catalog/HTTP client instance?**
    a. After the `ContinuousSplitPlanner` is constructed (holding a reference to the table, which in turn references the closed HTTP client), `planSplits()` is later invoked to discover splits. Inside `ContinuousSplitPlannerImpl#planSplits`, the table backed by the closed HTTP client is used to call `table.refresh()`, which is an HTTP GET against the loadTable API. Issuing HTTP calls through a closed HTTP client fails with the exception above. A minimal reproduction sketch outside Flink is shown below.
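
    The same failure mode can be reproduced outside Flink by using `RESTCatalog` directly. Below is a minimal sketch; the endpoint `http://localhost:8181` and the table `db.tbl` are placeholders, not values from the actual setup:
    ```
    import java.util.Map;
    import org.apache.iceberg.CatalogProperties;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.catalog.TableIdentifier;
    import org.apache.iceberg.rest.RESTCatalog;

    public class ClosedCatalogRefresh {
      public static void main(String[] args) throws Exception {
        RESTCatalog catalog = new RESTCatalog();
        catalog.initialize("rest", Map.of(CatalogProperties.URI, "http://localhost:8181")); // placeholder endpoint

        Table table = catalog.loadTable(TableIdentifier.of("db", "tbl")); // placeholder table

        catalog.close(); // closes the underlying HTTPClient and shuts down its connection pool

        // The table still holds RESTTableOperations backed by the closed HTTP client,
        // so this refresh fails with "java.lang.IllegalStateException: Connection pool shut down"
        table.refresh();
      }
    }
    ```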

