stevenzwu commented on code in PR #6222: URL: https://github.com/apache/iceberg/pull/6222#discussion_r1066088176
##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java:
##########
@@ -149,15 +151,28 @@ public Catalog catalog() {
     return icebergCatalog;
   }
 
-  private Namespace toNamespace(String database) {
+  /** Append a new level to the base namespace */
+  private static Namespace appendLevel(Namespace baseNamespace, String newLevel) {
     String[] namespace = new String[baseNamespace.levels().length + 1];
     System.arraycopy(baseNamespace.levels(), 0, namespace, 0, baseNamespace.levels().length);
-    namespace[baseNamespace.levels().length] = database;
+    namespace[baseNamespace.levels().length] = newLevel;
     return Namespace.of(namespace);
   }
 
   TableIdentifier toIdentifier(ObjectPath path) {
-    return TableIdentifier.of(toNamespace(path.getDatabaseName()), path.getObjectName());
+    String objectName = path.getObjectName();
+    List<String> tableName = Splitter.on('$').splitToList(objectName);
+
+    if (tableName.size() == 1) {
+      return TableIdentifier.of(
+          appendLevel(baseNamespace, path.getDatabaseName()), path.getObjectName());
+    } else if (tableName.size() == 2 && MetadataTableType.from(tableName.get(1)) != null) {
+      return TableIdentifier.of(
+          appendLevel(appendLevel(baseNamespace, path.getDatabaseName()), tableName.get(0)),
+          tableName.get(1));
+    } else {
+      throw new IllegalArgumentException("Illegal table name");

Review Comment:
   nit: include the `objectName` in the error msg

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org