hililiwei commented on code in PR #6222:
URL: https://github.com/apache/iceberg/pull/6222#discussion_r1066600385


##########
core/src/main/java/org/apache/iceberg/BaseFilesTable.java:
##########
@@ -223,34 +226,37 @@ ManifestFile manifest() {
   static class ContentFileStructWithMetrics implements StructLike {
     private final StructLike fileAsStruct;
     private final MetricsUtil.ReadableMetricsStruct readableMetrics;
-    private final int expectedSize;
+    private final int columnCount;
+    private final int metricsPosition;
 
     ContentFileStructWithMetrics(
-        int expectedSize,
+        int columnCount,
+        int metricsPosition,
         StructLike fileAsStruct,
         MetricsUtil.ReadableMetricsStruct readableMetrics) {
       this.fileAsStruct = fileAsStruct;
       this.readableMetrics = readableMetrics;
-      this.expectedSize = expectedSize;
+      this.columnCount = columnCount;
+      this.metricsPosition = metricsPosition;
     }
 
     @Override
     public int size() {
-      return expectedSize;
+      return columnCount;
     }
 
     @Override
     public <T> T get(int pos, Class<T> javaClass) {
-      int lastExpectedIndex = expectedSize - 1;
-      if (pos < lastExpectedIndex) {
+      if (pos < metricsPosition) {
         return fileAsStruct.get(pos, javaClass);
-      } else if (pos == lastExpectedIndex) {
+      } else if (pos == metricsPosition) {
         return javaClass.cast(readableMetrics);
       } else {
-        throw new IllegalArgumentException(
-            String.format(
-                "Illegal position access for ContentFileStructWithMetrics: %d, 
max allowed is %d",
-                pos, lastExpectedIndex));
+        // columnCount = fileAsStruct column count + the readable metrics 
field.
+        // When pos is greater than metricsPosition, the actual position of 
the field in
+        // fileAsStruct should be
+        // subtracted by 1.

Review Comment:
   done



##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java:
##########
@@ -149,15 +151,28 @@ public Catalog catalog() {
     return icebergCatalog;
   }
 
-  private Namespace toNamespace(String database) {
+  /** Append a new level to the base namespace */
+  private static Namespace appendLevel(Namespace baseNamespace, String 
newLevel) {
     String[] namespace = new String[baseNamespace.levels().length + 1];
     System.arraycopy(baseNamespace.levels(), 0, namespace, 0, 
baseNamespace.levels().length);
-    namespace[baseNamespace.levels().length] = database;
+    namespace[baseNamespace.levels().length] = newLevel;
     return Namespace.of(namespace);
   }
 
   TableIdentifier toIdentifier(ObjectPath path) {
-    return TableIdentifier.of(toNamespace(path.getDatabaseName()), 
path.getObjectName());
+    String objectName = path.getObjectName();
+    List<String> tableName = Splitter.on('$').splitToList(objectName);
+
+    if (tableName.size() == 1) {
+      return TableIdentifier.of(
+          appendLevel(baseNamespace, path.getDatabaseName()), 
path.getObjectName());
+    } else if (tableName.size() == 2 && 
MetadataTableType.from(tableName.get(1)) != null) {
+      return TableIdentifier.of(
+          appendLevel(appendLevel(baseNamespace, path.getDatabaseName()), 
tableName.get(0)),
+          tableName.get(1));
+    } else {
+      throw new IllegalArgumentException("Illegal table name");

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to