SML0127 opened a new issue, #10019:
URL: https://github.com/apache/iceberg/issues/10019
### Apache Iceberg version
1.5.0 (latest release)
### Query engine
Flink
### Please describe the bug 🐞
The first commit (made when the table is created) succeeds, but every subsequent commit fails with:
`org.apache.iceberg.exceptions.CommitStateUnknownException:
@hive#my_db.my_db table not found`
Flink version: 1.17.0
Iceberg version: 1.5.0
## my code (scala)
```scala
/**
 * Holds the settings and mutable handles used to write a Flink stream into an
 * Iceberg table through a Hive catalog.
 *
 * Calling `apply` fills in the catalog properties, loads the catalog, creates
 * the table and attaches a `FlinkSink` to the stream.
 *
 * `hdfsHeadPath`, `getIcebergTableSchema`, `rowDataJavaDataStream` and `pkList`
 * are referenced below but are not defined in this snippet.
 */
object IcebergSinkConfiguration {
val hadoopConf = new Configuration()
// Placeholder instance; replaced by the catalog loaded inside `apply`.
var catalog = new HiveCatalog()
// Empty until `apply` assigns it.
var warehouse = ""
val catalogName = "hive"
var database = "my_db"
lazy val tableName = "my_table"
val hiveMetastoreUris = "thrift://hadoo-xxx:9083" // replaced for security
// Table properties passed to `createTable`, converted to a java.util.Map.
val tableProperties = JavaConverters.mapAsJavaMap(Map[String, String](
(TableProperties.FORMAT_VERSION, "2"),
(TableProperties.UPSERT_ENABLED, "true"),
(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, "true"),
(TableProperties.ENGINE_HIVE_ENABLED, "true")
))
// Catalog properties; populated in `apply`.
val catalogProps = new java.util.HashMap[String, String]()
// The handles below start as null. Only `tableIdentifier` and `catalogLoader`
// are assigned in the visible part of `apply`.
var tableIdentifier: TableIdentifier = null
var catalogLoader: CatalogLoader = null
var table: Table = null
var tableLoader: TableLoader = null
/**
 * Configures the Hive catalog, creates the Iceberg table and wires the sink.
 *
 * @param phase not read anywhere in the visible body
 */
def apply(phase: String): Unit = {
// No trailing "/" is added after "warehouse" here.
warehouse = s"${hdfsHeadPath}warehouse"
catalogProps.put(WAREHOUSE_LOCATION, warehouse)
catalogProps.put(URI, hiveMetastoreUris)
catalogProps.put(CLIENT_POOL_SIZE, "10")
catalogProps.put("type", "iceberg")
catalogProps.put("catalog-type", "hive")
catalogProps.put("property-version", "2")
hadoopConf.set(ENGINE_HIVE_ENABLED, "true")
// NOTE(review): namespaceProps is filled in but never passed to any call in
// this snippet, so these owner settings have no visible effect here.
val namespaceProps = new util.HashMap[String, String]()
namespaceProps.put(HiveCatalog.HMS_TABLE_OWNER, "my_name")
namespaceProps.put(HiveCatalog.HMS_DB_OWNER, "my_name")
catalogLoader = CatalogLoader.hive(catalogName, hadoopConf, catalogProps)
catalog = catalogLoader.loadCatalog().asInstanceOf[HiveCatalog]
val namespace = Namespace.of(database)
tableIdentifier = TableIdentifier.of(namespace, tableName)
// NOTE(review): this local `val table` shadows the object-level `var table`,
// which therefore stays null. createTable is called unconditionally; no
// "table already exists" check is visible here.
val table = catalog.createTable(
tableIdentifier,
getIcebergTableSchema,
PartitionSpec.unpartitioned,
// NOTE(review): `warehouse` ends in "warehouse" and no "/" is inserted, so
// this location is "<hdfsHeadPath>warehousemy_db.db/my_table" as written.
// Confirm whether a separator was intended.
warehouse + s"${database}.db/${tableName}",
tableProperties
// NOTE(review): the closing ")" of createTable is missing at this point, so
// the snippet does not compile as pasted; lines were presumably dropped.
FlinkSink.forRowData(rowDataJavaDataStream)
// NOTE(review): `tableLoader` is never assigned in this snippet, so it is
// still null here. Presumably it is initialised in code not shown; confirm.
.tableLoader(tableLoader)
.upsert(true)
.equalityFieldColumns(JavaConverters.mutableSeqAsJavaList(pkList))
.append().setParallelism(1)
}
}
```
## error log
```text
Committing existing table: hive.my_db.my_table
...
Committing rowDelta for checkpoint 1 to table hive.my_db.my_table branch
main with summary: CommitSummary{dataFilesCount=1, dataFilesRecordCount=1,
dataFilesByteCount=2696, deleteFilesCount=1, deleteFilesRecordCount=1,
deleteFilesByteCount=465}
...
2024-03-22 07:53:39.004 ERROR org.apache.iceberg.hive.HiveTableOperations
[] - Cannot tell if commit to
bi_iceberg_tmp.api_server_capri_user_deregistered_user succeeded, attempting to
reconnect and check.
org.apache.iceberg.exceptions.CommitStateUnknownException: @hive#my_db.my_db
table not found
Cannot determine whether the commit was successful or not, the underlying
data files may or may not be needed. Manual intervention via the Remove Orphan
Files Action can remove these files when a connection to the Catalog can be
re-established if the commit was actually unsuccessful.
Please check to see whether or not your commit was successful before
retrying this commit. Retrying an already successful operation will result in
duplicate records or unintentional modifications.
At this time no files will be deleted including possibly unused manifest
lists.
at
org.apache.iceberg.hive.HiveTableOperations.doCommit(HiveTableOperations.java:292)
at
org.apache.iceberg.BaseMetastoreTableOperations.commit(BaseMetastoreTableOperations.java:135)
at
org.apache.iceberg.SnapshotProducer.lambda$commit$2(SnapshotProducer.java:408)
at
org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413)
at
org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:219)
at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:203)
at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196)
at org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:382)
at
org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitOperation(IcebergFilesCommitter.java:416)
at
org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitDeltaTxn(IcebergFilesCommitter.java:388)
at
org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitPendingResult(IcebergFilesCommitter.java:298)
at
org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitUpToCheckpoint(IcebergFilesCommitter.java:280)
at
org.apache.iceberg.flink.sink.IcebergFilesCommitter.initializeState(IcebergFilesCommitter.java:198)
at
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:274)
at
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:734)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:709)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675)
at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
at
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
at java.lang.Thread.run(Thread.java:750)
Caused by:
InvalidOperationException(message:@hive#bi_iceberg_tmp.api_server_capri_user_deregistered_user
table not found)
at
org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$alter_table_with_environment_context_result$alter_table_with_environment_context_resultStandardScheme.read(ThriftHiveMetastore.java)
at
org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$alter_table_with_environment_context_result$alter_table_with_environment_context_resultStandardScheme.read(ThriftHiveMetastore.java)
at
org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$alter_table_with_environment_context_result.read(ThriftHiveMetastore.java)
at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:86)
at
org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_alter_table_with_environment_context(ThriftHiveMetastore.java:2270)
at
org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.alter_table_with_environment_context(ThriftHiveMetastore.java:2254)
at
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.alter_table_with_environmentContext(HiveMetaStoreClient.java:405)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:208)
at com.sun.proxy.$Proxy69.alter_table_with_environmentContext(Unknown
Source)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.iceberg.common.DynMethods$UnboundMethod.invokeChecked(DynMethods.java:60)
at
org.apache.iceberg.common.DynMethods$UnboundMethod.invoke(DynMethods.java:72)
at
org.apache.iceberg.hive.MetastoreUtil.alterTable(MetastoreUtil.java:78)
at
org.apache.iceberg.hive.HiveOperationsBase.lambda$persistTable$0(HiveOperationsBase.java:112)
at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:58)
at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:51)
at
org.apache.iceberg.hive.CachedClientPool.run(CachedClientPool.java:122)
at
org.apache.iceberg.hive.HiveOperationsBase.persistTable(HiveOperationsBase.java:110)
at
org.apache.iceberg.hive.HiveTableOperations.doCommit(HiveTableOperations.java:239)
```
## hdfs
```
hadoop fs -ls hdfs://hadoop-xx/warehouse
24/03/22 17:03:54 WARN util.NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
24/03/22 17:03:54 WARN shortcircuit.DomainSocketFactory: The short-circuit
local reads feature cannot be used because libhadoop cannot be loaded.
Found 2 items
drwxrwxr-x - my_name my_group 0 2024-03-22 16:53
hdfs://hadoop-xx/warehouse/my_db.db/my_table/data
drwxrwxr-x - my_name my_group 0 2024-03-22 16:53
hdfs://hadoop-xx/warehouse/my_db.db/my_table/metadata
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]