hililiwei commented on code in PR #6222: URL: https://github.com/apache/iceberg/pull/6222#discussion_r1066600385
########## core/src/main/java/org/apache/iceberg/BaseFilesTable.java: ########## @@ -223,34 +226,37 @@ ManifestFile manifest() { static class ContentFileStructWithMetrics implements StructLike { private final StructLike fileAsStruct; private final MetricsUtil.ReadableMetricsStruct readableMetrics; - private final int expectedSize; + private final int columnCount; + private final int metricsPosition; ContentFileStructWithMetrics( - int expectedSize, + int columnCount, + int metricsPosition, StructLike fileAsStruct, MetricsUtil.ReadableMetricsStruct readableMetrics) { this.fileAsStruct = fileAsStruct; this.readableMetrics = readableMetrics; - this.expectedSize = expectedSize; + this.columnCount = columnCount; + this.metricsPosition = metricsPosition; } @Override public int size() { - return expectedSize; + return columnCount; } @Override public <T> T get(int pos, Class<T> javaClass) { - int lastExpectedIndex = expectedSize - 1; - if (pos < lastExpectedIndex) { + if (pos < metricsPosition) { return fileAsStruct.get(pos, javaClass); - } else if (pos == lastExpectedIndex) { + } else if (pos == metricsPosition) { return javaClass.cast(readableMetrics); } else { - throw new IllegalArgumentException( - String.format( - "Illegal position access for ContentFileStructWithMetrics: %d, max allowed is %d", - pos, lastExpectedIndex)); + // columnCount = fileAsStruct column count + the readable metrics field. + // When pos is greater than metricsPosition, the actual position of the field in + // fileAsStruct should be + // subtracted by 1. Review Comment: done ########## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java: ########## @@ -149,15 +151,28 @@ public Catalog catalog() { return icebergCatalog; } - private Namespace toNamespace(String database) { + /** Append a new level to the base namespace */ + private static Namespace appendLevel(Namespace baseNamespace, String newLevel) { String[] namespace = new String[baseNamespace.levels().length + 1]; System.arraycopy(baseNamespace.levels(), 0, namespace, 0, baseNamespace.levels().length); - namespace[baseNamespace.levels().length] = database; + namespace[baseNamespace.levels().length] = newLevel; return Namespace.of(namespace); } TableIdentifier toIdentifier(ObjectPath path) { - return TableIdentifier.of(toNamespace(path.getDatabaseName()), path.getObjectName()); + String objectName = path.getObjectName(); + List<String> tableName = Splitter.on('$').splitToList(objectName); + + if (tableName.size() == 1) { + return TableIdentifier.of( + appendLevel(baseNamespace, path.getDatabaseName()), path.getObjectName()); + } else if (tableName.size() == 2 && MetadataTableType.from(tableName.get(1)) != null) { + return TableIdentifier.of( + appendLevel(appendLevel(baseNamespace, path.getDatabaseName()), tableName.get(0)), + tableName.get(1)); + } else { + throw new IllegalArgumentException("Illegal table name"); Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org