haizhou-zhao opened a new issue, #8222:
URL: https://github.com/apache/iceberg/issues/8222
### Apache Iceberg version
1.2.1
### Query engine
Flink
### Please describe the bug 🐞
### Setup
Running Flink pipeline with Iceberg Source over Iceberg REST Catalog
```
// Flink side setup
public static void main(String[] args) throws Exception {
  ParameterTool parameters = ParameterTool.fromArgs(args);
  org.apache.flink.configuration.Configuration conf =
      new org.apache.flink.configuration.Configuration();
  final StreamExecutionEnvironment env =
      StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
  env.enableCheckpointing(60000);
  final boolean isLocalEnv =
      env instanceof LocalStreamEnvironment ||
      parameters.has("isLocalCluster");
  if (isLocalEnv) {
    env.setParallelism(1);
  }
  env.getConfig().setGlobalJobParameters(parameters);
  // Iceberg REST Catalog client is initialized here
  CatalogLoader catalogLoader = initIcebergCatalogLoader();
  IcebergSource<RowData> icebergSource = IcebergSource.forRowData()
      .streaming(true)
      .streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
      .monitorInterval(Duration.ofMillis(10L))
      .tableLoader(TableLoader.fromCatalog(catalogLoader,
          TableIdentifier.of(DB, TBL)))
      .assignerFactory(new SimpleSplitAssignerFactory())
      .build();
  env.fromSource(icebergSource, WatermarkStrategy.noWatermarks(),
          "icebergSource", TypeInformation.of(RowData.class))
      .print();
  env.execute();
}
```
### Issue
```
14:57:42.376 [SourceCoordinator-Source: icebergSrouce] ERROR org.apache.iceberg.flink.source.enumerator.ContinuousIcebergEnumerator - Failed to discover new splits
java.lang.IllegalStateException: Connection pool shut down
    at org.apache.hc.core5.util.Asserts.check(Asserts.java:38)
    at org.apache.hc.core5.pool.StrictConnPool.lease(StrictConnPool.java:176)
    at org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager.lease(PoolingHttpClientConnectionManager.java:298)
    at org.apache.hc.client5.http.impl.classic.InternalExecRuntime.acquireEndpoint(InternalExecRuntime.java:103)
    at org.apache.hc.client5.http.impl.classic.ConnectExec.execute(ConnectExec.java:125)
    at org.apache.hc.client5.http.impl.classic.ExecChainElement.execute(ExecChainElement.java:51)
    at org.apache.hc.client5.http.impl.classic.ProtocolExec.execute(ProtocolExec.java:192)
    at org.apache.hc.client5.http.impl.classic.ExecChainElement.execute(ExecChainElement.java:51)
    at org.apache.hc.client5.http.impl.classic.HttpRequestRetryExec.execute(HttpRequestRetryExec.java:96)
    at org.apache.hc.client5.http.impl.classic.ExecChainElement.execute(ExecChainElement.java:51)
    at org.apache.hc.client5.http.impl.classic.ContentCompressionExec.execute(ContentCompressionExec.java:152)
    at org.apache.hc.client5.http.impl.classic.ExecChainElement.execute(ExecChainElement.java:51)
    at org.apache.hc.client5.http.impl.classic.RedirectExec.execute(RedirectExec.java:115)
    at org.apache.hc.client5.http.impl.classic.ExecChainElement.execute(ExecChainElement.java:51)
    at org.apache.hc.client5.http.impl.classic.InternalHttpClient.doExecute(InternalHttpClient.java:170)
    at org.apache.hc.client5.http.impl.classic.CloseableHttpClient.execute(CloseableHttpClient.java:123)
    at org.apache.iceberg.rest.HTTPClient.execute(HTTPClient.java:268)
    at org.apache.iceberg.rest.HTTPClient.execute(HTTPClient.java:220)
    at org.apache.iceberg.rest.HTTPClient.get(HTTPClient.java:321)
    at org.apache.iceberg.rest.RESTClient.get(RESTClient.java:96)
    at org.apache.iceberg.rest.RESTClient.get(RESTClient.java:79)
    at org.apache.iceberg.rest.RESTTableOperations.refresh(RESTTableOperations.java:99)
    at org.apache.iceberg.BaseTable.refresh(BaseTable.java:68)
    at org.apache.iceberg.flink.source.enumerator.ContinuousSplitPlannerImpl.planSplits(ContinuousSplitPlannerImpl.java:74)
    at org.apache.iceberg.flink.source.enumerator.ContinuousIcebergEnumerator.discoverSplits(ContinuousIcebergEnumerator.java:118)
    at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$5(ExecutorNotifier.java:130)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
```
### RCA
- **Why did we see `java.lang.IllegalStateException: Connection pool shut
down`?**
a. Because more HTTP requests were issued after the REST Catalog was closed
(closing the catalog also closes its underlying HTTP client).
- **Where was the Catalog closed? And which component closed it?**
a.
1. The Source Coordinator closed the Catalog while attempting to
create the `ContinuousSplitPlanner`, specifically in the following lines:
```
if (scanContext.isStreaming()) {
  ContinuousSplitPlanner splitPlanner =
      new ContinuousSplitPlannerImpl(lazyTable(), scanContext,
          planningThreadName());
  ...
```
Ref:
https://github.com/apache/iceberg/blob/1.2.x/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java#L190
2. The `lazyTable()` call closes the TableLoader as soon as the table has
been loaded, because `loader` is declared as a try-with-resources resource in
this block:
```
private Table lazyTable() {
  if (table == null) {
    tableLoader.open();
    try (TableLoader loader = tableLoader) {
      this.table = loader.loadTable();
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to close table loader", e);
    }
  }
  return table;
}
```
Ref:
https://github.com/apache/iceberg/blob/1.2.x/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java#L130
3. The `TableLoader#close()` call closes the underlying Catalog,
which in this instance is a RESTCatalog. `RESTCatalog#close()` in turn
closes the underlying HTTP client. Even though the REST Catalog and its HTTP
client are now closed, the table returned by `lazyTable()` still holds a
reference to that closed HTTP client.
- **After that, which component continues to use this closed REST
Catalog/HTTP client instance?**
a. `ContinuousSplitPlanner` is constructed with the table that holds a
reference to the closed HTTP client. Later, `planSplits()` is invoked to
discover splits. `ContinuousSplitPlannerImpl#planSplits` calls
`table.refresh()` on that table, which issues an HTTP GET against the
loadTable API. Because the HTTP client is already closed, the request fails
with the exception shown above.
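
The chain described in the RCA can be reproduced in miniature without Iceberg or Flink. The sketch below is only an illustration of the pattern: `FakeHttpClient`, `FakeTable`, `FakeTableLoader`, and `loadThenClose()` are hypothetical stand-ins invented here, not Iceberg classes, and the error message is copied from the stack trace purely for readability.

```java
public class ClosedClientSketch {

    /** Hypothetical stand-in for the HTTP client owned by the catalog. */
    static final class FakeHttpClient implements AutoCloseable {
        private boolean closed = false;

        String get(String path) {
            if (closed) {
                // Same message as in the reported stack trace, for illustration only.
                throw new IllegalStateException("Connection pool shut down");
            }
            return "metadata for " + path;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Hypothetical stand-in for a table whose refresh() needs the catalog's client. */
    static final class FakeTable {
        private final FakeHttpClient client;
        private final String name;

        FakeTable(FakeHttpClient client, String name) {
            this.client = client;
            this.name = name;
        }

        String refresh() {
            return client.get("/tables/" + name);
        }
    }

    /** Hypothetical stand-in for a table loader: closing it closes the shared client. */
    static final class FakeTableLoader implements AutoCloseable {
        private final FakeHttpClient client = new FakeHttpClient();

        FakeTable loadTable() {
            return new FakeTable(client, "db.tbl");
        }

        @Override
        public void close() {
            client.close();
        }
    }

    /** Same shape as lazyTable(): the loader is closed before the table is handed out. */
    static FakeTable loadThenClose() {
        try (FakeTableLoader loader = new FakeTableLoader()) {
            return loader.loadTable();
        }
    }

    /** Returns the failure message from refresh(), or null if refresh() succeeded. */
    static String refreshFailure(FakeTable table) {
        try {
            table.refresh();
            return null;
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        FakeTable table = loadThenClose();
        // The table outlives the loader but still points at the closed client.
        System.out.println("refresh after close: " + refreshFailure(table));
    }
}
```

Running `main` prints `refresh after close: Connection pool shut down`. The table object itself is still usable as a Java reference; only the operations that need the shared client fail, which is why the problem surfaces later in `planSplits()` rather than at construction time.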