[GitHub] [iceberg] InvisibleProgrammer commented on pull request #6337: Docs: Update Iceberg Hive documentation
InvisibleProgrammer commented on PR #6337: URL: https://github.com/apache/iceberg/pull/6337#issuecomment-1336973794 @pvary , Can I ask for workflow approval? Thank you, Zsolt
[GitHub] [iceberg] ajantha-bhat commented on pull request #6250: Docs: Remove redundant configuration from spark docs
ajantha-bhat commented on PR #6250: URL: https://github.com/apache/iceberg/pull/6250#issuecomment-1337013120 cc: @pvary
[GitHub] [iceberg] ajantha-bhat commented on pull request #6234: Docs: Remove parent-version-id from the view spec example
ajantha-bhat commented on PR #6234: URL: https://github.com/apache/iceberg/pull/6234#issuecomment-1337015494 cc: @stevenzwu, @jackye1995
[GitHub] [iceberg] KarlManong opened a new issue, #6359: Sometimes org.apache.iceberg.jdbc.TestJdbcTableConcurrency#testConcurrentFastAppends never end
KarlManong opened a new issue, #6359: URL: https://github.com/apache/iceberg/issues/6359 ### Apache Iceberg version 1.1.0 (latest release) ### Query engine Other ### Please describe the bug 🐞 When I run `gradle build`, it gets stuck at `> :iceberg-core:test > Executing test org.apache.iceberg.jdbc.TestJdbcTableConcurrency`. The stack trace is in [stack.txt](https://github.com/apache/iceberg/files/10152988/stack.txt) and the full log is in [iceberg-core.log](https://github.com/apache/iceberg/files/10152998/iceberg-core.log).
[GitHub] [iceberg] KarlManong closed issue #6359: Sometimes org.apache.iceberg.jdbc.TestJdbcTableConcurrency#testConcurrentFastAppends never end
KarlManong closed issue #6359: Sometimes org.apache.iceberg.jdbc.TestJdbcTableConcurrency#testConcurrentFastAppends never end URL: https://github.com/apache/iceberg/issues/6359
[GitHub] [iceberg] ajantha-bhat commented on a diff in pull request #6091: Spark-3.3: Handle statistics file clean up from expireSnapshots action/procedure
ajantha-bhat commented on code in PR #6091: URL: https://github.com/apache/iceberg/pull/6091#discussion_r1039525785

## core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java: ##

@@ -1234,6 +1245,40 @@ public void testMultipleRefsAndCleanExpiredFilesFailsForIncrementalCleanup() {
         .commit());
   }

+  @Test
+  public void testExpireWithStatisticsFiles() throws URISyntaxException, IOException {
+    table.newAppend().appendFile(FILE_A).commit();
+    File statsFileLocation1 = statsFileLocation(table);
+    StatisticsFile statisticsFile1 = writeStatsFileForCurrentSnapshot(table, statsFileLocation1);
+    Assert.assertEquals(
+        "Must match the latest snapshot",
+        table.currentSnapshot().snapshotId(),
+        statisticsFile1.snapshotId());
+
+    table.newAppend().appendFile(FILE_B).commit();
+    File statsFileLocation2 = statsFileLocation(table);
+    StatisticsFile statisticsFile2 = writeStatsFileForCurrentSnapshot(table, statsFileLocation2);
+    Assert.assertEquals(
+        "Must match the latest snapshot",
+        table.currentSnapshot().snapshotId(),
+        statisticsFile2.snapshotId());
+
+    Assert.assertEquals("Should have 2 statistics file", 2, table.statisticsFiles().size());
+
+    table.updateProperties().set(TableProperties.MAX_SNAPSHOT_AGE_MS, "1").commit();
+
+    removeSnapshots(table).commit();
+
+    Assert.assertEquals("Should keep 1 snapshot", 1, Iterables.size(table.snapshots()));
+    Assertions.assertThat(table.statisticsFiles())
+        .hasSize(1)
+        .extracting(StatisticsFile::snapshotId)
+        .as("Should contain only the statistics file of snapshot2")
+        .isEqualTo(Lists.newArrayList(statisticsFile2.snapshotId()));
+    Assertions.assertThat(statsFileLocation1.exists()).isFalse();
+    Assertions.assertThat(statsFileLocation2.exists()).isTrue();

Review Comment: Added now. This change came from the dependent PR (#6090), so the test case was added in that PR (#6090).
[GitHub] [iceberg] ajantha-bhat commented on a diff in pull request #6090: Core: Handle statistics file clean up from expireSnapshots
ajantha-bhat commented on code in PR #6090: URL: https://github.com/apache/iceberg/pull/6090#discussion_r1039527780

## core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java: ##

@@ -79,4 +80,15 @@ protected void deleteFiles(Set<String> pathsToDelete, String fileType) {
             (file, thrown) -> LOG.warn("Delete failed for {} file: {}", fileType, file, thrown))
         .run(deleteFunc::accept);
   }
+
+  protected Set<String> expiredStatisticsFilesLocations(
+      TableMetadata beforeExpiration, Set<Long> expiredIds) {
+    Set<String> expiredStatisticsFilesLocations = Sets.newHashSet();
+    for (StatisticsFile statisticsFile : beforeExpiration.statisticsFiles()) {
+      if (expiredIds.contains(statisticsFile.snapshotId())) {
+        expiredStatisticsFilesLocations.add(statisticsFile.path());
+      }
+    }
+    return expiredStatisticsFilesLocations;
+  }

Review Comment: @amogh-jahagirdar, @findepi: as #6267 clarifies the spec, I have handled the reuse scenario and added the test case now.
[GitHub] [iceberg] XN137 commented on pull request #6221: Change SingleBufferInputStream .read signature to match super-method.
XN137 commented on PR #6221: URL: https://github.com/apache/iceberg/pull/6221#issuecomment-1337298037

Additional context: the failing check can only be performed when the compiled bytecode contains parameter name information for methods: https://github.com/palantir/gradle-baseline/blob/fd7759a9a37112db69f6875966e869a078a00319/baseline-error-prone/src/main/java/com/palantir/baseline/errorprone/ConsistentOverrides.java#L151-L164

So it depends on whether the JDK classes were compiled with the `-parameters` flag for `javac` or not:

```
-parameters
    Generate metadata for reflection on method parameters
```

On JDK 8 this flag is commonly not used:

```
~/.sdkman/candidates/java/8.0.352-zulu/bin$ ./javap -version && ./javap -l java.io.InputStream | egrep "public.*read|off"
1.8.0_352
  public abstract int read() throws java.io.IOException;
  public int read(byte[]) throws java.io.IOException;
  public int read(byte[], int, int) throws java.io.IOException;
```

On JDK 11 it is commonly used:

```
~/.sdkman/candidates/java/11.0.17-zulu/bin$ ./javap -version && ./javap -l java.io.InputStream | egrep "public.*read|off"
11.0.17
  public abstract int read() throws java.io.IOException;
  public int read(byte[]) throws java.io.IOException;
  public int read(byte[], int, int) throws java.io.IOException;
        0      81     2  off   I
  public byte[] readAllBytes() throws java.io.IOException;
  public byte[] readNBytes(int) throws java.io.IOException;
      200      74     7  offset   I
  public int readNBytes(byte[], int, int) throws java.io.IOException;
        0      53     2  off   I
```

However, some JDK vendors (e.g. on CentOS 7) also enable this flag on JDK 8, so the check might fail depending on the JDK distribution used. Targeting JDK 11 is just a way to always trigger the check violation.

Long story short, this PR should get merged to fix the check on all kinds of JDKs :sweat_drops:
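To make the check concrete, the sketch below shows what `ConsistentOverrides` verifies when parameter metadata is available: an `InputStream` subclass whose `read(byte[], int, int)` override reuses the super-method's parameter names (`b`, `off`, `len`). The class is a hypothetical stand-in, not the actual Parquet `SingleBufferInputStream`.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

// Hypothetical example: the override keeps java.io.InputStream's parameter
// names (b, off, len), which the check can only confirm when the JDK classes
// were compiled with -parameters.
class SingleBufferStream extends InputStream {
  private final ByteBuffer buffer;

  SingleBufferStream(ByteBuffer buffer) {
    this.buffer = buffer;
  }

  @Override
  public int read() throws IOException {
    return buffer.hasRemaining() ? buffer.get() & 0xFF : -1;
  }

  @Override
  public int read(byte[] b, int off, int len) throws IOException {
    if (!buffer.hasRemaining()) {
      return -1;
    }
    int n = Math.min(len, buffer.remaining());
    buffer.get(b, off, n);
    return n;
  }
}
```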
[GitHub] [iceberg] hililiwei commented on a diff in pull request #6222: Flink: Support inspecting table
hililiwei commented on code in PR #6222: URL: https://github.com/apache/iceberg/pull/6222#discussion_r1039570176

## flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java: ##

@@ -0,0 +1,713 @@

Review Comment: Missed it 😂, added. All the types have been tested.
[GitHub] [iceberg] hililiwei commented on a diff in pull request #6222: Flink: Support inspecting table
hililiwei commented on code in PR #6222: URL: https://github.com/apache/iceberg/pull/6222#discussion_r1039570407

## flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java: ##

@@ -0,0 +1,713 @@
[GitHub] [iceberg] hililiwei commented on a diff in pull request #6222: Flink: Support inspecting table
hililiwei commented on code in PR #6222: URL: https://github.com/apache/iceberg/pull/6222#discussion_r1039570783

## flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java: ##

@@ -0,0 +1,713 @@
[GitHub] [iceberg] hililiwei commented on a diff in pull request #6222: Flink: Support inspecting table
hililiwei commented on code in PR #6222: URL: https://github.com/apache/iceberg/pull/6222#discussion_r1039571147

## flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java: ##

@@ -0,0 +1,713 @@

Review Comment: 😂 It has been corrected.
[GitHub] [iceberg] Fokko merged pull request #6221: Change SingleBufferInputStream .read signature to match super-method.
Fokko merged PR #6221: URL: https://github.com/apache/iceberg/pull/6221
[GitHub] [iceberg] hililiwei commented on a diff in pull request #6222: Flink: Support inspecting table
hililiwei commented on code in PR #6222: URL: https://github.com/apache/iceberg/pull/6222#discussion_r1039674684

## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/data/StructRowData.java: ##

@@ -0,0 +1,340 @@
[GitHub] [iceberg] hililiwei commented on a diff in pull request #6222: Flink: Support inspecting table
hililiwei commented on code in PR #6222: URL: https://github.com/apache/iceberg/pull/6222#discussion_r1039683554

## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/data/StructRowData.java: ##

@@ -0,0 +1,340 @@
[GitHub] [iceberg] ConeyLiu commented on a diff in pull request #6335: Core: Avoid generating a large ManifestFile when committing
ConeyLiu commented on code in PR #6335: URL: https://github.com/apache/iceberg/pull/6335#discussion_r1039686191

## core/src/main/java/org/apache/iceberg/SnapshotProducer.java: ##

@@ -499,6 +501,40 @@ protected long snapshotId() {
     return snapshotId;
   }

+  protected static > List writeFilesToManifests(
+      Iterable files,
+      Supplier> createWriter,
+      Long newFilesSequenceNumber,
+      long targetSizeBytes)
+      throws IOException {
+    List result = Lists.newArrayList();
+    Iterator fileIterator = files.iterator();
+    ManifestWriter writer = null;

Review Comment: Updated with try-finally; try-with-resources does not seem easy to implement here.

> Could we initialize the writer upfront?

Some UTs fail if we do that. I tried the following:

```java
try {
  writer = createWriter.get();
  while (fileIterator.hasNext()) {
    if (writer.length() >= targetSizeBytes) {
      // here it could produce an empty file, because the initial size of a newly created writer could be larger than targetSizeBytes
      writer.close();
      result.add(writer.toManifestFile());
      writer = createWriter.get();
    }

    F file = fileIterator.next();
    if (newFilesSequenceNumber == null) {
      writer.add(file);
    } else {
      writer.add(file, newFilesSequenceNumber);
    }
  }
} finally {
  if (writer != null) {
    writer.close();
  }
}
```
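As a minimal, self-contained sketch of the rolling-writer pattern being debated (using a hypothetical `Writer` interface, not Iceberg's `ManifestWriter`): a writer is opened only when there is at least one more element to write, so a size-based roll never leaves an empty trailing file, and try/finally still closes the last writer on error.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

final class RollingWrite {
  // Hypothetical writer abstraction standing in for a manifest writer.
  interface Writer<T> extends AutoCloseable {
    void add(T item) throws IOException;

    long lengthBytes();

    String result(); // e.g. the path of the finished file

    @Override
    void close() throws IOException;
  }

  static <T> List<String> writeAll(
      Iterable<T> items, Supplier<Writer<T>> newWriter, long targetSizeBytes) throws IOException {
    List<String> results = new ArrayList<>();
    Iterator<T> it = items.iterator();
    Writer<T> writer = null;
    try {
      while (it.hasNext()) {
        T next = it.next();
        // Roll before writing the next element, never after the last one.
        if (writer != null && writer.lengthBytes() >= targetSizeBytes) {
          writer.close();
          results.add(writer.result());
          writer = null;
        }
        if (writer == null) {
          writer = newWriter.get(); // opened lazily, so no empty files
        }
        writer.add(next);
      }
      if (writer != null) {
        writer.close();
        results.add(writer.result());
        writer = null;
      }
    } finally {
      if (writer != null) {
        writer.close(); // cleanup on error; its result is discarded
      }
    }
    return results;
  }
}
```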
[GitHub] [iceberg] ajantha-bhat commented on pull request #6221: Change SingleBufferInputStream .read signature to match super-method.
ajantha-bhat commented on PR #6221: URL: https://github.com/apache/iceberg/pull/6221#issuecomment-1337501228

> but there are some jdk vendors (i.e. on centos7) that also enable this flag on jdk8, thus this check might fail depending on the jdk distribution used.

@XN137: Thank you so much for digging into this and figuring out how JDK 8 is also affected. Thanks @Fokko for merging the PR and @dchristle for the fix.
[GitHub] [iceberg] ConeyLiu commented on a diff in pull request #6335: Core: Avoid generating a large ManifestFile when committing
ConeyLiu commented on code in PR #6335: URL: https://github.com/apache/iceberg/pull/6335#discussion_r1039689716

## core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java: ##

@@ -838,9 +839,17 @@ public Object updateEvent() {
   }

   private void cleanUncommittedAppends(Set<ManifestFile> committed) {
-    if (cachedNewManifest != null && !committed.contains(cachedNewManifest)) {
-      deleteFile(cachedNewManifest.path());
-      this.cachedNewManifest = null;
+    if (cachedNewManifests != null) {
+      List<ManifestFile> cleanedNewManifests = Lists.newArrayList();
+      for (ManifestFile manifestFile : cachedNewManifests) {
+        if (!committed.contains(manifestFile)) {
+          deleteFile(manifestFile.path());
+        } else {
+          cleanedNewManifests.add(manifestFile);
+        }
+      }
+
+      this.cachedNewManifests = cleanedNewManifests;

Review Comment: From the failed UTs, it seems `cleanUncommittedAppends` can be called multiple times when retrying the commit, and we need to make sure the previously written manifest files are deleted.

## core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java: ##

@@ -948,13 +949,10 @@ private List newDeleteFilesAsManifests() {
         (specId, deleteFiles) -> {
           PartitionSpec spec = ops.current().spec(specId);
           try {
-            ManifestWriter writer = newDeleteManifestWriter(spec);
-            try {
-              writer.addAll(deleteFiles);
-            } finally {
-              writer.close();
-            }
-            cachedNewDeleteManifests.add(writer.toManifestFile());
+            List manifestFiles =

Review Comment: Updated.
[GitHub] [iceberg] hililiwei commented on a diff in pull request #6222: Flink: Support inspecting table
hililiwei commented on code in PR #6222: URL: https://github.com/apache/iceberg/pull/6222#discussion_r1039702122

## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/data/StructRowData.java: ##

@@ -0,0 +1,340 @@
[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6250: Docs: Remove redundant configuration from spark docs
RussellSpitzer commented on code in PR #6250: URL: https://github.com/apache/iceberg/pull/6250#discussion_r1039705908

## docs/spark-getting-started.md: ##

@@ -57,8 +57,6 @@ This command creates a path-based catalog named `local` for tables under `$PWD/w

```sh
spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:{{% icebergVersion %}}\
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
---conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
```

Review Comment: This makes line 55 incorrect.
[GitHub] [iceberg] nastra commented on pull request #6335: Core: Avoid generating a large ManifestFile when committing
nastra commented on PR #6335: URL: https://github.com/apache/iceberg/pull/6335#issuecomment-1337557734 It is mentioned in the docs that `MANIFEST_TARGET_SIZE_BYTES` relates to `Target size when merging manifest files`, meaning that this setting only takes effect when merging of manifest files happens (e.g. when using `newAppend()`). Merging of manifest files would not happen when using `newFastAppend()` for example. This might explain why it seemed that this setting wouldn't take any effect in your env.
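To make the distinction concrete, here is a short sketch against the public `Table` API (assuming an already-loaded table and a prepared `DataFile`); `TableProperties.MANIFEST_TARGET_SIZE_BYTES` corresponds to `commit.manifest.target-size-bytes` and only influences commits that merge manifests.

```java
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;

class ManifestTargetSizeExample {
  static void appendBoth(Table table, DataFile dataFile) {
    // Target size used when manifests are merged during a commit.
    table.updateProperties()
        .set(TableProperties.MANIFEST_TARGET_SIZE_BYTES, String.valueOf(8 * 1024 * 1024))
        .commit();

    // Merging append: existing manifests may be rewritten toward the target size.
    table.newAppend().appendFile(dataFile).commit();

    // Fast append: writes a new manifest without merging, so the target size is not applied.
    table.newFastAppend().appendFile(dataFile).commit();
  }
}
```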
[GitHub] [iceberg] ajantha-bhat commented on a diff in pull request #6250: Docs: Remove redundant configuration from spark docs
ajantha-bhat commented on code in PR #6250: URL: https://github.com/apache/iceberg/pull/6250#discussion_r1039751700

## docs/spark-getting-started.md: ##

@@ -57,8 +57,6 @@ This command creates a path-based catalog named `local` for tables under `$PWD/w

```sh
spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:{{% icebergVersion %}}\
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
---conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
```

Review Comment: Do you mean `adds support for Iceberg tables to Spark's built-in catalog` was achieved by adding a `SparkSessionCatalog`? But then it is of type hive and no URI is mentioned, which may confuse users. Also, `spark_catalog` was not used in any example snippets. So, do you have any suggestions here?
[GitHub] [iceberg] nastra commented on a diff in pull request #6353: Make sure S3 stream opened by ReadConf ctor is closed
nastra commented on code in PR #6353: URL: https://github.com/apache/iceberg/pull/6353#discussion_r1039765677

## parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java: ##

@@ -46,7 +47,7 @@
 *
 * @param type of value to read
 */
-class ReadConf {
+class ReadConf implements Closeable {

Review Comment: What about making this implement `AutoCloseable`?
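As a side note on the suggestion: `Closeable` already extends `AutoCloseable` (narrowing `close()` to throw `IOException`), and both work with try-with-resources. The tiny hypothetical class below only illustrates that; it is not the actual `ReadConf`.

```java
import java.io.Closeable;
import java.io.IOException;

class CloseableResource implements Closeable {
  @Override
  public void close() throws IOException {
    // release the underlying stream here
  }

  static void use() throws IOException {
    // close() is invoked automatically when the try block exits.
    try (CloseableResource resource = new CloseableResource()) {
      // read from the resource
    }
  }
}
```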
[GitHub] [iceberg] nastra commented on a diff in pull request #6348: Python: Update license-checker
nastra commented on code in PR #6348: URL: https://github.com/apache/iceberg/pull/6348#discussion_r1039781132

## python/dev/.rat-excludes: ##

@@ -0,0 +1,2 @@
+.rat-excludes

Review Comment: Related to [this old comment](https://github.com/apache/iceberg/pull/5840#discussion_r978666139), I'm curious whether it would now make sense to re-use the `check-license` script from the root dev folder.
[GitHub] [iceberg] hililiwei commented on a diff in pull request #6222: Flink: Support inspecting table
hililiwei commented on code in PR #6222: URL: https://github.com/apache/iceberg/pull/6222#discussion_r1039789754

## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/data/StructRowData.java: ##

@@ -0,0 +1,340 @@
[GitHub] [iceberg] ajantha-bhat commented on a diff in pull request #6250: Docs: Remove redundant configuration from spark docs
ajantha-bhat commented on code in PR #6250: URL: https://github.com/apache/iceberg/pull/6250#discussion_r1039831769 ## docs/spark-getting-started.md: ## @@ -57,8 +57,6 @@ This command creates a path-based catalog named `local` for tables under `$PWD/w ```sh spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:{{% icebergVersion %}}\ --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \ ---conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \ Review Comment: Which one would be better? a) Remove the "built-in catalog" mention from line 55, b) change the `spark_catalog` type to `Hadoop` as well, or c) add a Hive URI (e.g. `uri = xx`) to the example snippet. I don't think we can leave it as it is, because a catalog-type of hive without a URI will just confuse users. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6250: Docs: Remove redundant configuration from spark docs
RussellSpitzer commented on code in PR #6250: URL: https://github.com/apache/iceberg/pull/6250#discussion_r1039836905 ## docs/spark-getting-started.md: ## @@ -57,8 +57,6 @@ This command creates a path-based catalog named `local` for tables under `$PWD/w ```sh spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:{{% icebergVersion %}}\ --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \ ---conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \ Review Comment: "spark_catalog" is the built-in catalog for Spark, the one you get without specifying anything else. This configuration would work for any user who has already defined a HMS for Spark and would like it to also work with Iceberg tables, so I would think this would be useful to most folks who are using Iceberg with an existing Spark setup. I think it's fine to skip "spark_catalog" in this particular usage, though, since all the examples use a separate "local" SparkCatalog. If we remove it, we should probably add some more details about SparkSessionCatalog in a different section, perhaps? Or maybe add another example. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
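For readers following this thread, below is a minimal sketch of the SparkSessionCatalog setup described above, expressed programmatically rather than via spark-sql flags. The metastore URI, warehouse, and table names are placeholder assumptions, not values from the docs under review.

```java
// Hedged sketch: replacing Spark's built-in spark_catalog with Iceberg's SparkSessionCatalog
// so existing Hive Metastore tables and Iceberg tables can be used side by side.
// The metastore URI and table names below are placeholder assumptions.
import org.apache.spark.sql.SparkSession;

public class SessionCatalogSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("iceberg-session-catalog-sketch")
        // Iceberg SQL extensions (procedures, MERGE INTO, etc.)
        .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        // Wrap the built-in session catalog with Iceberg's SparkSessionCatalog
        .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
        .config("spark.sql.catalog.spark_catalog.type", "hive")
        .config("spark.sql.catalog.spark_catalog.uri", "thrift://localhost:9083") // assumed HMS URI
        .getOrCreate();

    // Iceberg tables created through spark_catalog are tracked in the same Hive Metastore
    spark.sql("CREATE TABLE IF NOT EXISTS default.sample (id bigint, data string) USING iceberg");
    spark.sql("SELECT * FROM default.sample").show();
  }
}
```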
[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6222: Flink: Support inspecting table
stevenzwu commented on code in PR #6222: URL: https://github.com/apache/iceberg/pull/6222#discussion_r1039841272 ## docs/flink-getting-started.md: ## @@ -712,9 +712,188 @@ INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */ | compression-strategy | Table write.orc.compression-strategy | Overrides this table's compression strategy for ORC tables for this write | -## Inspecting tables. +## Inspecting tables -Iceberg does not support inspecting table in flink sql now, we need to use [iceberg's Java API](../api) to read iceberg's meta data to get those table information. +To inspect a table's history, snapshots, and other metadata, Iceberg supports metadata tables. + +Metadata tables are identified by adding the metadata table name after the original table name. For example, history for `db.table` is read using `db.table$history`. + +### History + +To show table history: + +```sql +SELECT * FROM prod.db.table$history; +``` + +| made_current_at | snapshot_id | parent_id | is_current_ancestor | +| --- | --- | --- | --- | +| 2019-02-08 03:29:51.215 | 5781947118336215154 | NULL| true | +| 2019-02-08 03:47:55.948 | 5179299526185056830 | 5781947118336215154 | true | +| 2019-02-09 16:24:30.13 | 296410040247533544 | 5179299526185056830 | false | +| 2019-02-09 16:32:47.336 | 2999875608062437330 | 5179299526185056830 | true | +| 2019-02-09 19:42:03.919 | 8924558786060583479 | 2999875608062437330 | true | +| 2019-02-09 19:49:16.343 | 6536733823181975045 | 8924558786060583479 | true | + +{{< hint info >}} +**This shows a commit that was rolled back.** The example has two snapshots with the same parent, and one is *not* an ancestor of the current table state. Review Comment: maybe more explicit? ``` In this example, snapshot 296410040247533544 and 2999875608062437330 have the same parent snapshot 5179299526185056830. Snapshot 296410040247533544 was rolled back and is *not* an ancestor of the current table state. ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
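As a companion to the doc change discussed above, here is a hedged sketch of reading an Iceberg metadata table from the Flink Table API; the catalog name, warehouse path, and the exact quoting of the `$history` suffix are assumptions, not taken from the PR.

```java
// Illustrative sketch of reading an Iceberg metadata table from the Flink Table API,
// following the `table$history` naming shown in the doc change. The catalog name,
// warehouse path, and identifier quoting are assumptions.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class InspectTableHistory {
  public static void main(String[] args) {
    TableEnvironment tEnv =
        TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());

    // Register an Iceberg catalog backed by a Hadoop warehouse (path is a placeholder)
    tEnv.executeSql(
        "CREATE CATALOG iceberg_catalog WITH ("
            + "'type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='file:///tmp/warehouse')");

    // Metadata tables are addressed by appending $<metadata table name> to the table name;
    // backticks are used here because of the '$' character in the identifier.
    tEnv.executeSql("SELECT * FROM iceberg_catalog.db.`sample$history`").print();
  }
}
```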
[GitHub] [iceberg] ajantha-bhat commented on a diff in pull request #6250: Docs: Remove redundant configuration from spark docs
ajantha-bhat commented on code in PR #6250: URL: https://github.com/apache/iceberg/pull/6250#discussion_r1039848442 ## docs/spark-getting-started.md: ## @@ -57,8 +57,6 @@ This command creates a path-based catalog named `local` for tables under `$PWD/w ```sh spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:{{% icebergVersion %}}\ --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \ ---conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \ Review Comment: https://iceberg.apache.org/docs/latest/spark-configuration/#catalog-configuration already talks about the `SparkSessionCatalog`. So, we can remove it from here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] pvary commented on a diff in pull request #6299: Flink: support split discovery throttling for streaming read
pvary commented on code in PR #6299: URL: https://github.com/apache/iceberg/pull/6299#discussion_r1039860822 ## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/EnumerationHistory.java: ## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source.enumerator; + +import javax.annotation.concurrent.ThreadSafe; +import org.apache.flink.calcite.shaded.com.google.common.collect.EvictingQueue; + +/** + * This enumeration history is used for split discovery throttling. It wraps Guava {@link + * EvictingQueue} to provide thread safety. + */ +@ThreadSafe +class EnumerationHistory { + + // EvictingQueue is not thread safe. + private final EvictingQueue enumerationSplitCountHistory; + + EnumerationHistory(int maxHistorySize) { +this.enumerationSplitCountHistory = EvictingQueue.create(maxHistorySize); + } + + /** Add the split count from the last enumeration result. */ + synchronized void add(int splitCount) { +enumerationSplitCountHistory.add(splitCount); + } + + /** @return true if split discovery should pause because assigner has too many splits already. */ + synchronized boolean shouldPauseSplitDiscovery(int pendingSplitCountFromAssigner) { +if (enumerationSplitCountHistory.remainingCapacity() > 0) { + // only check throttling when full history is obtained. Review Comment: Thx -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
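For readers skimming the thread, the following is a small self-contained sketch of the throttling idea in the EnumerationHistory class quoted above: keep a bounded history of per-enumeration split counts and pause discovery while the assigner's pending splits exceed the total discovered across that window. This is a simplified illustration, not the PR's actual implementation (which wraps Guava's EvictingQueue).

```java
// Simplified illustration of split-discovery throttling: a bounded history of split counts
// and a check that pauses discovery once the pending backlog exceeds the recent discovery total.
import java.util.ArrayDeque;
import java.util.Deque;

class SplitDiscoveryThrottle {
  private final int maxHistorySize;
  private final Deque<Integer> history = new ArrayDeque<>();

  SplitDiscoveryThrottle(int maxHistorySize) {
    this.maxHistorySize = maxHistorySize;
  }

  synchronized void add(int splitCount) {
    if (history.size() == maxHistorySize) {
      history.removeFirst(); // evict the oldest entry, like Guava's EvictingQueue
    }
    history.addLast(splitCount);
  }

  synchronized boolean shouldPauseSplitDiscovery(int pendingSplits) {
    if (history.size() < maxHistorySize) {
      return false; // only throttle once a full history window has been observed
    }
    int totalDiscovered = history.stream().mapToInt(Integer::intValue).sum();
    return pendingSplits > totalDiscovered;
  }

  public static void main(String[] args) {
    SplitDiscoveryThrottle throttle = new SplitDiscoveryThrottle(3);
    throttle.add(10);
    throttle.add(12);
    throttle.add(8);
    System.out.println(throttle.shouldPauseSplitDiscovery(50)); // true: backlog too large
    System.out.println(throttle.shouldPauseSplitDiscovery(5));  // false: assigner nearly drained
  }
}
```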
[GitHub] [iceberg] ajantha-bhat opened a new pull request, #6360: Docs: Update Zorder spark support versions.
ajantha-bhat opened a new pull request, #6360: URL: https://github.com/apache/iceberg/pull/6360 Some users are using Zorder with Spark 3.1 and running into a confusing error message, so this updates the documentation. I also considered updating the code to throw an unsupported-operation exception, but modifying the code may not be necessary since only older Spark versions are affected. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] rdblue commented on a diff in pull request #3231: GCM encryption stream
rdblue commented on code in PR #3231: URL: https://github.com/apache/iceberg/pull/3231#discussion_r1039891570 ## core/src/main/java/org/apache/iceberg/encryption/AesGcmInputFile.java: ## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.encryption; + +import java.io.IOException; +import java.io.UncheckedIOException; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.SeekableInputStream; + +public class AesGcmInputFile implements InputFile { Review Comment: You're right. This would be the result of the `decrypt` method. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] rdblue commented on a diff in pull request #3231: GCM encryption stream
rdblue commented on code in PR #3231: URL: https://github.com/apache/iceberg/pull/3231#discussion_r1039893751 ## core/src/main/java/org/apache/iceberg/encryption/AesGcmInputFile.java: ## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.encryption; + +import java.io.IOException; +import java.io.UncheckedIOException; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.SeekableInputStream; + +public class AesGcmInputFile implements InputFile { + private final InputFile sourceFile; + private final byte[] dataKey; + private long plaintextLength; + + public AesGcmInputFile(InputFile sourceFile, byte[] dataKey) { +this.sourceFile = sourceFile; +this.dataKey = dataKey; +this.plaintextLength = -1; + } + + @Override + public long getLength() { +if (plaintextLength == -1) { + try { +this.newStream().close(); Review Comment: We should be able to know the length without opening the stream, right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
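For context on that question: given the header plus per-block nonce/tag layout used by the stream, the plaintext length can be derived from the ciphertext length alone. The sketch below shows the arithmetic; the header size and fixed plaintext block size are illustrative assumptions rather than the exact constants in Ciphers.

```java
// Sketch of deriving the plaintext length purely from the ciphertext length, assuming a
// layout of: fixed-size header, then blocks of (nonce | ciphertext | tag).
// HEADER_LENGTH and PLAIN_BLOCK_SIZE are illustrative assumptions.
public class GcmLengths {
  static final int HEADER_LENGTH = 8;               // assumed: magic + plain block size field
  static final int NONCE_LENGTH = 12;               // standard GCM nonce size
  static final int TAG_LENGTH = 16;                 // standard GCM tag size
  static final int PLAIN_BLOCK_SIZE = 1024 * 1024;  // assumed fixed plaintext block size
  static final int CIPHER_BLOCK_SIZE = PLAIN_BLOCK_SIZE + NONCE_LENGTH + TAG_LENGTH;

  static long plaintextLength(long ciphertextLength) {
    long netLength = ciphertextLength - HEADER_LENGTH;
    if (netLength == 0) {
      return 0;
    }
    long fullBlocks = netLength / CIPHER_BLOCK_SIZE;
    long lastCipherBlock = netLength - fullBlocks * CIPHER_BLOCK_SIZE;
    long lastPlainBlock =
        lastCipherBlock == 0 ? 0 : lastCipherBlock - NONCE_LENGTH - TAG_LENGTH;
    return fullBlocks * PLAIN_BLOCK_SIZE + lastPlainBlock;
  }

  public static void main(String[] args) {
    // A file holding exactly one full block plus a 100-byte tail block:
    long cipherLen = HEADER_LENGTH + CIPHER_BLOCK_SIZE + (100 + NONCE_LENGTH + TAG_LENGTH);
    System.out.println(plaintextLength(cipherLen)); // prints PLAIN_BLOCK_SIZE + 100
  }
}
```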
[GitHub] [iceberg] rdblue merged pull request #4925: API: Add view interfaces
rdblue merged PR #4925: URL: https://github.com/apache/iceberg/pull/4925 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] rdblue commented on pull request #4925: API: Add view interfaces
rdblue commented on PR #4925: URL: https://github.com/apache/iceberg/pull/4925#issuecomment-1337860939 Merge! Let me know where the implementation PR is and I'll start looking at that! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] rdblue commented on a diff in pull request #3231: GCM encryption stream
rdblue commented on code in PR #3231: URL: https://github.com/apache/iceberg/pull/3231#discussion_r1039912960 ## core/src/main/java/org/apache/iceberg/encryption/AesGcmInputStream.java: ## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.encryption; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Arrays; +import org.apache.iceberg.io.SeekableInputStream; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +public class AesGcmInputStream extends SeekableInputStream { + private final SeekableInputStream sourceStream; + private final boolean emptyCipherStream; + private final long netSourceFileSize; + private final Ciphers.AesGcmDecryptor gcmDecryptor; + private final byte[] ciphertextBlockBuffer; + private final int cipherBlockSize; + private final int plainBlockSize; + private final int numberOfBlocks; + private final int lastCipherBlockSize; + private final long plainStreamSize; + private final byte[] fileAadPrefix; + + private long plainStreamPosition; + private int currentBlockIndex; + private int currentOffsetInPlainBlock; + + AesGcmInputStream(SeekableInputStream sourceStream, long sourceLength, +byte[] aesKey, byte[] fileAadPrefix) throws IOException { +this.netSourceFileSize = sourceLength - Ciphers.GCM_STREAM_PREFIX_LENGTH; +Preconditions.checkArgument(netSourceFileSize >= 0, +"Source length " + sourceLength + " is shorter than GCM prefix. File is not encrypted"); + +this.emptyCipherStream = (0 == netSourceFileSize); +this.sourceStream = sourceStream; +byte[] prefixBytes = new byte[Ciphers.GCM_STREAM_PREFIX_LENGTH]; Review Comment: Can we call this a header? We use `prefix` in several places and I think it would be more clear to call it a header since it is only at the start of the file. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6360: Docs: Update Zorder spark support versions.
RussellSpitzer commented on code in PR #6360: URL: https://github.com/apache/iceberg/pull/6360#discussion_r1039913771 ## docs/spark-procedures.md: ## @@ -271,7 +271,7 @@ Iceberg can compact data files in parallel using Spark with the `rewriteDataFile |---|---|--|-| | `table` | ✔️ | string | Name of the table to update | | `strategy`|| string | Name of the strategy - binpack or sort. Defaults to binpack strategy | -| `sort_order` || string | If Zorder, then comma separated column names within zorder() text. Example: zorder(c1,c2,c3). Else, Comma separated sort_order_column. Where sort_order_column is a space separated sort order info per column (ColumnName SortDirection NullOrder). SortDirection can be ASC or DESC. NullOrder can be NULLS FIRST or NULLS LAST | +| `sort_order` || string | If Zorder(supported from Spark-3.2 and above), then comma separated column names within zorder() text. Example: zorder(c1,c2,c3). Else, Comma separated sort_order_column. Where sort_order_column is a space separated sort order info per column (ColumnName SortDirection NullOrder). SortDirection can be ASC or DESC. NullOrder can be NULLS FIRST or NULLS LAST | Review Comment: I think either (Supported in Spark 3.2 and above) or (Supported from Spark 3.2) but the mix of "from" and "and above" is a little confusing -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] szehon-ho closed issue #4362: Expose human-readable metrics in metadata tables
szehon-ho closed issue #4362: Expose human-readable metrics in metadata tables URL: https://github.com/apache/iceberg/issues/4362 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] szehon-ho merged pull request #5376: Core: Add readable metrics columns to files metadata tables
szehon-ho merged PR #5376: URL: https://github.com/apache/iceberg/pull/5376 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] szehon-ho commented on pull request #5376: Core: Add readable metrics columns to files metadata tables
szehon-ho commented on PR #5376: URL: https://github.com/apache/iceberg/pull/5376#issuecomment-1337877894 Thanks @RussellSpitzer @aokolnychyi @chenjunjiedada for detailed reviews -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] rdblue commented on a diff in pull request #3231: GCM encryption stream
rdblue commented on code in PR #3231: URL: https://github.com/apache/iceberg/pull/3231#discussion_r1039937969 ## core/src/main/java/org/apache/iceberg/encryption/AesGcmInputStream.java: ## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.encryption; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Arrays; +import org.apache.iceberg.io.SeekableInputStream; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +public class AesGcmInputStream extends SeekableInputStream { + private final SeekableInputStream sourceStream; + private final boolean emptyCipherStream; + private final long netSourceFileSize; + private final Ciphers.AesGcmDecryptor gcmDecryptor; + private final byte[] ciphertextBlockBuffer; + private final int cipherBlockSize; + private final int plainBlockSize; + private final int numberOfBlocks; + private final int lastCipherBlockSize; + private final long plainStreamSize; + private final byte[] fileAadPrefix; + + private long plainStreamPosition; + private int currentBlockIndex; + private int currentOffsetInPlainBlock; + + AesGcmInputStream(SeekableInputStream sourceStream, long sourceLength, +byte[] aesKey, byte[] fileAadPrefix) throws IOException { +this.netSourceFileSize = sourceLength - Ciphers.GCM_STREAM_PREFIX_LENGTH; +Preconditions.checkArgument(netSourceFileSize >= 0, +"Source length " + sourceLength + " is shorter than GCM prefix. File is not encrypted"); + +this.emptyCipherStream = (0 == netSourceFileSize); +this.sourceStream = sourceStream; +byte[] prefixBytes = new byte[Ciphers.GCM_STREAM_PREFIX_LENGTH]; +int fetched = sourceStream.read(prefixBytes); +Preconditions.checkState(fetched == Ciphers.GCM_STREAM_PREFIX_LENGTH, +"Insufficient read " + fetched + +". 
The stream length should be at least " + Ciphers.GCM_STREAM_PREFIX_LENGTH); + +byte[] magic = new byte[Ciphers.GCM_STREAM_MAGIC_ARRAY.length]; +System.arraycopy(prefixBytes, 0, magic, 0, Ciphers.GCM_STREAM_MAGIC_ARRAY.length); +Preconditions.checkState(Arrays.equals(Ciphers.GCM_STREAM_MAGIC_ARRAY, magic), +"Cannot open encrypted file, it does not begin with magic string " + Ciphers.GCM_STREAM_MAGIC_STRING); + +if (!emptyCipherStream) { + this.plainStreamPosition = 0; + this.fileAadPrefix = fileAadPrefix; + gcmDecryptor = new Ciphers.AesGcmDecryptor(aesKey); + plainBlockSize = ByteBuffer.wrap(prefixBytes, Ciphers.GCM_STREAM_MAGIC_ARRAY.length, 4) + .order(ByteOrder.LITTLE_ENDIAN).getInt(); + Preconditions.checkState(plainBlockSize > 0, "Wrong plainBlockSize " + plainBlockSize); + + cipherBlockSize = plainBlockSize + Ciphers.NONCE_LENGTH + Ciphers.GCM_TAG_LENGTH; + this.ciphertextBlockBuffer = new byte[cipherBlockSize]; + this.currentBlockIndex = 0; + this.currentOffsetInPlainBlock = 0; + + int numberOfFullBlocks = Math.toIntExact(netSourceFileSize / cipherBlockSize); + int cipherBytesInLastBlock = Math.toIntExact(netSourceFileSize - numberOfFullBlocks * cipherBlockSize); + boolean fullBlocksOnly = (0 == cipherBytesInLastBlock); + numberOfBlocks = fullBlocksOnly ? numberOfFullBlocks : numberOfFullBlocks + 1; + lastCipherBlockSize = fullBlocksOnly ? cipherBlockSize : cipherBytesInLastBlock; // never 0 + int plainBytesInLastBlock = fullBlocksOnly ? 0 : + (cipherBytesInLastBlock - Ciphers.NONCE_LENGTH - Ciphers.GCM_TAG_LENGTH); + plainStreamSize = numberOfFullBlocks * plainBlockSize + plainBytesInLastBlock; +} else { + plainStreamSize = 0; + + gcmDecryptor = null; + ciphertextBlockBuffer = null; + cipherBlockSize = -1; + plainBlockSize = -1; + numberOfBlocks = -1; + lastCipherBlockSize = -1; + this.fileAadPrefix = null; +} + } + + public long plaintextStreamSize() { +return plainStreamSize; + } + + @Override + public int available() throws IOException { +long maxAvailable = plainStreamSize - plainStreamPositio
[GitHub] [iceberg] rdblue commented on a diff in pull request #3231: GCM encryption stream
rdblue commented on code in PR #3231: URL: https://github.com/apache/iceberg/pull/3231#discussion_r1039939654 ## core/src/main/java/org/apache/iceberg/encryption/AesGcmInputStream.java: ## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.encryption; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Arrays; +import org.apache.iceberg.io.SeekableInputStream; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +public class AesGcmInputStream extends SeekableInputStream { + private final SeekableInputStream sourceStream; + private final boolean emptyCipherStream; + private final long netSourceFileSize; + private final Ciphers.AesGcmDecryptor gcmDecryptor; + private final byte[] ciphertextBlockBuffer; + private final int cipherBlockSize; + private final int plainBlockSize; + private final int numberOfBlocks; + private final int lastCipherBlockSize; + private final long plainStreamSize; + private final byte[] fileAadPrefix; + + private long plainStreamPosition; + private int currentBlockIndex; + private int currentOffsetInPlainBlock; + + AesGcmInputStream(SeekableInputStream sourceStream, long sourceLength, +byte[] aesKey, byte[] fileAadPrefix) throws IOException { +this.netSourceFileSize = sourceLength - Ciphers.GCM_STREAM_PREFIX_LENGTH; +Preconditions.checkArgument(netSourceFileSize >= 0, +"Source length " + sourceLength + " is shorter than GCM prefix. File is not encrypted"); + +this.emptyCipherStream = (0 == netSourceFileSize); +this.sourceStream = sourceStream; +byte[] prefixBytes = new byte[Ciphers.GCM_STREAM_PREFIX_LENGTH]; +int fetched = sourceStream.read(prefixBytes); +Preconditions.checkState(fetched == Ciphers.GCM_STREAM_PREFIX_LENGTH, +"Insufficient read " + fetched + +". 
The stream length should be at least " + Ciphers.GCM_STREAM_PREFIX_LENGTH); + +byte[] magic = new byte[Ciphers.GCM_STREAM_MAGIC_ARRAY.length]; +System.arraycopy(prefixBytes, 0, magic, 0, Ciphers.GCM_STREAM_MAGIC_ARRAY.length); +Preconditions.checkState(Arrays.equals(Ciphers.GCM_STREAM_MAGIC_ARRAY, magic), +"Cannot open encrypted file, it does not begin with magic string " + Ciphers.GCM_STREAM_MAGIC_STRING); + +if (!emptyCipherStream) { + this.plainStreamPosition = 0; + this.fileAadPrefix = fileAadPrefix; + gcmDecryptor = new Ciphers.AesGcmDecryptor(aesKey); + plainBlockSize = ByteBuffer.wrap(prefixBytes, Ciphers.GCM_STREAM_MAGIC_ARRAY.length, 4) + .order(ByteOrder.LITTLE_ENDIAN).getInt(); + Preconditions.checkState(plainBlockSize > 0, "Wrong plainBlockSize " + plainBlockSize); + + cipherBlockSize = plainBlockSize + Ciphers.NONCE_LENGTH + Ciphers.GCM_TAG_LENGTH; + this.ciphertextBlockBuffer = new byte[cipherBlockSize]; + this.currentBlockIndex = 0; + this.currentOffsetInPlainBlock = 0; + + int numberOfFullBlocks = Math.toIntExact(netSourceFileSize / cipherBlockSize); + int cipherBytesInLastBlock = Math.toIntExact(netSourceFileSize - numberOfFullBlocks * cipherBlockSize); + boolean fullBlocksOnly = (0 == cipherBytesInLastBlock); + numberOfBlocks = fullBlocksOnly ? numberOfFullBlocks : numberOfFullBlocks + 1; + lastCipherBlockSize = fullBlocksOnly ? cipherBlockSize : cipherBytesInLastBlock; // never 0 + int plainBytesInLastBlock = fullBlocksOnly ? 0 : + (cipherBytesInLastBlock - Ciphers.NONCE_LENGTH - Ciphers.GCM_TAG_LENGTH); + plainStreamSize = numberOfFullBlocks * plainBlockSize + plainBytesInLastBlock; +} else { + plainStreamSize = 0; + + gcmDecryptor = null; + ciphertextBlockBuffer = null; + cipherBlockSize = -1; + plainBlockSize = -1; + numberOfBlocks = -1; + lastCipherBlockSize = -1; + this.fileAadPrefix = null; +} + } + + public long plaintextStreamSize() { +return plainStreamSize; + } + + @Override + public int available() throws IOException { +long maxAvailable = plainStreamSize - plainStreamPositio
[GitHub] [iceberg] rdblue commented on a diff in pull request #3231: GCM encryption stream
rdblue commented on code in PR #3231: URL: https://github.com/apache/iceberg/pull/3231#discussion_r1039941138 ## core/src/main/java/org/apache/iceberg/encryption/AesGcmInputStream.java: ## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.encryption; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Arrays; +import org.apache.iceberg.io.SeekableInputStream; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +public class AesGcmInputStream extends SeekableInputStream { + private final SeekableInputStream sourceStream; + private final boolean emptyCipherStream; + private final long netSourceFileSize; + private final Ciphers.AesGcmDecryptor gcmDecryptor; + private final byte[] ciphertextBlockBuffer; + private final int cipherBlockSize; + private final int plainBlockSize; + private final int numberOfBlocks; + private final int lastCipherBlockSize; + private final long plainStreamSize; + private final byte[] fileAadPrefix; + + private long plainStreamPosition; + private int currentBlockIndex; + private int currentOffsetInPlainBlock; + + AesGcmInputStream(SeekableInputStream sourceStream, long sourceLength, +byte[] aesKey, byte[] fileAadPrefix) throws IOException { +this.netSourceFileSize = sourceLength - Ciphers.GCM_STREAM_PREFIX_LENGTH; +Preconditions.checkArgument(netSourceFileSize >= 0, +"Source length " + sourceLength + " is shorter than GCM prefix. File is not encrypted"); + +this.emptyCipherStream = (0 == netSourceFileSize); +this.sourceStream = sourceStream; +byte[] prefixBytes = new byte[Ciphers.GCM_STREAM_PREFIX_LENGTH]; +int fetched = sourceStream.read(prefixBytes); +Preconditions.checkState(fetched == Ciphers.GCM_STREAM_PREFIX_LENGTH, +"Insufficient read " + fetched + +". The stream length should be at least " + Ciphers.GCM_STREAM_PREFIX_LENGTH); + +byte[] magic = new byte[Ciphers.GCM_STREAM_MAGIC_ARRAY.length]; +System.arraycopy(prefixBytes, 0, magic, 0, Ciphers.GCM_STREAM_MAGIC_ARRAY.length); +Preconditions.checkState(Arrays.equals(Ciphers.GCM_STREAM_MAGIC_ARRAY, magic), +"Cannot open encrypted file, it does not begin with magic string " + Ciphers.GCM_STREAM_MAGIC_STRING); + +if (!emptyCipherStream) { + this.plainStreamPosition = 0; + this.fileAadPrefix = fileAadPrefix; + gcmDecryptor = new Ciphers.AesGcmDecryptor(aesKey); Review Comment: Pass AAD? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] rdblue commented on a diff in pull request #3231: GCM encryption stream
rdblue commented on code in PR #3231: URL: https://github.com/apache/iceberg/pull/3231#discussion_r1039944476 ## core/src/main/java/org/apache/iceberg/encryption/AesGcmInputStream.java: ## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.encryption; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Arrays; +import org.apache.iceberg.io.SeekableInputStream; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +public class AesGcmInputStream extends SeekableInputStream { + private final SeekableInputStream sourceStream; + private final boolean emptyCipherStream; + private final long netSourceFileSize; + private final Ciphers.AesGcmDecryptor gcmDecryptor; + private final byte[] ciphertextBlockBuffer; + private final int cipherBlockSize; + private final int plainBlockSize; + private final int numberOfBlocks; + private final int lastCipherBlockSize; + private final long plainStreamSize; + private final byte[] fileAadPrefix; + + private long plainStreamPosition; + private int currentBlockIndex; + private int currentOffsetInPlainBlock; + + AesGcmInputStream(SeekableInputStream sourceStream, long sourceLength, +byte[] aesKey, byte[] fileAadPrefix) throws IOException { +this.netSourceFileSize = sourceLength - Ciphers.GCM_STREAM_PREFIX_LENGTH; +Preconditions.checkArgument(netSourceFileSize >= 0, +"Source length " + sourceLength + " is shorter than GCM prefix. File is not encrypted"); + +this.emptyCipherStream = (0 == netSourceFileSize); +this.sourceStream = sourceStream; +byte[] prefixBytes = new byte[Ciphers.GCM_STREAM_PREFIX_LENGTH]; +int fetched = sourceStream.read(prefixBytes); +Preconditions.checkState(fetched == Ciphers.GCM_STREAM_PREFIX_LENGTH, +"Insufficient read " + fetched + +". 
The stream length should be at least " + Ciphers.GCM_STREAM_PREFIX_LENGTH); + +byte[] magic = new byte[Ciphers.GCM_STREAM_MAGIC_ARRAY.length]; +System.arraycopy(prefixBytes, 0, magic, 0, Ciphers.GCM_STREAM_MAGIC_ARRAY.length); +Preconditions.checkState(Arrays.equals(Ciphers.GCM_STREAM_MAGIC_ARRAY, magic), +"Cannot open encrypted file, it does not begin with magic string " + Ciphers.GCM_STREAM_MAGIC_STRING); + +if (!emptyCipherStream) { + this.plainStreamPosition = 0; + this.fileAadPrefix = fileAadPrefix; + gcmDecryptor = new Ciphers.AesGcmDecryptor(aesKey); + plainBlockSize = ByteBuffer.wrap(prefixBytes, Ciphers.GCM_STREAM_MAGIC_ARRAY.length, 4) + .order(ByteOrder.LITTLE_ENDIAN).getInt(); + Preconditions.checkState(plainBlockSize > 0, "Wrong plainBlockSize " + plainBlockSize); + + cipherBlockSize = plainBlockSize + Ciphers.NONCE_LENGTH + Ciphers.GCM_TAG_LENGTH; + this.ciphertextBlockBuffer = new byte[cipherBlockSize]; + this.currentBlockIndex = 0; + this.currentOffsetInPlainBlock = 0; + + int numberOfFullBlocks = Math.toIntExact(netSourceFileSize / cipherBlockSize); + int cipherBytesInLastBlock = Math.toIntExact(netSourceFileSize - numberOfFullBlocks * cipherBlockSize); + boolean fullBlocksOnly = (0 == cipherBytesInLastBlock); + numberOfBlocks = fullBlocksOnly ? numberOfFullBlocks : numberOfFullBlocks + 1; + lastCipherBlockSize = fullBlocksOnly ? cipherBlockSize : cipherBytesInLastBlock; // never 0 + int plainBytesInLastBlock = fullBlocksOnly ? 0 : + (cipherBytesInLastBlock - Ciphers.NONCE_LENGTH - Ciphers.GCM_TAG_LENGTH); Review Comment: I thought this would be the block size, but it looks like the encrypted block size includes the nonce and tag. I think that's a bit confusing and we should make sure the behavior is specified in the spec. Since we're planning on making the block size constant, that works well. We can have a constant for this so it's easy. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-uns
[GitHub] [iceberg] rdblue commented on a diff in pull request #3231: GCM encryption stream
rdblue commented on code in PR #3231: URL: https://github.com/apache/iceberg/pull/3231#discussion_r1039945676 ## core/src/main/java/org/apache/iceberg/encryption/AesGcmInputStream.java: ## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.encryption; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Arrays; +import org.apache.iceberg.io.SeekableInputStream; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +public class AesGcmInputStream extends SeekableInputStream { + private final SeekableInputStream sourceStream; + private final boolean emptyCipherStream; + private final long netSourceFileSize; + private final Ciphers.AesGcmDecryptor gcmDecryptor; + private final byte[] ciphertextBlockBuffer; + private final int cipherBlockSize; + private final int plainBlockSize; + private final int numberOfBlocks; + private final int lastCipherBlockSize; + private final long plainStreamSize; + private final byte[] fileAadPrefix; + + private long plainStreamPosition; + private int currentBlockIndex; + private int currentOffsetInPlainBlock; + + AesGcmInputStream(SeekableInputStream sourceStream, long sourceLength, +byte[] aesKey, byte[] fileAadPrefix) throws IOException { +this.netSourceFileSize = sourceLength - Ciphers.GCM_STREAM_PREFIX_LENGTH; +Preconditions.checkArgument(netSourceFileSize >= 0, +"Source length " + sourceLength + " is shorter than GCM prefix. File is not encrypted"); Review Comment: If the unencrypted file length is 0, we should return an in-memory input stream with 0 bytes instead of a decrypting one. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
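A minimal sketch of that suggestion, assuming a trivial in-memory stream is acceptable for the zero-length case; EmptySeekableStream is an illustrative helper, not an existing Iceberg class.

```java
// Sketch: when the plaintext length is known to be zero, return an empty in-memory stream
// rather than building a decrypting stream. This class is illustrative only.
import java.io.IOException;
import org.apache.iceberg.io.SeekableInputStream;

class EmptySeekableStream extends SeekableInputStream {
  @Override
  public long getPos() {
    return 0;
  }

  @Override
  public void seek(long newPos) throws IOException {
    if (newPos != 0) {
      throw new IOException("Invalid position for empty stream: " + newPos);
    }
  }

  @Override
  public int read() {
    return -1; // always at end of stream
  }
}
// A decrypting input file could then hand back new EmptySeekableStream() when the
// plaintext length is zero, instead of constructing a GCM-decrypting stream.
```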
[GitHub] [iceberg] InvisibleProgrammer closed pull request #6337: Docs: Update Iceberg Hive documentation
InvisibleProgrammer closed pull request #6337: Docs: Update Iceberg Hive documentation URL: https://github.com/apache/iceberg/pull/6337 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] InvisibleProgrammer commented on pull request #6337: Docs: Update Iceberg Hive documentation
InvisibleProgrammer commented on PR #6337: URL: https://github.com/apache/iceberg/pull/6337#issuecomment-1337926448 Hi @pvary! First of all, thank you for the review and approval. Unfortunately, I just learned that the `Close` button on an approved PR doesn't mean "close and merge"; it just closes the PR, so I reopened it. I assume that, as a first-time contributor, I don't have permission to merge. Could you please merge it? Thank you, Zsolt -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] tprelle commented on pull request #6327: ORC: Fix error when projecting nested indentity partition column
tprelle commented on PR #6327: URL: https://github.com/apache/iceberg/pull/6327#issuecomment-1338004394 Hi @shardulm94, this seems less intrusive and better than https://github.com/apache/iceberg/pull/4599 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] Fokko opened a new issue, #6361: Python: Ignore home folder when running tests
Fokko opened a new issue, #6361: URL: https://github.com/apache/iceberg/issues/6361 ### Feature Request / Improvement When you run the tests on your local machine and you have a `~/.pyiceberg.yaml` around, `test_missing_uri` may fail because it picks up the URI from that configuration. Instead, for the tests, we could create a fixture that creates a temporary directory and points the home directory to that freshly created folder. This way the tests are better isolated from the environment they run in. ### Query engine None -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] Fokko opened a new pull request, #6362: Python: Fix PyArrow import
Fokko opened a new pull request, #6362: URL: https://github.com/apache/iceberg/pull/6362 Tested this in a fresh docker container: ``` ➜ python git:(fd-fix-pyarrow-import) docker run -v `pwd`:/vo/ -t -i python:3.9 bash root@1252c09f932c:/vo# cd /vo/ root@1252c09f932c:/vo# pip3 install -e ".[s3fs]" Obtaining file:///vo Installing build dependencies ... done Checking if build backend supports build_editable ... done Getting requirements to build editable ... done Preparing editable metadata (pyproject.toml) ... done Collecting mmhash3==3.0.1 Downloading mmhash3-3.0.1-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl (38 kB) Collecting pyyaml==6.0.0 Downloading PyYAML-6.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl (731 kB) 731.1/731.1 KB 11.0 MB/s eta 0:00:00 Collecting rich==12.6.0 Downloading rich-12.6.0-py3-none-any.whl (237 kB) 237.5/237.5 KB 9.7 MB/s eta 0:00:00 Requirement already satisfied: requests==2.28.1 in /usr/local/lib/python3.9/site-packages (from pyiceberg==0.2.0) (2.28.1) Collecting pydantic==1.10.2 Downloading pydantic-1.10.2-py3-none-any.whl (154 kB) 154.6/154.6 KB 11.9 MB/s eta 0:00:00 Collecting zstandard==0.19.0 Downloading zstandard-0.19.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl (2.3 MB) 2.3/2.3 MB 18.3 MB/s eta 0:00:00 Collecting fsspec==2022.10.0 Downloading fsspec-2022.10.0-py3-none-any.whl (138 kB) 138.8/138.8 KB 8.7 MB/s eta 0:00:00 Collecting click==8.1.3 Downloading click-8.1.3-py3-none-any.whl (96 kB) 96.6/96.6 KB 13.8 MB/s eta 0:00:00 Collecting s3fs==2022.10.0 Downloading s3fs-2022.10.0-py3-none-any.whl (27 kB) Collecting typing-extensions>=4.1.0 Downloading typing_extensions-4.4.0-py3-none-any.whl (26 kB) Requirement already satisfied: idna<4,>=2.5 in /usr/local/lib/python3.9/site-packages (from requests==2.28.1->pyiceberg==0.2.0) (3.4) Requirement already satisfied: charset-normalizer<3,>=2 in /usr/local/lib/python3.9/site-packages (from requests==2.28.1->pyiceberg==0.2.0) (2.1.1) Requirement already satisfied: urllib3<1.27,>=1.21.1 in /usr/local/lib/python3.9/site-packages (from requests==2.28.1->pyiceberg==0.2.0) (1.26.13) Requirement already satisfied: certifi>=2017.4.17 in /usr/local/lib/python3.9/site-packages (from requests==2.28.1->pyiceberg==0.2.0) (2022.9.24) Collecting pygments<3.0.0,>=2.6.0 Downloading Pygments-2.13.0-py3-none-any.whl (1.1 MB) 1.1/1.1 MB 18.6 MB/s eta 0:00:00 Collecting commonmark<0.10.0,>=0.9.0 Downloading commonmark-0.9.1-py2.py3-none-any.whl (51 kB) 51.1/51.1 KB 11.3 MB/s eta 0:00:00 Collecting aiohttp!=4.0.0a0,!=4.0.0a1 Downloading aiohttp-3.8.3-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl (1.0 MB) 1.0/1.0 MB 18.1 MB/s eta 0:00:00 Collecting aiobotocore~=2.4.0 Downloading aiobotocore-2.4.1-py3-none-any.whl (66 kB) 66.8/66.8 KB 9.0 MB/s eta 0:00:00 Collecting botocore<1.27.60,>=1.27.59 Downloading botocore-1.27.59-py3-none-any.whl (9.1 MB) 9.1/9.1 MB 9.3 MB/s eta 0:00:00 Collecting wrapt>=1.10.10 Downloading wrapt-1.14.1-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl (77 kB) 77.9/77.9 KB 6.4 MB/s eta 0:00:00 Collecting aioitertools>=0.5.1 Downloading aioitertools-0.11.0-py3-none-any.whl (23 kB) Collecting aiosignal>=1.1.2 Downloading aiosignal-1.3.1-py3-none-any.whl (7.6 kB) Collecting frozenlist>=1.1.1 Downloading frozenlist-1.3.3-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl (159 kB) 159.2/159.2 KB 9.0 MB/s eta 0:00:00 Collecting async-timeout<5.0,>=4.0.0a3 Downloading async_timeout-4.0.2-py3-none-any.whl (5.8 kB) Collecting yarl<2.0,>=1.0 
Downloading yarl-1.8.2-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl (261 kB) 261.1/261.1 KB 15.8 MB/s eta 0:00:00 Requirement already satisfied: attrs>=17.3.0 in /usr/local/lib/python3.9/site-packages (from aiohttp!=4.0.0a0,!=4.0.0a1->s3fs==2022.10.0->pyiceberg==0.2.0) (22.1.0) Collecting multidict<7.0,>=4.5 Downloading multidict-6.0.3-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl (116 kB) 116.3/116.3 KB 16.2 MB/
[GitHub] [iceberg] danielcweeks commented on a diff in pull request #6324
danielcweeks commented on code in PR #6324: URL: https://github.com/apache/iceberg/pull/6324#discussion_r1040055218 ## hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java: ## @@ -567,8 +569,13 @@ Database convertToDatabase(Namespace namespace, Map meta) { }); if (database.getOwnerName() == null) { - database.setOwnerName(System.getProperty("user.name")); - database.setOwnerType(PrincipalType.USER); + try { + database.setOwnerName(UserGroupInformation.getCurrentUser().getUserName()); +database.setOwnerType(PrincipalType.USER); + } catch (IOException e) { +throw new RuntimeException( Review Comment: Looking through the `UGI` path, it looks like the only likely failure is that security (e.g. kerberos) is enabled, but something is misconfigured or the user isn't logged in. It probably makes sense to just throw here, but we should use the java built-in `UncheckedIOException` since we have an IO cause (Iceberg's `RuntimeIOException` is probably best for when we don't have an IO cause). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
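A small sketch of what that suggestion could look like, with the UGI lookup pulled into a helper for clarity; the method name and error message are illustrative, not the PR's actual code.

```java
// Sketch of the suggested change: propagate the UGI failure as java.io.UncheckedIOException
// so the IOException cause is preserved. Helper name and message are illustrative.
import java.io.IOException;
import java.io.UncheckedIOException;
import org.apache.hadoop.security.UserGroupInformation;

public class OwnerNameSketch {
  static String currentOwnerName() {
    try {
      return UserGroupInformation.getCurrentUser().getUserName();
    } catch (IOException e) {
      // Keeps the original IO cause on the stack trace, unlike a bare RuntimeException
      throw new UncheckedIOException("Failed to determine current user from UGI", e);
    }
  }

  public static void main(String[] args) {
    System.out.println(currentOwnerName());
  }
}
```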
[GitHub] [iceberg] stevenzwu merged pull request #6299: Flink: support split discovery throttling for streaming read
stevenzwu merged PR #6299: URL: https://github.com/apache/iceberg/pull/6299 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] stevenzwu commented on pull request #6299: Flink: support split discovery throttling for streaming read
stevenzwu commented on PR #6299: URL: https://github.com/apache/iceberg/pull/6299#issuecomment-1338169362 Thanks @pvary and @hililiwei for the review -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6360: Docs: Update Zorder spark support versions.
RussellSpitzer commented on code in PR #6360: URL: https://github.com/apache/iceberg/pull/6360#discussion_r1040172058 ## docs/spark-procedures.md: ## @@ -271,7 +271,7 @@ Iceberg can compact data files in parallel using Spark with the `rewriteDataFile |---|---|--|-| | `table` | ✔️ | string | Name of the table to update | | `strategy`|| string | Name of the strategy - binpack or sort. Defaults to binpack strategy | -| `sort_order` || string | If Zorder, then comma separated column names within zorder() text. Example: zorder(c1,c2,c3). Else, Comma separated sort_order_column. Where sort_order_column is a space separated sort order info per column (ColumnName SortDirection NullOrder). SortDirection can be ASC or DESC. NullOrder can be NULLS FIRST or NULLS LAST | +| `sort_order` || string | If Zorder(supported in Spark 3.2 and above), then comma separated column names within zorder() text. Example: zorder(c1,c2,c3). Else, Comma separated sort_order_column. Where sort_order_column is a space separated sort order info per column (ColumnName SortDirection NullOrder). SortDirection can be ASC or DESC. NullOrder can be NULLS FIRST or NULLS LAST | Review Comment: One more comment here, instead of "if Zorder", maybe "For Zorder use a comma separated list of columns within zorder(). (Supported in Spark 3.2 and Above) Example: " "Else, Comma separated sort orders in the format (ColumnName SortDirection NullOrder) where Defaults to the table's sort order." Just reading over the whole doc it is a little confusing -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
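For reference, the two `sort_order` forms being discussed would be invoked roughly like this (catalog, table, and column names are illustrative):

```sql
-- Z-order on selected columns (Spark 3.2 and above)
CALL catalog_name.system.rewrite_data_files(
  table => 'db.sample',
  strategy => 'sort',
  sort_order => 'zorder(c1,c2,c3)'
);

-- Regular sort order: ColumnName SortDirection NullOrder
CALL catalog_name.system.rewrite_data_files(
  table => 'db.sample',
  strategy => 'sort',
  sort_order => 'c1 DESC NULLS LAST'
);
```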
[GitHub] [iceberg] stevenzwu opened a new pull request, #6363: Flink: backport split discovery throttling for FLIP-27 source to 1.14…
stevenzwu opened a new pull request, #6363: URL: https://github.com/apache/iceberg/pull/6363 … and 1.15 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] stevenzwu commented on a diff in pull request #5967: Flink: Support read options in flink source
stevenzwu commented on code in PR #5967: URL: https://github.com/apache/iceberg/pull/5967#discussion_r1040195457 ## docs/flink-getting-started.md: ## @@ -683,7 +683,58 @@ env.execute("Test Iceberg DataStream"); OVERWRITE and UPSERT can't be set together. In UPSERT mode, if the table is partitioned, the partition fields should be included in equality fields. {{< /hint >}} -## Write options +## Options +### Read options + +Flink read options are passed when configuring the Flink IcebergSource, like this: + +``` +IcebergSource.forRowData() +.tableLoader(TableLoader.fromCatalog(...)) +.assignerFactory(new SimpleSplitAssignerFactory()) +.streaming(true) + .streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT) +.startSnapshotId(3821550127947089987L) +.monitorInterval(Duration.ofMillis(10L)) // or .set("monitor-interval", "10s") +.build() +``` +For Flink SQL, read options can be passed in via SQL hints like this: +``` +SELECT * FROM tableName /*+ OPTIONS('monitor-interval'='10s') */ +... +``` + +Options can be passed in via Flink configuration, which will be applied to current session. Note that not all options support this mode. + +``` +env.getConfig() +.getConfiguration() +.set(FlinkReadOptions.SPLIT_FILE_OPEN_COST, 1000L); +... +``` + +`Read option` has the highest priority, followed by `Flink configuration` and then `Table property`. + +| Read option | Flink configuration | Table property | Default | Description | +| - | - | | -- | | +| snapshot-id | N/A | N/A | N/A | For time travel in batch mode. Read data from the specified snapshot-id. | Review Comment: Default should be `null` (not `N/A`) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
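As a concrete illustration of the batch time-travel options mentioned above, a SQL hint would presumably look like this (option names taken from the table under review; the snapshot id and timestamp are made up):

```sql
-- Read a specific snapshot
SELECT * FROM prod.db.sample /*+ OPTIONS('snapshot-id'='3821550127947089987') */;

-- Read the most recent snapshot as of a time in milliseconds
SELECT * FROM prod.db.sample /*+ OPTIONS('as-of-timestamp'='1670000000000') */;
```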
[GitHub] [iceberg] stevenzwu commented on a diff in pull request #5967: Flink: Support read options in flink source
stevenzwu commented on code in PR #5967: URL: https://github.com/apache/iceberg/pull/5967#discussion_r1040196996 ## docs/flink-getting-started.md: ## @@ -683,7 +683,58 @@ env.execute("Test Iceberg DataStream"); OVERWRITE and UPSERT can't be set together. In UPSERT mode, if the table is partitioned, the partition fields should be included in equality fields. {{< /hint >}} -## Write options +## Options +### Read options + +Flink read options are passed when configuring the Flink IcebergSource, like this: + +``` +IcebergSource.forRowData() +.tableLoader(TableLoader.fromCatalog(...)) +.assignerFactory(new SimpleSplitAssignerFactory()) +.streaming(true) + .streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT) +.startSnapshotId(3821550127947089987L) +.monitorInterval(Duration.ofMillis(10L)) // or .set("monitor-interval", "10s") +.build() +``` +For Flink SQL, read options can be passed in via SQL hints like this: +``` +SELECT * FROM tableName /*+ OPTIONS('monitor-interval'='10s') */ +... +``` + +Options can be passed in via Flink configuration, which will be applied to current session. Note that not all options support this mode. + +``` +env.getConfig() +.getConfiguration() +.set(FlinkReadOptions.SPLIT_FILE_OPEN_COST, 1000L); +... +``` + +`Read option` has the highest priority, followed by `Flink configuration` and then `Table property`. + +| Read option | Flink configuration | Table property | Default | Description | +| - | - | | -- | | +| snapshot-id | N/A | N/A | N/A | For time travel in batch mode. Read data from the specified snapshot-id. | +| case-sensitive| case-sensitive | N/A | false | If true, match column name in a case sensitive way. | +| as-of-timestamp | N/A | N/A | N/A | For time travel in batch mode. Read data from the most recent snapshot as of the given time in milliseconds. | +| connector.iceberg.starting-strategy | connector.iceberg.starting-strategy | N/A | INCREMENTAL_FROM_LATEST_SNAPSHOT | Starting strategy for streaming execution. TABLE_SCAN_THEN_INCREMENTAL: Do a regular table scan then switch to the incremental mode. The incremental mode starts from the current snapshot exclusive. INCREMENTAL_FROM_LATEST_SNAPSHOT: Start incremental mode from the latest snapshot inclusive. If it is an empty map, all future append snapshots should be discovered. INCREMENTAL_FROM_EARLIEST_SNAPSHOT: Start incremental mode from the earliest snapshot inclusive. If it is an empty map, all future append snapshots should be discovered. INCREMENTAL_FROM_SNAPSHOT_ID: Start incremental mode from a snapshot with a specific id inclusive. INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP: Start incremental mode from a snapshot with a specific timestamp inclusive. If the timestamp is between two snapshots, it should start from the snapshot after th e timestamp. Just for FIP27 Source. | +| start-snapshot-timestamp | N/A | N/A | N/A | Start to read data from the most recent snapshot as of the given time in milliseconds. | +| start-snapshot-id | N/A | N/A | N/A | Start to read data from the specified snapshot-id. | +| end-snapshot-id | N/A | N/A | The latest snapshot id | Specifies the end snapshot. | +| connector.iceberg.split-size | connector.iceberg.split-size | read.split.target-size | Table read.split.target-size | Target size when combining data input splits.| Review Comment: hint option shouldn't need the prefix for consistency, as there is no naming collision concern. Yes, it means that we need to handle the difference in code. 
Default should be the default value from `ScanContext`, not `Table read.split.target-size`. nit: `combining data input splits` -> `combining input splits`.
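If the `connector.iceberg.` prefix is dropped for SQL hints as suggested, the split options would presumably be written like this (option names assumed, values illustrative):

```sql
SELECT * FROM prod.db.sample
/*+ OPTIONS('split-size'='134217728', 'split-file-open-cost'='4194304') */;
```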
[GitHub] [iceberg] github-actions[bot] closed issue #4822: ParallelIterator is using too much memory
github-actions[bot] closed issue #4822: ParallelIterator is using too much memory URL: https://github.com/apache/iceberg/issues/4822 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] github-actions[bot] commented on issue #4822: ParallelIterator is using too much memory
github-actions[bot] commented on issue #4822: URL: https://github.com/apache/iceberg/issues/4822#issuecomment-1338428008 This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale' -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] stevenzwu commented on a diff in pull request #5967: Flink: Support read options in flink source
stevenzwu commented on code in PR #5967: URL: https://github.com/apache/iceberg/pull/5967#discussion_r1040205639 ## docs/flink-getting-started.md: ## @@ -683,7 +683,58 @@ env.execute("Test Iceberg DataStream"); OVERWRITE and UPSERT can't be set together. In UPSERT mode, if the table is partitioned, the partition fields should be included in equality fields. {{< /hint >}} -## Write options +## Options +### Read options + +Flink read options are passed when configuring the Flink IcebergSource, like this: + +``` +IcebergSource.forRowData() +.tableLoader(TableLoader.fromCatalog(...)) +.assignerFactory(new SimpleSplitAssignerFactory()) +.streaming(true) + .streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT) +.startSnapshotId(3821550127947089987L) +.monitorInterval(Duration.ofMillis(10L)) // or .set("monitor-interval", "10s") +.build() +``` +For Flink SQL, read options can be passed in via SQL hints like this: +``` +SELECT * FROM tableName /*+ OPTIONS('monitor-interval'='10s') */ +... +``` + +Options can be passed in via Flink configuration, which will be applied to current session. Note that not all options support this mode. + +``` +env.getConfig() +.getConfiguration() +.set(FlinkReadOptions.SPLIT_FILE_OPEN_COST, 1000L); +... +``` + +`Read option` has the highest priority, followed by `Flink configuration` and then `Table property`. + +| Read option | Flink configuration | Table property | Default | Description | +| - | - | | -- | | +| snapshot-id | N/A | N/A | N/A | For time travel in batch mode. Read data from the specified snapshot-id. | +| case-sensitive| case-sensitive | N/A | false | If true, match column name in a case sensitive way. | +| as-of-timestamp | N/A | N/A | N/A | For time travel in batch mode. Read data from the most recent snapshot as of the given time in milliseconds. | +| connector.iceberg.starting-strategy | connector.iceberg.starting-strategy | N/A | INCREMENTAL_FROM_LATEST_SNAPSHOT | Starting strategy for streaming execution. TABLE_SCAN_THEN_INCREMENTAL: Do a regular table scan then switch to the incremental mode. The incremental mode starts from the current snapshot exclusive. INCREMENTAL_FROM_LATEST_SNAPSHOT: Start incremental mode from the latest snapshot inclusive. If it is an empty map, all future append snapshots should be discovered. INCREMENTAL_FROM_EARLIEST_SNAPSHOT: Start incremental mode from the earliest snapshot inclusive. If it is an empty map, all future append snapshots should be discovered. INCREMENTAL_FROM_SNAPSHOT_ID: Start incremental mode from a snapshot with a specific id inclusive. INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP: Start incremental mode from a snapshot with a specific timestamp inclusive. If the timestamp is between two snapshots, it should start from the snapshot after th e timestamp. Just for FIP27 Source. | +| start-snapshot-timestamp | N/A | N/A | N/A | Start to read data from the most recent snapshot as of the given time in milliseconds. | +| start-snapshot-id | N/A | N/A | N/A | Start to read data from the specified snapshot-id. | +| end-snapshot-id | N/A | N/A | The latest snapshot id | Specifies the end snapshot. | +| connector.iceberg.split-size | connector.iceberg.split-size | read.split.target-size | Table read.split.target-size | Target size when combining data input splits.| +| connector.iceberg.split-lookback | connector.iceberg.split-file-open-cost| read.split.planning-lookback | Table read.split.planning-lookback | Number of bins to consider when combining input splits. 
| +| connector.iceberg.split-file-open-cost| connector.iceberg.split-file-open
[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #6352: AWS: Fix inconsistent behavior of naming S3 location between read and write operations by allowing only s3 bucket name
amogh-jahagirdar commented on code in PR #6352: URL: https://github.com/apache/iceberg/pull/6352#discussion_r1040279885 ## aws/src/main/java/org/apache/iceberg/aws/s3/S3URI.java: ## @@ -74,17 +74,14 @@ class S3URI { this.scheme = schemeSplit[0]; String[] authoritySplit = schemeSplit[1].split(PATH_DELIM, 2); -ValidationException.check( -authoritySplit.length == 2, "Invalid S3 URI, cannot determine bucket: %s", location); -ValidationException.check( -!authoritySplit[1].trim().isEmpty(), "Invalid S3 URI, path is empty: %s", location); + Review Comment: Never realized this class was package private. So that does give us the flexibility to change the behavior without impacting users. I think it makes sense then, AmazonS3URI in the AWS SDK has the same semantics for allowing bucket by itself as a URI. I do think we should confirm with @danielcweeks on if this was intentionally implemented this way in Iceberg. ## aws/src/main/java/org/apache/iceberg/aws/s3/S3URI.java: ## @@ -74,17 +74,14 @@ class S3URI { this.scheme = schemeSplit[0]; String[] authoritySplit = schemeSplit[1].split(PATH_DELIM, 2); -ValidationException.check( -authoritySplit.length == 2, "Invalid S3 URI, cannot determine bucket: %s", location); -ValidationException.check( -!authoritySplit[1].trim().isEmpty(), "Invalid S3 URI, path is empty: %s", location); + this.bucket = bucketToAccessPointMapping == null ? authoritySplit[0] : bucketToAccessPointMapping.getOrDefault(authoritySplit[0], authoritySplit[0]); // Strip query and fragment if they exist -String path = authoritySplit[1]; +String path = authoritySplit.length > 1 ? authoritySplit[1] : ""; Review Comment: Nit: If we go down this route, could we have key be an optional? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #6352: AWS: Fix inconsistent behavior of naming S3 location between read and write operations by allowing only s3 bucket name
amogh-jahagirdar commented on code in PR #6352: URL: https://github.com/apache/iceberg/pull/6352#discussion_r1040280304 ## aws/src/main/java/org/apache/iceberg/aws/s3/S3URI.java: ## @@ -74,17 +74,14 @@ class S3URI { this.scheme = schemeSplit[0]; String[] authoritySplit = schemeSplit[1].split(PATH_DELIM, 2); -ValidationException.check( -authoritySplit.length == 2, "Invalid S3 URI, cannot determine bucket: %s", location); -ValidationException.check( -!authoritySplit[1].trim().isEmpty(), "Invalid S3 URI, path is empty: %s", location); + this.bucket = bucketToAccessPointMapping == null ? authoritySplit[0] : bucketToAccessPointMapping.getOrDefault(authoritySplit[0], authoritySplit[0]); // Strip query and fragment if they exist -String path = authoritySplit[1]; +String path = authoritySplit.length > 1 ? authoritySplit[1] : ""; Review Comment: Nit: If we go down this route, could we have keys/paths be an optional? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
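A rough sketch of the parsing behavior under discussion, i.e. accepting `s3://bucket` with no key instead of failing validation; this is illustrative and not the actual `S3URI` implementation:

```java
import java.util.Optional;

class SimpleS3Uri {
  private final String bucket;
  private final Optional<String> key;

  SimpleS3Uri(String location) {
    String[] schemeSplit = location.split("://", -1);
    if (schemeSplit.length != 2) {
      throw new IllegalArgumentException("Invalid S3 URI: " + location);
    }
    String[] authoritySplit = schemeSplit[1].split("/", 2);
    this.bucket = authoritySplit[0];
    // Key is empty when the URI is just a bucket, e.g. "s3://bucket"
    this.key =
        authoritySplit.length > 1 && !authoritySplit[1].trim().isEmpty()
            ? Optional.of(authoritySplit[1])
            : Optional.empty();
  }

  String bucket() {
    return bucket;
  }

  Optional<String> key() {
    return key;
  }
}
```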
[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6222: Flink: Support inspecting table
stevenzwu commented on code in PR #6222: URL: https://github.com/apache/iceberg/pull/6222#discussion_r1039841272 ## docs/flink-getting-started.md: ## @@ -712,9 +712,188 @@ INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */ | compression-strategy | Table write.orc.compression-strategy | Overrides this table's compression strategy for ORC tables for this write | -## Inspecting tables. +## Inspecting tables -Iceberg does not support inspecting table in flink sql now, we need to use [iceberg's Java API](../api) to read iceberg's meta data to get those table information. +To inspect a table's history, snapshots, and other metadata, Iceberg supports metadata tables. + +Metadata tables are identified by adding the metadata table name after the original table name. For example, history for `db.table` is read using `db.table$history`. + +### History + +To show table history: + +```sql +SELECT * FROM prod.db.table$history; +``` + +| made_current_at | snapshot_id | parent_id | is_current_ancestor | +| --- | --- | --- | --- | +| 2019-02-08 03:29:51.215 | 5781947118336215154 | NULL| true | +| 2019-02-08 03:47:55.948 | 5179299526185056830 | 5781947118336215154 | true | +| 2019-02-09 16:24:30.13 | 296410040247533544 | 5179299526185056830 | false | +| 2019-02-09 16:32:47.336 | 2999875608062437330 | 5179299526185056830 | true | +| 2019-02-09 19:42:03.919 | 8924558786060583479 | 2999875608062437330 | true | +| 2019-02-09 19:49:16.343 | 6536733823181975045 | 8924558786060583479 | true | + +{{< hint info >}} +**This shows a commit that was rolled back.** The example has two snapshots with the same parent, and one is *not* an ancestor of the current table state. Review Comment: maybe more explicit? In this example, snapshot 296410040247533544 and 2999875608062437330 have the same parent snapshot 5179299526185056830. Snapshot 296410040247533544 was rolled back and is *not* an ancestor of the current table state. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
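As an aside to the rollback note above, rolled-back commits can be spotted directly from the same metadata table (the table name is illustrative):

```sql
-- Snapshots that are no longer ancestors of the current table state
SELECT snapshot_id, made_current_at
FROM prod.db.table$history
WHERE is_current_ancestor = false;
```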
[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6222: Flink: Support inspecting table
stevenzwu commented on code in PR #6222: URL: https://github.com/apache/iceberg/pull/6222#discussion_r1040359570 ## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/data/StructRowData.java: ## @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.data; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDate; +import java.time.OffsetDateTime; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.function.BiConsumer; +import java.util.function.Function; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.GenericMapData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RawValueData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.types.RowKind; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.ByteBuffers; + +public class StructRowData implements RowData { + private final Types.StructType type; + private RowKind kind; + private StructLike struct; + + public StructRowData(Types.StructType type) { +this(type, RowKind.INSERT); + } + + public StructRowData(Types.StructType type, RowKind kind) { +this(type, null, kind); + } + + private StructRowData(Types.StructType type, StructLike struct) { +this(type, struct, RowKind.INSERT); + } + + private StructRowData(Types.StructType type, StructLike struct, RowKind kind) { +this.type = type; +this.struct = struct; +this.kind = kind; + } + + public StructRowData setStruct(StructLike newStruct) { +this.struct = newStruct; +return this; + } + + @Override + public int getArity() { +return struct.size(); + } + + @Override + public RowKind getRowKind() { +return kind; + } + + @Override + public void setRowKind(RowKind newKind) { +Preconditions.checkNotNull(newKind, "kind can not be null"); +this.kind = newKind; + } + + @Override + public boolean isNullAt(int pos) { +return struct.get(pos, Object.class) == null; + } + + @Override + public boolean getBoolean(int pos) { +return struct.get(pos, Boolean.class); + } 
+ + @Override + public byte getByte(int pos) { +return (byte) (int) struct.get(pos, Integer.class); + } + + @Override + public short getShort(int pos) { +return (short) (int) struct.get(pos, Integer.class); + } + + @Override + public int getInt(int pos) { +Object integer = struct.get(pos, Object.class); + +if (integer instanceof Integer) { + return (int) integer; +} else if (integer instanceof LocalDate) { + return (int) ((LocalDate) integer).toEpochDay(); +} else { + throw new IllegalStateException( + "Unknown type for int field. Type name: " + integer.getClass().getName()); +} + } + + @Override + public long getLong(int pos) { +Object longVal = struct.get(pos, Object.class); + +if (longVal instanceof Long) { + return (long) longVal; +} else if (longVal instanceof OffsetDateTime) { + return Duration.between(Instant.EPOCH, (OffsetDateTime) longVal).toNanos() / 1000; +} else if (longVal instanceof LocalDate) { + return ((LocalDate) longVal).toEpochDay(); +} else { + throw new IllegalStateException( + "Unknown type for long field. Type name: " + longVal.getClass().getName()); +} + } + + @Override + public float getFloat(int pos) { +return struct.get(pos, Float.class); + } + + @Override +
[GitHub] [iceberg] rbalamohan opened a new issue, #6364: Optimise POS reads
rbalamohan opened a new issue, #6364: URL: https://github.com/apache/iceberg/issues/6364 ### Apache Iceberg version 0.14.1 ### Query engine Spark ### Please describe the bug 🐞 Currently a combinedFileTask can have more than 1 file. Depending on the nature of the workload, it can even have 30-50+ files in a single split. When there are 4+ POS (position delete) files, "select" queries take a lot longer to process. This is because every data file needs to process the POS file, which leads to read amplification. Request is to optimise the way POS file reading is done.
- Optimise the parquet reader with cached file status and footer.
- Optimise within a combinedFileTask in a single task in a single executor. This can have more than 1 file in a single split; typically there can be 10-50+ files depending on the size of the files.
- For simplicity, let us start with 1 POS file. This POS file can have delete information about all the 50+ files in the combined task.
- Currently, for every file it opens, it needs "delete row positions", so it invokes "DeleteFilter::deletedRowPositions". This opens the POS file, reads the footer, and reads the snippet for the specific file path.
- The above step happens for all the 50+ files in sequential order. Internally, it opens and reads the footer information 50+ times, which is not needed.
- Need a lightweight parquet reader which can accept readerConfs etc. and take footer information as an argument. Basically cache footer details and file status details to reduce turnaround with object stores.
- Otherwise pass the POS reader during data reading, so that it doesn't need to reopen and read the footers again.
- Optimise reading of POS files: though the path is dictionary encoded, it ends up materializing the path again and again. Need a way to optimise this to reduce CPU burn when reading POS files. Covered in https://github.com/apache/iceberg/issues/5863
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
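The footer-caching idea in the report could be as simple as memoizing whatever footer/metadata object the reader needs, keyed by delete file path, so each POS file's footer is read once per task rather than once per data file. A rough sketch; the types and names below are placeholders, not Iceberg or Parquet APIs:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Placeholder for whatever footer/metadata object the reader needs.
class FooterInfo {}

class FooterCache {
  private final Map<String, FooterInfo> cache = new ConcurrentHashMap<>();

  // Loads the footer for a delete file at most once; subsequent lookups for
  // the same path reuse the cached value.
  FooterInfo footerFor(String path, Function<String, FooterInfo> readFooter) {
    return cache.computeIfAbsent(path, readFooter);
  }
}
```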
[GitHub] [iceberg] stevenzwu commented on pull request #6222: Flink: Support inspecting table
stevenzwu commented on PR #6222: URL: https://github.com/apache/iceberg/pull/6222#issuecomment-1338683010 @hililiwei we should add comprehensive unit tests for `StructRowData`. I have some internal DataGenerators for unit test code with very comprehensive coverage of all field types (including complex nested types). Maybe I will submit a separate PR for that, which will cover Flink `RowData` and Iceberg `GenericRecord`. You can expand it with support for Iceberg `StructLike`. With that, we can write unit tests/assertions that compare the actual and expected. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6222: Flink: Support inspecting table
stevenzwu commented on code in PR #6222: URL: https://github.com/apache/iceberg/pull/6222#discussion_r1040393766 ## flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java: ## @@ -295,6 +299,161 @@ private static void assertEquals( } } + public static void assertEqualsSafe( + Schema schema, List recs, List rows) { Review Comment: why do we need the helper methods with Avro generic record? ## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/data/StructRowData.java: ## @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.data; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDate; +import java.time.OffsetDateTime; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.function.BiConsumer; +import java.util.function.Function; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.GenericMapData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RawValueData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.types.RowKind; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.ByteBuffers; + +public class StructRowData implements RowData { + private final Types.StructType type; + private RowKind kind; + private StructLike struct; + + public StructRowData(Types.StructType type) { +this(type, RowKind.INSERT); + } + + public StructRowData(Types.StructType type, RowKind kind) { +this(type, null, kind); + } + + private StructRowData(Types.StructType type, StructLike struct) { +this(type, struct, RowKind.INSERT); + } + + private StructRowData(Types.StructType type, StructLike struct, RowKind kind) { +this.type = type; +this.struct = struct; +this.kind = kind; + } + + public StructRowData setStruct(StructLike newStruct) { +this.struct = newStruct; +return this; + } + + @Override + public int getArity() { +return struct.size(); + } + + @Override + public RowKind getRowKind() { +return kind; + } + + @Override + public void setRowKind(RowKind 
newKind) { +Preconditions.checkNotNull(newKind, "kind can not be null"); +this.kind = newKind; + } + + @Override + public boolean isNullAt(int pos) { +return struct.get(pos, Object.class) == null; + } + + @Override + public boolean getBoolean(int pos) { +return struct.get(pos, Boolean.class); + } + + @Override + public byte getByte(int pos) { +return (byte) (int) struct.get(pos, Integer.class); + } + + @Override + public short getShort(int pos) { +return (short) (int) struct.get(pos, Integer.class); + } + + @Override + public int getInt(int pos) { +Object integer = struct.get(pos, Object.class); + +if (integer instanceof Integer) { + return (int) integer; +} else if (integer instanceof LocalDate) { + return (int) ((LocalDate) integer).toEpochDay(); +} else { + throw new IllegalStateException( + "Unknown type for int field. Type name: " + integer.getClass().getName()); +} + } + + @Override + public long getLong(int pos) { +Object longVal = struct.get(pos, Object.class); + +if (longVal instanceof Long) { + return (long) longVal; +} else if (longVal instanceof OffsetDateTime) { + return Duration.between(Instant.EPOCH, (OffsetDateTime) longVal).toNanos() / 1000; +} else if (longVal instanc
[GitHub] [iceberg] chenjunjiedada commented on a diff in pull request #6313: Flink: use correct metric config for position deletes
chenjunjiedada commented on code in PR #6313: URL: https://github.com/apache/iceberg/pull/6313#discussion_r1040420644 ## core/src/main/java/org/apache/iceberg/MetricsConfig.java: ## @@ -107,6 +107,30 @@ public static MetricsConfig forPositionDelete(Table table) { return new MetricsConfig(columnModes.build(), defaultMode); } + /** + * Creates a metrics config for a position delete file. + * + * @param props table configuration + */ + public static MetricsConfig forPositionDelete(Map props) { Review Comment: @stevenzwu I recheck the metric config, the schema and sort order are for the data row. Change to use `forTable` now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] chenjunjiedada commented on a diff in pull request #6313: Flink: use correct metric config for position deletes
chenjunjiedada commented on code in PR #6313: URL: https://github.com/apache/iceberg/pull/6313#discussion_r1040421576 ## flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java: ## @@ -160,7 +184,8 @@ public EqualityDeleteWriter newEqDeleteWriter( eqDeleteRowSchema, "Equality delete row schema shouldn't be null when creating equality-delete writer"); -MetricsConfig metricsConfig = MetricsConfig.fromProperties(props); +MetricsConfig metricsConfig = +table != null ? MetricsConfig.forTable(table) : MetricsConfig.fromProperties(props); Review Comment: The data row in equality delete should share the same metrics config as the table, so I change this as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6313: Flink: use correct metric config for position deletes
stevenzwu commented on code in PR #6313: URL: https://github.com/apache/iceberg/pull/6313#discussion_r1040425132 ## flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java: ## @@ -160,7 +184,8 @@ public EqualityDeleteWriter newEqDeleteWriter( eqDeleteRowSchema, "Equality delete row schema shouldn't be null when creating equality-delete writer"); -MetricsConfig metricsConfig = MetricsConfig.fromProperties(props); +MetricsConfig metricsConfig = +table != null ? MetricsConfig.forTable(table) : MetricsConfig.fromProperties(props); Review Comment: this actually makes a stronger case that we should pass in a valid `Table` object to the `FlinkAppenderFactory` as I mentioned in the other comment. we should also use `forTable` for position delete although it doesn't need schema or SortOrder as you said. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6313: Flink: use correct metric config for position deletes
stevenzwu commented on code in PR #6313: URL: https://github.com/apache/iceberg/pull/6313#discussion_r1040423889 ## core/src/main/java/org/apache/iceberg/MetricsConfig.java: ## @@ -107,6 +107,30 @@ public static MetricsConfig forPositionDelete(Table table) { return new MetricsConfig(columnModes.build(), defaultMode); } + /** + * Creates a metrics config for a position delete file. + * + * @param props table configuration + */ + public static MetricsConfig forPositionDelete(Map props) { Review Comment: @chenjunjiedada Hmm, I think we should revert the last commit. If we are going to pass in the `Table` object, it should be non-null; then we can consistently use `forTable` everywhere. I would also be open to the `fromProperties` approach, since, as you were saying, the schema and sort order only matter for data rows/files. I had some reservations because `MetricsConfig` deprecated `fromProperties` for data rows/files, and it is a little weird to reintroduce it for position deletes. Flink can use the `forPositionDelete(Table table)` API by passing a valid `SerializableTable` object to `FlinkAppenderFactory`. With the `Table` object, we can also remove the `Schema` and `PartitionSpec` from the current `FlinkAppenderFactory` constructor. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
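For readers following this thread, here is a minimal sketch of the two options being weighed, using a simplified wrapper rather than the real `FlinkAppenderFactory`; `MetricsConfig.forTable`, `MetricsConfig.fromProperties`, and `MetricsConfig.forPositionDelete(Table)` are the factory methods referenced in the diffs above.

```java
import java.util.Map;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.Table;

// Hedged sketch, not the actual Iceberg/Flink code.
class MetricsConfigChoiceSketch {
  // Option in the current patch: fall back to table properties when no Table is available.
  static MetricsConfig equalityDeleteConfig(Table table, Map<String, String> props) {
    return table != null ? MetricsConfig.forTable(table) : MetricsConfig.fromProperties(props);
  }

  // Option suggested in the review: require a non-null Table and use the table-aware
  // factories everywhere, including the position-delete specific one.
  static MetricsConfig positionDeleteConfig(Table table) {
    return MetricsConfig.forPositionDelete(table);
  }
}
```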
[GitHub] [iceberg] szehon-ho opened a new pull request, #6365: Core: Add position deletes metadata table
szehon-ho opened a new pull request, #6365: URL: https://github.com/apache/iceberg/pull/6365 This breaks up the PR https://github.com/apache/iceberg/pull/4812 , and is just the part that adds the PositionDeletesTable metadata table. It is based on @aokolnychyi 's newly-added BatchScan interface, so the scan is free to not return FileScanTask. It returns a custom ScanTask that scans DeleteFiles rather than DataFiles. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
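For context, a hedged sketch of how a scan over such a metadata table might be consumed; the `position_deletes` table name and the `newBatchScan()` entry point are illustrative assumptions, the relevant point being that `BatchScan` plans generic `ScanTask`s rather than `FileScanTask`s.

```java
import org.apache.iceberg.ScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.CloseableIterable;

// Hedged sketch; the metadata table name below is illustrative, not taken from the PR.
class PositionDeletesScanSketch {
  static void planPositionDeletes(Catalog catalog) throws Exception {
    Table posDeletes = catalog.loadTable(TableIdentifier.parse("db.table.position_deletes"));
    try (CloseableIterable<ScanTask> tasks = posDeletes.newBatchScan().planFiles()) {
      for (ScanTask task : tasks) {
        // Each planned task describes position-delete files to read, not data files.
        System.out.println(task);
      }
    }
  }
}
```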
[GitHub] [iceberg] RussellSpitzer commented on issue #6364: Optimise POS reads
RussellSpitzer commented on issue #6364: URL: https://github.com/apache/iceberg/issues/6364#issuecomment-1338783816 What is a POS file? I'm not familiar with the acronym.
On Dec 5, 2022, at 9:00 PM, rbalamohan wrote:
Apache Iceberg version: 0.14.1
Query engine: Spark
Please describe the bug 🐞
Currently a combinedFileTask can have more than one file; depending on the nature of the workload, it can even have 30-50+ files in a single split. When there are 4+ POS files, "select" queries take a lot longer to process. This is because every file needs to process the POS file, which leads to read amplification. The request is to optimise the way POS file reading is done.
- Optimise the parquet reader with cached file status and footer. Within a combinedFileTask, a single task in a single executor can have more than one file per split, typically 10-50+ files depending on file sizes. For simplicity, start with one POS file; this POS file can hold delete information for all 50+ files in the combined task. Currently, for every file it opens, the reader needs the "delete row positions", so it invokes "DeleteFilter::deletedRowPositions"; this opens the POS file, reads the footer, and reads the snippet for the specific file path. That step happens for all 50+ files in sequential order, so the footer is opened and read 50+ times, which is not needed. A lightweight parquet reader is needed which can accept reader confs etc. and take the footer information as an argument; basically, cache footer and file status details to reduce round trips to object stores. Alternatively, pass the POS reader along during data reading so that it doesn't need to reopen and read the footers again.
- Optimise reading of the POS file itself. Though the path column is dictionary encoded, it ends up materializing the path again and again. A way to optimise this is needed to reduce CPU burn when reading POS files. Covered in #5863.
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
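A hedged sketch of the kind of reuse being asked for in that issue: cache whatever is expensive per position-delete file (footer metadata or the decoded row positions) and share it across the data files of one combined task. The loader function below is a stand-in, not the current `DeleteFilter` API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hedged sketch: a per-task cache keyed by delete file path, so the footer/positions
// are loaded once instead of once per data file in the combined task.
class PosDeleteCacheSketch<T> {
  private final Map<String, T> byDeleteFilePath = new ConcurrentHashMap<>();
  private final Function<String, T> loader; // e.g. open the file, read the footer, decode positions

  PosDeleteCacheSketch(Function<String, T> loader) {
    this.loader = loader;
  }

  T get(String deleteFilePath) {
    // computeIfAbsent runs the expensive load at most once per delete file for this task
    return byDeleteFilePath.computeIfAbsent(deleteFilePath, loader);
  }
}
```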
[GitHub] [iceberg] hililiwei commented on a diff in pull request #6222: Flink: Support inspecting table
hililiwei commented on code in PR #6222: URL: https://github.com/apache/iceberg/pull/6222#discussion_r1040547304 ## flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java: ## @@ -295,6 +299,161 @@ private static void assertEquals( } } + public static void assertEqualsSafe( + Schema schema, List recs, List rows) { Review Comment: The metadata files are stored in Avro format, and when we read them directly, we get Avro generic records. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
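To illustrate that point, a small sketch (the file path is illustrative) showing that reading an Iceberg manifest or metadata Avro file with the plain Avro reader yields `GenericRecord`s, which is why the test helper compares generic records against Flink rows.

```java
import java.io.File;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

// Hedged sketch: dump the contents of an Avro metadata file as Avro GenericRecords.
class ReadMetadataAvroSketch {
  static void dump(String path) throws Exception {
    try (DataFileReader<GenericRecord> reader =
        new DataFileReader<>(new File(path), new GenericDatumReader<>())) {
      for (GenericRecord record : reader) {
        System.out.println(record);
      }
    }
  }
}
```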
[GitHub] [iceberg] hililiwei commented on a diff in pull request #6222: Flink: Support inspecting table
hililiwei commented on code in PR #6222: URL: https://github.com/apache/iceberg/pull/6222#discussion_r1040554968 ## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/data/StructRowData.java: ## @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.data; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDate; +import java.time.OffsetDateTime; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.function.BiConsumer; +import java.util.function.Function; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.GenericMapData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RawValueData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.types.RowKind; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.ByteBuffers; + +public class StructRowData implements RowData { + private final Types.StructType type; + private RowKind kind; + private StructLike struct; + + public StructRowData(Types.StructType type) { +this(type, RowKind.INSERT); + } + + public StructRowData(Types.StructType type, RowKind kind) { +this(type, null, kind); + } + + private StructRowData(Types.StructType type, StructLike struct) { +this(type, struct, RowKind.INSERT); + } + + private StructRowData(Types.StructType type, StructLike struct, RowKind kind) { +this.type = type; +this.struct = struct; +this.kind = kind; + } + + public StructRowData setStruct(StructLike newStruct) { +this.struct = newStruct; +return this; + } + + @Override + public int getArity() { +return struct.size(); + } + + @Override + public RowKind getRowKind() { +return kind; + } + + @Override + public void setRowKind(RowKind newKind) { +Preconditions.checkNotNull(newKind, "kind can not be null"); +this.kind = newKind; + } + + @Override + public boolean isNullAt(int pos) { +return struct.get(pos, Object.class) == null; + } + + @Override + public boolean getBoolean(int pos) { +return struct.get(pos, Boolean.class); + } 
+ + @Override + public byte getByte(int pos) { +return (byte) (int) struct.get(pos, Integer.class); + } + + @Override + public short getShort(int pos) { +return (short) (int) struct.get(pos, Integer.class); + } + + @Override + public int getInt(int pos) { +Object integer = struct.get(pos, Object.class); + +if (integer instanceof Integer) { + return (int) integer; +} else if (integer instanceof LocalDate) { + return (int) ((LocalDate) integer).toEpochDay(); +} else { + throw new IllegalStateException( + "Unknown type for int field. Type name: " + integer.getClass().getName()); +} + } + + @Override + public long getLong(int pos) { +Object longVal = struct.get(pos, Object.class); + +if (longVal instanceof Long) { + return (long) longVal; +} else if (longVal instanceof OffsetDateTime) { + return Duration.between(Instant.EPOCH, (OffsetDateTime) longVal).toNanos() / 1000; +} else if (longVal instanceof LocalDate) { + return ((LocalDate) longVal).toEpochDay(); +} else { + throw new IllegalStateException( + "Unknown type for long field. Type name: " + longVal.getClass().getName()); +} + } + + @Override + public float getFloat(int pos) { +return struct.get(pos, Float.class); + } + + @Override +
[GitHub] [iceberg] nastra commented on pull request #6355: Build: Bump org.eclipse.jgit from 5.13.1.202206130422-r to 6.4.0.202211300538-r
nastra commented on PR #6355: URL: https://github.com/apache/iceberg/pull/6355#issuecomment-1338859690 We can't upgrade because the latest jgit version that supports JDK 8 is 5.13.1.202206130422-r. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] nastra closed pull request #6355: Build: Bump org.eclipse.jgit from 5.13.1.202206130422-r to 6.4.0.202211300538-r
nastra closed pull request #6355: Build: Bump org.eclipse.jgit from 5.13.1.202206130422-r to 6.4.0.202211300538-r URL: https://github.com/apache/iceberg/pull/6355 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] dependabot[bot] commented on pull request #6355: Build: Bump org.eclipse.jgit from 5.13.1.202206130422-r to 6.4.0.202211300538-r
dependabot[bot] commented on PR #6355: URL: https://github.com/apache/iceberg/pull/6355#issuecomment-1338859714 OK, I won't notify you again about this release, but will get in touch when a new version is available. If you'd rather skip all updates until the next major or minor version, let me know by commenting `@dependabot ignore this major version` or `@dependabot ignore this minor version`. You can also ignore all major, minor, or patch releases for a dependency by adding an [`ignore` condition](https://docs.github.com/en/code-security/supply-chain-security/configuration-options-for-dependency-updates#ignore) with the desired `update_types` to your config file. If you change your mind, just re-open this PR and I'll resolve any conflicts on it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] jaehyeon-kim commented on issue #4977: Support Kafka Connect within Iceberg
jaehyeon-kim commented on issue #4977: URL: https://github.com/apache/iceberg/issues/4977#issuecomment-1338869374 +1 for Kafka Connect. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] hililiwei commented on a diff in pull request #6222: Flink: Support inspecting table
hililiwei commented on code in PR #6222: URL: https://github.com/apache/iceberg/pull/6222#discussion_r1040596892 ## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/data/StructRowData.java: ## @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.data; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDate; +import java.time.OffsetDateTime; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.function.BiConsumer; +import java.util.function.Function; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.GenericMapData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RawValueData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.types.RowKind; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.ByteBuffers; + +public class StructRowData implements RowData { + private final Types.StructType type; + private RowKind kind; + private StructLike struct; + + public StructRowData(Types.StructType type) { +this(type, RowKind.INSERT); + } + + public StructRowData(Types.StructType type, RowKind kind) { +this(type, null, kind); + } + + private StructRowData(Types.StructType type, StructLike struct) { +this(type, struct, RowKind.INSERT); + } + + private StructRowData(Types.StructType type, StructLike struct, RowKind kind) { +this.type = type; +this.struct = struct; +this.kind = kind; + } + + public StructRowData setStruct(StructLike newStruct) { +this.struct = newStruct; +return this; + } + + @Override + public int getArity() { +return struct.size(); + } + + @Override + public RowKind getRowKind() { +return kind; + } + + @Override + public void setRowKind(RowKind newKind) { +Preconditions.checkNotNull(newKind, "kind can not be null"); +this.kind = newKind; + } + + @Override + public boolean isNullAt(int pos) { +return struct.get(pos, Object.class) == null; + } + + @Override + public boolean getBoolean(int pos) { +return struct.get(pos, Boolean.class); + } 
+ + @Override + public byte getByte(int pos) { +return (byte) (int) struct.get(pos, Integer.class); + } + + @Override + public short getShort(int pos) { +return (short) (int) struct.get(pos, Integer.class); + } + + @Override + public int getInt(int pos) { +Object integer = struct.get(pos, Object.class); + +if (integer instanceof Integer) { + return (int) integer; +} else if (integer instanceof LocalDate) { + return (int) ((LocalDate) integer).toEpochDay(); +} else { + throw new IllegalStateException( + "Unknown type for int field. Type name: " + integer.getClass().getName()); +} + } + + @Override + public long getLong(int pos) { +Object longVal = struct.get(pos, Object.class); + +if (longVal instanceof Long) { + return (long) longVal; +} else if (longVal instanceof OffsetDateTime) { + return Duration.between(Instant.EPOCH, (OffsetDateTime) longVal).toNanos() / 1000; +} else if (longVal instanceof LocalDate) { + return ((LocalDate) longVal).toEpochDay(); +} else { + throw new IllegalStateException( + "Unknown type for long field. Type name: " + longVal.getClass().getName()); +} + } + + @Override + public float getFloat(int pos) { +return struct.get(pos, Float.class); + } + + @Override +
[GitHub] [iceberg] zstraw commented on issue #4550: the snapshot file is lost when write iceberg using flink Failed to open input stream for file File does not exist
zstraw commented on issue #4550: URL: https://github.com/apache/iceberg/issues/4550#issuecomment-1338908066 After digging into the Iceberg code and the log, I can reproduce it locally in a debugger. The scenario can happen while Flink is cancelling the job.
1. IcebergFileCommitter is about to commit a file. In the **rename** step for metadata.json (org.apache.iceberg.hadoop.HadoopTableOperations#renameToFinal), org.apache.hadoop.ipc.Client.call encounters an **InterruptedIOException**; I suspect it comes from the Flink task being cancelled. Meanwhile, **HDFS has actually renamed the metadata.json file successfully**.
2. After the rename fails, the commit is supposed to retry, but the thread hits an InterruptedException while sleeping (org.apache.iceberg.util.Tasks#runTaskWithRetry). It then throws a RuntimeException, and the version-hint file is never updated.
3. The RuntimeException leads to a **rollback** in org.apache.iceberg.BaseTransaction#cleanUpOnCommitFailure, which deletes the manifest list (snap-XXX). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
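A minimal sketch of the sequence described above, with simplified stand-ins for HadoopTableOperations#renameToFinal and Tasks#runTaskWithRetry (this is not the real Iceberg code). It shows how an interrupt during the retry sleep turns a rename that already landed on HDFS into a commit failure and cleanup.

```java
import java.io.InterruptedIOException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hedged sketch of the described failure sequence, not the actual Iceberg classes.
public class InterruptedCommitSketch {
  // Simulates HDFS applying the rename even though the RPC surfaces an InterruptedIOException.
  static final AtomicBoolean renamedOnHdfs = new AtomicBoolean(false);

  static void renameToFinal() throws InterruptedIOException {
    renamedOnHdfs.set(true); // step 1: the metadata.json rename actually succeeds on HDFS
    throw new InterruptedIOException("client call interrupted during task cancel");
  }

  static void commitWithRetry() {
    for (int attempt = 1; attempt <= 3; attempt++) {
      try {
        renameToFinal();
        return; // on success the version hint would be updated here
      } catch (InterruptedIOException e) {
        try {
          Thread.sleep(100); // step 2: retry backoff
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
          throw new RuntimeException("commit failed", ie); // retry loop gives up
        }
      }
    }
    throw new RuntimeException("commit failed after retries");
  }

  public static void main(String[] args) throws Exception {
    Thread committer = new Thread(() -> {
      try {
        commitWithRetry();
      } catch (RuntimeException e) {
        // step 3: cleanup treats the commit as failed and deletes the new snapshot's
        // manifest list, even though the rename in step 1 already took effect.
        System.out.println("rollback triggered; renamedOnHdfs=" + renamedOnHdfs.get());
      }
    });
    committer.start();
    committer.interrupt(); // mimic Flink cancellation interrupting the sleeping retry
    committer.join();
  }
}
```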
[GitHub] [iceberg] hililiwei commented on pull request #6222: Flink: Support inspecting table
hililiwei commented on PR #6222: URL: https://github.com/apache/iceberg/pull/6222#issuecomment-1338913731 > @hililiwei we should add comprehensive unit tests for `StructRowData`. > > I have some internal DataGenerators for unit test code with very comprehensive coverage of all field types (including complex nested types). Maybe I will submit a separate PR for that, which will cover Flink `RowData` and Iceberg `GenericRecord`. You can expand it with support for Iceberg `StructLike`. With that, we can write unit tests/assertions to compare the actual and expected results. That would be great. I'm also thinking of adding unit tests for it, so I'll do it based on your code. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] hililiwei commented on a diff in pull request #6222: Flink: Support inspecting table
hililiwei commented on code in PR #6222: URL: https://github.com/apache/iceberg/pull/6222#discussion_r1040606681 ## docs/flink-getting-started.md: ## @@ -712,9 +712,188 @@ INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */ | compression-strategy | Table write.orc.compression-strategy | Overrides this table's compression strategy for ORC tables for this write | -## Inspecting tables. +## Inspecting tables -Iceberg does not support inspecting table in flink sql now, we need to use [iceberg's Java API](../api) to read iceberg's meta data to get those table information. +To inspect a table's history, snapshots, and other metadata, Iceberg supports metadata tables. + +Metadata tables are identified by adding the metadata table name after the original table name. For example, history for `db.table` is read using `db.table$history`. + +### History + +To show table history: + +```sql +SELECT * FROM prod.db.table$history; +``` + +| made_current_at | snapshot_id | parent_id | is_current_ancestor | +| --- | --- | --- | --- | +| 2019-02-08 03:29:51.215 | 5781947118336215154 | NULL| true | +| 2019-02-08 03:47:55.948 | 5179299526185056830 | 5781947118336215154 | true | +| 2019-02-09 16:24:30.13 | 296410040247533544 | 5179299526185056830 | false | +| 2019-02-09 16:32:47.336 | 2999875608062437330 | 5179299526185056830 | true | +| 2019-02-09 19:42:03.919 | 8924558786060583479 | 2999875608062437330 | true | +| 2019-02-09 19:49:16.343 | 6536733823181975045 | 8924558786060583479 | true | + +{{< hint info >}} +**This shows a commit that was rolled back.** The example has two snapshots with the same parent, and one is *not* an ancestor of the current table state. Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
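As a usage note for the docs change discussed above, a hedged Java sketch of issuing the same metadata-table query through Flink's Table API; the catalog and table names are illustrative, and backticks around the identifier are an assumption about how the `$` suffix is parsed by the SQL dialect.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Hedged sketch; assumes an Iceberg catalog named "prod" is already registered in Flink.
public class InspectHistorySketch {
  public static void main(String[] args) {
    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
    // Query the history metadata table by appending $history to the table name.
    tEnv.executeSql("SELECT * FROM prod.db.`table$history`").print();
  }
}
```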