[GitHub] [iceberg] InvisibleProgrammer commented on pull request #6337: Docs: Update Iceberg Hive documentation

2022-12-05 Thread GitBox


InvisibleProgrammer commented on PR #6337:
URL: https://github.com/apache/iceberg/pull/6337#issuecomment-1336973794

   @pvary , Can I ask for workflow approval? 
   
   Thank you,
   Zsolt


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] ajantha-bhat commented on pull request #6250: Docs: Remove redundant configuration from spark docs

2022-12-05 Thread GitBox


ajantha-bhat commented on PR #6250:
URL: https://github.com/apache/iceberg/pull/6250#issuecomment-1337013120

   cc: @pvary 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] ajantha-bhat commented on pull request #6234: Docs: Remove parent-version-id from the view spec example

2022-12-05 Thread GitBox


ajantha-bhat commented on PR #6234:
URL: https://github.com/apache/iceberg/pull/6234#issuecomment-1337015494

   cc: @stevenzwu, @jackye1995  


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] KarlManong opened a new issue, #6359: Sometimes org.apache.iceberg.jdbc.TestJdbcTableConcurrency#testConcurrentFastAppends never end

2022-12-05 Thread GitBox


KarlManong opened a new issue, #6359:
URL: https://github.com/apache/iceberg/issues/6359

   ### Apache Iceberg version
   
   1.1.0 (latest release)
   
   ### Query engine
   
   Other
   
   ### Please describe the bug 🐞
   
   When I running `gradle build`, it stuck at `> :iceberg-core:test > Executing 
test org.apache.iceberg.jdbc.TestJdbcTableConcurrency`
   
   The stack shows 
   [stack.txt](https://github.com/apache/iceberg/files/10152988/stack.txt)
   
   Full log
   
[iceberg-core.log](https://github.com/apache/iceberg/files/10152998/iceberg-core.log)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] KarlManong closed issue #6359: Sometimes org.apache.iceberg.jdbc.TestJdbcTableConcurrency#testConcurrentFastAppends never end

2022-12-05 Thread GitBox


KarlManong closed issue #6359: Sometimes 
org.apache.iceberg.jdbc.TestJdbcTableConcurrency#testConcurrentFastAppends 
never end
URL: https://github.com/apache/iceberg/issues/6359


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] ajantha-bhat commented on a diff in pull request #6091: Spark-3.3: Handle statistics file clean up from expireSnapshots action/procedure

2022-12-05 Thread GitBox


ajantha-bhat commented on code in PR #6091:
URL: https://github.com/apache/iceberg/pull/6091#discussion_r1039525785


##
core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java:
##
@@ -1234,6 +1245,40 @@ public void 
testMultipleRefsAndCleanExpiredFilesFailsForIncrementalCleanup() {
 .commit());
   }
 
+  @Test
+  public void testExpireWithStatisticsFiles() throws URISyntaxException, 
IOException {
+table.newAppend().appendFile(FILE_A).commit();
+File statsFileLocation1 = statsFileLocation(table);
+StatisticsFile statisticsFile1 = writeStatsFileForCurrentSnapshot(table, 
statsFileLocation1);
+Assert.assertEquals(
+"Must match the latest snapshot",
+table.currentSnapshot().snapshotId(),
+statisticsFile1.snapshotId());
+
+table.newAppend().appendFile(FILE_B).commit();
+File statsFileLocation2 = statsFileLocation(table);
+StatisticsFile statisticsFile2 = writeStatsFileForCurrentSnapshot(table, 
statsFileLocation2);
+Assert.assertEquals(
+"Must match the latest snapshot",
+table.currentSnapshot().snapshotId(),
+statisticsFile2.snapshotId());
+
+Assert.assertEquals("Should have 2 statistics file", 2, 
table.statisticsFiles().size());
+
+table.updateProperties().set(TableProperties.MAX_SNAPSHOT_AGE_MS, 
"1").commit();
+
+removeSnapshots(table).commit();
+
+Assert.assertEquals("Should keep 1 snapshot", 1, 
Iterables.size(table.snapshots()));
+Assertions.assertThat(table.statisticsFiles())
+.hasSize(1)
+.extracting(StatisticsFile::snapshotId)
+.as("Should contain only the statistics file of snapshot2")
+.isEqualTo(Lists.newArrayList(statisticsFile2.snapshotId()));
+Assertions.assertThat(statsFileLocation1.exists()).isFalse();
+Assertions.assertThat(statsFileLocation2.exists()).isTrue();

Review Comment:
   Added now. This change was from a dependent PR (#6090). So, added the 
testcase in the dependent PR (#6090) 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] ajantha-bhat commented on a diff in pull request #6090: Core: Handle statistics file clean up from expireSnapshots

2022-12-05 Thread GitBox


ajantha-bhat commented on code in PR #6090:
URL: https://github.com/apache/iceberg/pull/6090#discussion_r1039527780


##
core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java:
##
@@ -79,4 +80,15 @@ protected void deleteFiles(Set pathsToDelete, String 
fileType) {
 (file, thrown) -> LOG.warn("Delete failed for {} file: {}", 
fileType, file, thrown))
 .run(deleteFunc::accept);
   }
+
+  protected Set expiredStatisticsFilesLocations(
+  TableMetadata beforeExpiration, Set expiredIds) {
+Set expiredStatisticsFilesLocations = Sets.newHashSet();
+for (StatisticsFile statisticsFile : beforeExpiration.statisticsFiles()) {
+  if (expiredIds.contains(statisticsFile.snapshotId())) {
+expiredStatisticsFilesLocations.add(statisticsFile.path());
+  }
+}
+return expiredStatisticsFilesLocations;
+  }

Review Comment:
   @amogh-jahagirdar, @findepi  
   As #6267 clarifies the spec. I have handled the reuse scenario and added the 
testcase now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] XN137 commented on pull request #6221: Change SingleBufferInputStream .read signature to match super-method.

2022-12-05 Thread GitBox


XN137 commented on PR #6221:
URL: https://github.com/apache/iceberg/pull/6221#issuecomment-1337298037

   additional context:
   the failing check can only be performed when the compiled bytecode contains 
parameter name information for methods:
   
   
https://github.com/palantir/gradle-baseline/blob/fd7759a9a37112db69f6875966e869a078a00319/baseline-error-prone/src/main/java/com/palantir/baseline/errorprone/ConsistentOverrides.java#L151-L164
   
   so it depends on whether the jdk classes were compiled with the 
`-parameters` flag for `javac` or not:
   ```
 -parameters
   Generate metadata for reflection on method parameters
   ```
   
   on jdk 8 this flag is commonly not used:
   ```
   ~/.sdkman/candidates/java/8.0.352-zulu/bin$ ./javap -version && ./javap -l 
java.io.InputStream | egrep "public.*read|off"
   1.8.0_352
 public abstract int read() throws java.io.IOException;
 public int read(byte[]) throws java.io.IOException;
 public int read(byte[], int, int) throws java.io.IOException;
   ```
   
   on jdk 11 it is commonly used:
   ```
   ~/.sdkman/candidates/java/11.0.17-zulu/bin$ ./javap -version && ./javap -l 
java.io.InputStream | egrep "public.*read|off"
   11.0.17
 public abstract int read() throws java.io.IOException;
 public int read(byte[]) throws java.io.IOException;
 public int read(byte[], int, int) throws java.io.IOException;
 0  81 2   off   I
 public byte[] readAllBytes() throws java.io.IOException;
 public byte[] readNBytes(int) throws java.io.IOException;
   200  74 7 offset   I
 public int readNBytes(byte[], int, int) throws java.io.IOException;
 0  53 2   off   I
   ```
   
   but there are some jdk vendors (i.e. on centos7) that also enable this flag 
on jdk8, thus this check might fail depending on the jdk distribution used.
   so targeting jdk11 is just a way to always trigger the check violation.
   
   long story short, this PR should get merged to fix the check on all kind of 
jdks :sweat_drops: 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] hililiwei commented on a diff in pull request #6222: Flink: Support inspecting table

2022-12-05 Thread GitBox


hililiwei commented on code in PR #6222:
URL: https://github.com/apache/iceberg/pull/6222#discussion_r1039570176


##
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java:
##
@@ -0,0 +1,713 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.avro.generic.GenericData;
+import org.apache.commons.collections.ListUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.MetadataTableUtils;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.flink.FlinkCatalogTestBase;
+import org.apache.iceberg.flink.FlinkConfigOptions;
+import org.apache.iceberg.flink.TestHelpers;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestFlinkMetaDataTable extends FlinkCatalogTestBase {

Review Comment:
   Missed it 😂 , added.  All the types have been tested.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] hililiwei commented on a diff in pull request #6222: Flink: Support inspecting table

2022-12-05 Thread GitBox


hililiwei commented on code in PR #6222:
URL: https://github.com/apache/iceberg/pull/6222#discussion_r1039570407


##
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java:
##
@@ -0,0 +1,713 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.avro.generic.GenericData;
+import org.apache.commons.collections.ListUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.MetadataTableUtils;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.flink.FlinkCatalogTestBase;
+import org.apache.iceberg.flink.FlinkConfigOptions;
+import org.apache.iceberg.flink.TestHelpers;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestFlinkMetaDataTable extends FlinkCatalogTestBase {
+
+  private static final String TABLE_NAME = "test_table";
+  private final FileFormat format = FileFormat.AVRO;
+  private static TemporaryFolder temp = new TemporaryFolder();
+
+  public TestFlinkMetaDataTable(String catalogName, Namespace baseNamespace) {
+super(catalogName, baseNamespace);
+  }
+
+  @Override
+  protected TableEnvironment getTableEnv() {
+Configuration configuration = 
super.getTableEnv().getConfig().getConfiguration();
+configuration.set(CoreOptions.DEFAULT_PARALLELISM, 1);
+configuration.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE, 
true);
+return super.getTableEnv();
+  }
+
+  @Before
+  public void before() {
+super.before();
+sql("CREATE DATABASE %s", flinkDatabase);
+sql("USE CATALOG %s", catalogName);
+sql("USE %s", DATABASE);
+  }
+
+  @Override
+  @After
+  public void clean() {
+sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME);
+sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
+super.clean();
+  }
+
+  @Test
+  public void testSnapshots() {
+sql(
+"CREATE TABLE %s (id INT, data VARCHAR,d DOUBLE) WITH 
('format-version'='2', 'write.format.default'='%s')",
+TABLE_NAME, format.name());
+sql(
+"INSERT INTO %s VALUES (1,'iceberg',10),(2,'b',20),(3,CAST(NULL AS 
VARCHAR),30)",
+TABLE_NAME);
+String sql = String.format("SELECT * FROM %s$snapshots ", TABLE_NAME);
+List result = sql(sql);
+
+Table table = 
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME));
+Iterator snapshots = table.snapshots().iterator();
+for (Row row : result) {
+  Snapshot next = snapshots.next();
+  Assert.assertEquals(
+  "Should have expected timestamp",
+  ((Instant) row.getField(0)).toEpochMilli(),
+  next.timestampMillis());
+  Assert.assertEquals("Should have expected snapshot id", row.getField(1), 
next.snapshotId());
+  Assert.assertEquals("Should have expected paren

[GitHub] [iceberg] hililiwei commented on a diff in pull request #6222: Flink: Support inspecting table

2022-12-05 Thread GitBox


hililiwei commented on code in PR #6222:
URL: https://github.com/apache/iceberg/pull/6222#discussion_r1039570783


##
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java:
##
@@ -0,0 +1,713 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.avro.generic.GenericData;
+import org.apache.commons.collections.ListUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.MetadataTableUtils;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.flink.FlinkCatalogTestBase;
+import org.apache.iceberg.flink.FlinkConfigOptions;
+import org.apache.iceberg.flink.TestHelpers;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestFlinkMetaDataTable extends FlinkCatalogTestBase {
+
+  private static final String TABLE_NAME = "test_table";
+  private final FileFormat format = FileFormat.AVRO;
+  private static TemporaryFolder temp = new TemporaryFolder();
+
+  public TestFlinkMetaDataTable(String catalogName, Namespace baseNamespace) {
+super(catalogName, baseNamespace);
+  }
+
+  @Override
+  protected TableEnvironment getTableEnv() {
+Configuration configuration = 
super.getTableEnv().getConfig().getConfiguration();
+configuration.set(CoreOptions.DEFAULT_PARALLELISM, 1);
+configuration.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE, 
true);
+return super.getTableEnv();
+  }
+
+  @Before
+  public void before() {
+super.before();
+sql("CREATE DATABASE %s", flinkDatabase);
+sql("USE CATALOG %s", catalogName);
+sql("USE %s", DATABASE);
+  }
+
+  @Override
+  @After
+  public void clean() {
+sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME);
+sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
+super.clean();
+  }
+
+  @Test
+  public void testSnapshots() {
+sql(
+"CREATE TABLE %s (id INT, data VARCHAR,d DOUBLE) WITH 
('format-version'='2', 'write.format.default'='%s')",
+TABLE_NAME, format.name());
+sql(
+"INSERT INTO %s VALUES (1,'iceberg',10),(2,'b',20),(3,CAST(NULL AS 
VARCHAR),30)",
+TABLE_NAME);
+String sql = String.format("SELECT * FROM %s$snapshots ", TABLE_NAME);
+List result = sql(sql);
+
+Table table = 
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME));
+Iterator snapshots = table.snapshots().iterator();
+for (Row row : result) {
+  Snapshot next = snapshots.next();
+  Assert.assertEquals(
+  "Should have expected timestamp",
+  ((Instant) row.getField(0)).toEpochMilli(),
+  next.timestampMillis());
+  Assert.assertEquals("Should have expected snapshot id", row.getField(1), 
next.snapshotId());
+  Assert.assertEquals("Should have expected paren

[GitHub] [iceberg] hililiwei commented on a diff in pull request #6222: Flink: Support inspecting table

2022-12-05 Thread GitBox


hililiwei commented on code in PR #6222:
URL: https://github.com/apache/iceberg/pull/6222#discussion_r1039571147


##
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java:
##
@@ -0,0 +1,713 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.avro.generic.GenericData;
+import org.apache.commons.collections.ListUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.MetadataTableUtils;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.flink.FlinkCatalogTestBase;
+import org.apache.iceberg.flink.FlinkConfigOptions;
+import org.apache.iceberg.flink.TestHelpers;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestFlinkMetaDataTable extends FlinkCatalogTestBase {
+
+  private static final String TABLE_NAME = "test_table";
+  private final FileFormat format = FileFormat.AVRO;
+  private static TemporaryFolder temp = new TemporaryFolder();
+
+  public TestFlinkMetaDataTable(String catalogName, Namespace baseNamespace) {
+super(catalogName, baseNamespace);
+  }
+
+  @Override
+  protected TableEnvironment getTableEnv() {
+Configuration configuration = 
super.getTableEnv().getConfig().getConfiguration();
+configuration.set(CoreOptions.DEFAULT_PARALLELISM, 1);
+configuration.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE, 
true);
+return super.getTableEnv();
+  }
+
+  @Before
+  public void before() {
+super.before();
+sql("CREATE DATABASE %s", flinkDatabase);
+sql("USE CATALOG %s", catalogName);
+sql("USE %s", DATABASE);
+  }
+
+  @Override
+  @After
+  public void clean() {
+sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME);
+sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
+super.clean();
+  }
+
+  @Test
+  public void testSnapshots() {
+sql(
+"CREATE TABLE %s (id INT, data VARCHAR,d DOUBLE) WITH 
('format-version'='2', 'write.format.default'='%s')",
+TABLE_NAME, format.name());
+sql(
+"INSERT INTO %s VALUES (1,'iceberg',10),(2,'b',20),(3,CAST(NULL AS 
VARCHAR),30)",
+TABLE_NAME);
+String sql = String.format("SELECT * FROM %s$snapshots ", TABLE_NAME);
+List result = sql(sql);
+
+Table table = 
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME));
+Iterator snapshots = table.snapshots().iterator();
+for (Row row : result) {
+  Snapshot next = snapshots.next();
+  Assert.assertEquals(
+  "Should have expected timestamp",
+  ((Instant) row.getField(0)).toEpochMilli(),
+  next.timestampMillis());
+  Assert.assertEquals("Should have expected snapshot id", row.getField(1), 
next.snapshotId());

Review Comment:
   😂 It has been corrected.



-- 
Th

[GitHub] [iceberg] Fokko merged pull request #6221: Change SingleBufferInputStream .read signature to match super-method.

2022-12-05 Thread GitBox


Fokko merged PR #6221:
URL: https://github.com/apache/iceberg/pull/6221


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] hililiwei commented on a diff in pull request #6222: Flink: Support inspecting table

2022-12-05 Thread GitBox


hililiwei commented on code in PR #6222:
URL: https://github.com/apache/iceberg/pull/6222#discussion_r1039674684


##
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/data/StructRowData.java:
##
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.data;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.OffsetDateTime;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.RowKind;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.ByteBuffers;
+
+public class StructRowData implements RowData {
+  private final Types.StructType type;
+  private RowKind kind;
+  private StructLike struct;
+
+  public StructRowData(Types.StructType type) {
+this(type, RowKind.INSERT);
+  }
+
+  public StructRowData(Types.StructType type, RowKind kind) {
+this(type, null, kind);
+  }
+
+  private StructRowData(Types.StructType type, StructLike struct) {
+this(type, struct, RowKind.INSERT);
+  }
+
+  private StructRowData(Types.StructType type, StructLike struct, RowKind 
kind) {
+this.type = type;
+this.struct = struct;
+this.kind = kind;
+  }
+
+  public StructRowData setStruct(StructLike newStruct) {
+this.struct = newStruct;
+return this;
+  }
+
+  @Override
+  public int getArity() {
+return struct.size();
+  }
+
+  @Override
+  public RowKind getRowKind() {
+return kind;
+  }
+
+  @Override
+  public void setRowKind(RowKind newKind) {
+Preconditions.checkNotNull(newKind, "kind can not be null");
+this.kind = newKind;
+  }
+
+  @Override
+  public boolean isNullAt(int pos) {
+return struct.get(pos, Object.class) == null;
+  }
+
+  @Override
+  public boolean getBoolean(int pos) {
+return struct.get(pos, Boolean.class);
+  }
+
+  @Override
+  public byte getByte(int pos) {
+return (byte) (int) struct.get(pos, Integer.class);
+  }
+
+  @Override
+  public short getShort(int pos) {
+return (short) (int) struct.get(pos, Integer.class);
+  }
+
+  @Override
+  public int getInt(int pos) {
+Object integer = struct.get(pos, Object.class);
+
+if (integer instanceof Integer) {
+  return (int) integer;
+} else if (integer instanceof LocalDate) {
+  return (int) ((LocalDate) integer).toEpochDay();
+} else {
+  throw new IllegalStateException(
+  "Unknown type for int field. Type name: " + 
integer.getClass().getName());
+}
+  }
+
+  @Override
+  public long getLong(int pos) {
+Object longVal = struct.get(pos, Object.class);
+
+if (longVal instanceof Long) {
+  return (long) longVal;
+} else if (longVal instanceof OffsetDateTime) {
+  return Duration.between(Instant.EPOCH, (OffsetDateTime) 
longVal).toNanos() / 1000;
+} else if (longVal instanceof LocalDate) {
+  return ((LocalDate) longVal).toEpochDay();
+} else {
+  throw new IllegalStateException(
+  "Unknown type for long field. Type name: " + 
longVal.getClass().getName());
+}
+  }
+
+  @Override
+  public float getFloat(int pos) {
+return struct.get(pos, Float.class);
+  }
+
+  @Override
+ 

[GitHub] [iceberg] hililiwei commented on a diff in pull request #6222: Flink: Support inspecting table

2022-12-05 Thread GitBox


hililiwei commented on code in PR #6222:
URL: https://github.com/apache/iceberg/pull/6222#discussion_r1039683554


##
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/data/StructRowData.java:
##
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.data;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.OffsetDateTime;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.RowKind;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.ByteBuffers;
+
+public class StructRowData implements RowData {
+  private final Types.StructType type;
+  private RowKind kind;
+  private StructLike struct;
+
+  public StructRowData(Types.StructType type) {
+this(type, RowKind.INSERT);
+  }
+
+  public StructRowData(Types.StructType type, RowKind kind) {
+this(type, null, kind);
+  }
+
+  private StructRowData(Types.StructType type, StructLike struct) {
+this(type, struct, RowKind.INSERT);
+  }
+
+  private StructRowData(Types.StructType type, StructLike struct, RowKind 
kind) {
+this.type = type;
+this.struct = struct;
+this.kind = kind;
+  }
+
+  public StructRowData setStruct(StructLike newStruct) {
+this.struct = newStruct;
+return this;
+  }
+
+  @Override
+  public int getArity() {
+return struct.size();
+  }
+
+  @Override
+  public RowKind getRowKind() {
+return kind;
+  }
+
+  @Override
+  public void setRowKind(RowKind newKind) {
+Preconditions.checkNotNull(newKind, "kind can not be null");
+this.kind = newKind;
+  }
+
+  @Override
+  public boolean isNullAt(int pos) {
+return struct.get(pos, Object.class) == null;
+  }
+
+  @Override
+  public boolean getBoolean(int pos) {
+return struct.get(pos, Boolean.class);
+  }
+
+  @Override
+  public byte getByte(int pos) {
+return (byte) (int) struct.get(pos, Integer.class);
+  }
+
+  @Override
+  public short getShort(int pos) {
+return (short) (int) struct.get(pos, Integer.class);
+  }
+
+  @Override
+  public int getInt(int pos) {
+Object integer = struct.get(pos, Object.class);
+
+if (integer instanceof Integer) {
+  return (int) integer;
+} else if (integer instanceof LocalDate) {
+  return (int) ((LocalDate) integer).toEpochDay();
+} else {
+  throw new IllegalStateException(
+  "Unknown type for int field. Type name: " + 
integer.getClass().getName());
+}
+  }
+
+  @Override
+  public long getLong(int pos) {
+Object longVal = struct.get(pos, Object.class);
+
+if (longVal instanceof Long) {
+  return (long) longVal;
+} else if (longVal instanceof OffsetDateTime) {
+  return Duration.between(Instant.EPOCH, (OffsetDateTime) 
longVal).toNanos() / 1000;
+} else if (longVal instanceof LocalDate) {
+  return ((LocalDate) longVal).toEpochDay();
+} else {
+  throw new IllegalStateException(
+  "Unknown type for long field. Type name: " + 
longVal.getClass().getName());
+}
+  }
+
+  @Override
+  public float getFloat(int pos) {
+return struct.get(pos, Float.class);
+  }
+
+  @Override
+ 

[GitHub] [iceberg] ConeyLiu commented on a diff in pull request #6335: Core: Avoid generating a large ManifestFile when committing

2022-12-05 Thread GitBox


ConeyLiu commented on code in PR #6335:
URL: https://github.com/apache/iceberg/pull/6335#discussion_r1039686191


##
core/src/main/java/org/apache/iceberg/SnapshotProducer.java:
##
@@ -499,6 +501,40 @@ protected long snapshotId() {
 return snapshotId;
   }
 
+  protected static > List 
writeFilesToManifests(
+  Iterable files,
+  Supplier> createWriter,
+  Long newFilesSequenceNumber,
+  long targetSizeBytes)
+  throws IOException {
+List result = Lists.newArrayList();
+Iterator fileIterator = files.iterator();
+ManifestWriter writer = null;

Review Comment:
   Updated with try-finally, while try-with-resources seems not easy to 
implement.
   
   > Could we initialize the writer upfront?
   
   There are some UTs failed if do that. I tried the following:
   ```java
   try {
 writer = createWriter.get();
 while (fileIterator.hasNext()) {
   if (writer.length() >= targetSizeBytes) { 
 // here could produce an empty file because the intialize size of 
the newly writer could be larger than targetSizeBytes
 writer.close();
 result.add(writer.toManifestFile());
 writer = createWriter.get();
   }
   
   F file = fileIterator.next();
   if (newFilesSequenceNumber == null) {
 writer.add(file);
   } else {
 writer.add(file, newFilesSequenceNumber);
   }
 }
   } finally {
 if (writer != null) {
   writer.close();
 }
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] ajantha-bhat commented on pull request #6221: Change SingleBufferInputStream .read signature to match super-method.

2022-12-05 Thread GitBox


ajantha-bhat commented on PR #6221:
URL: https://github.com/apache/iceberg/pull/6221#issuecomment-1337501228

   > but there are some jdk vendors (i.e. on centos7) that also enable this 
flag on jdk8, thus this check might fail depending on the jdk distribution used.
   
   @XN137: Thank you so much for digging down on this and figuring out how jdk8 
is also affected by this. 
   
   thanks @Fokko for merging the PR and @dchristle for the fix. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] ConeyLiu commented on a diff in pull request #6335: Core: Avoid generating a large ManifestFile when committing

2022-12-05 Thread GitBox


ConeyLiu commented on code in PR #6335:
URL: https://github.com/apache/iceberg/pull/6335#discussion_r1039689716


##
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##
@@ -838,9 +839,17 @@ public Object updateEvent() {
   }
 
   private void cleanUncommittedAppends(Set committed) {
-if (cachedNewManifest != null && !committed.contains(cachedNewManifest)) {
-  deleteFile(cachedNewManifest.path());
-  this.cachedNewManifest = null;
+if (cachedNewManifests != null) {
+  List cleanedNewManifests = Lists.newArrayList();
+  for (ManifestFile manifestFile : cachedNewManifests) {
+if (!committed.contains(manifestFile)) {
+  deleteFile(manifestFile.path());
+} else {
+  cleanedNewManifests.add(manifestFile);
+}
+  }
+
+  this.cachedNewManifests = cleanedNewManifests;

Review Comment:
   From the failed UTs, it seems it could call the `cleanUncommittedAppends` 
multiple times when retrying the commit. And we need to make sure the previous 
wroten manifest file is deleted.



##
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##
@@ -948,13 +949,10 @@ private List newDeleteFilesAsManifests() {
   (specId, deleteFiles) -> {
 PartitionSpec spec = ops.current().spec(specId);
 try {
-  ManifestWriter writer = 
newDeleteManifestWriter(spec);
-  try {
-writer.addAll(deleteFiles);
-  } finally {
-writer.close();
-  }
-  cachedNewDeleteManifests.add(writer.toManifestFile());
+  List manifestFiles =

Review Comment:
   updated



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] hililiwei commented on a diff in pull request #6222: Flink: Support inspecting table

2022-12-05 Thread GitBox


hililiwei commented on code in PR #6222:
URL: https://github.com/apache/iceberg/pull/6222#discussion_r1039702122


##
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/data/StructRowData.java:
##
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.data;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.OffsetDateTime;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.RowKind;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.ByteBuffers;
+
+public class StructRowData implements RowData {
+  private final Types.StructType type;
+  private RowKind kind;
+  private StructLike struct;
+
+  public StructRowData(Types.StructType type) {
+this(type, RowKind.INSERT);
+  }
+
+  public StructRowData(Types.StructType type, RowKind kind) {
+this(type, null, kind);
+  }
+
+  private StructRowData(Types.StructType type, StructLike struct) {
+this(type, struct, RowKind.INSERT);
+  }
+
+  private StructRowData(Types.StructType type, StructLike struct, RowKind 
kind) {
+this.type = type;
+this.struct = struct;
+this.kind = kind;
+  }
+
+  public StructRowData setStruct(StructLike newStruct) {
+this.struct = newStruct;
+return this;
+  }
+
+  @Override
+  public int getArity() {
+return struct.size();
+  }
+
+  @Override
+  public RowKind getRowKind() {
+return kind;
+  }
+
+  @Override
+  public void setRowKind(RowKind newKind) {
+Preconditions.checkNotNull(newKind, "kind can not be null");
+this.kind = newKind;
+  }
+
+  @Override
+  public boolean isNullAt(int pos) {
+return struct.get(pos, Object.class) == null;
+  }
+
+  @Override
+  public boolean getBoolean(int pos) {
+return struct.get(pos, Boolean.class);
+  }
+
+  @Override
+  public byte getByte(int pos) {
+return (byte) (int) struct.get(pos, Integer.class);
+  }
+
+  @Override
+  public short getShort(int pos) {
+return (short) (int) struct.get(pos, Integer.class);
+  }
+
+  @Override
+  public int getInt(int pos) {
+Object integer = struct.get(pos, Object.class);
+
+if (integer instanceof Integer) {
+  return (int) integer;
+} else if (integer instanceof LocalDate) {
+  return (int) ((LocalDate) integer).toEpochDay();
+} else {
+  throw new IllegalStateException(
+  "Unknown type for int field. Type name: " + 
integer.getClass().getName());
+}
+  }
+
+  @Override
+  public long getLong(int pos) {
+Object longVal = struct.get(pos, Object.class);
+
+if (longVal instanceof Long) {
+  return (long) longVal;
+} else if (longVal instanceof OffsetDateTime) {
+  return Duration.between(Instant.EPOCH, (OffsetDateTime) 
longVal).toNanos() / 1000;
+} else if (longVal instanceof LocalDate) {
+  return ((LocalDate) longVal).toEpochDay();
+} else {
+  throw new IllegalStateException(
+  "Unknown type for long field. Type name: " + 
longVal.getClass().getName());
+}
+  }
+
+  @Override
+  public float getFloat(int pos) {
+return struct.get(pos, Float.class);
+  }
+
+  @Override
+ 

[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6250: Docs: Remove redundant configuration from spark docs

2022-12-05 Thread GitBox


RussellSpitzer commented on code in PR #6250:
URL: https://github.com/apache/iceberg/pull/6250#discussion_r1039705908


##
docs/spark-getting-started.md:
##
@@ -57,8 +57,6 @@ This command creates a path-based catalog named `local` for 
tables under `$PWD/w
 ```sh
 spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:{{% 
icebergVersion %}}\
 --conf 
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
 \
---conf 
spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \

Review Comment:
   This makes line 55 Incorrect



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] nastra commented on pull request #6335: Core: Avoid generating a large ManifestFile when committing

2022-12-05 Thread GitBox


nastra commented on PR #6335:
URL: https://github.com/apache/iceberg/pull/6335#issuecomment-1337557734

   It is mentioned in the docs that `MANIFEST_TARGET_SIZE_BYTES` relates to 
`Target size when merging manifest files`, meaning that this setting only takes 
effect when merging of manifest files happens (e.g. when using `newAppend()`). 
Merging of manifest files would not happen when using `newFastAppend()` for 
example. This might explain why it seemed that this setting wouldn't take any 
effect in your env. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] ajantha-bhat commented on a diff in pull request #6250: Docs: Remove redundant configuration from spark docs

2022-12-05 Thread GitBox


ajantha-bhat commented on code in PR #6250:
URL: https://github.com/apache/iceberg/pull/6250#discussion_r1039751700


##
docs/spark-getting-started.md:
##
@@ -57,8 +57,6 @@ This command creates a path-based catalog named `local` for 
tables under `$PWD/w
 ```sh
 spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:{{% 
icebergVersion %}}\
 --conf 
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
 \
---conf 
spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \

Review Comment:
   Do you mean `adds support for Iceberg tables to Spark's built-in catalog` 
was achieved by adding a `SparkSessionCatalog`?
   
   But then it is of type hive and URI is not mentioned. Which may confuse the 
users.
   Also `spark_catalog` was not used in any example snippets. 
   
   So, do you have any suggestions here? 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] nastra commented on a diff in pull request #6353: Make sure S3 stream opened by ReadConf ctor is closed

2022-12-05 Thread GitBox


nastra commented on code in PR #6353:
URL: https://github.com/apache/iceberg/pull/6353#discussion_r1039765677


##
parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java:
##
@@ -46,7 +47,7 @@
  *
  * @param  type of value to read
  */
-class ReadConf {
+class ReadConf implements Closeable {

Review Comment:
   what about making this implement `AutoCloseable`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] nastra commented on a diff in pull request #6348: Python: Update license-checker

2022-12-05 Thread GitBox


nastra commented on code in PR #6348:
URL: https://github.com/apache/iceberg/pull/6348#discussion_r1039781132


##
python/dev/.rat-excludes:
##
@@ -0,0 +1,2 @@
+.rat-excludes

Review Comment:
   related to [this old 
comment](https://github.com/apache/iceberg/pull/5840#discussion_r978666139) I'm 
curious whether it would now make sense to re-use the `check-license` script 
from the root dev folder



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] hililiwei commented on a diff in pull request #6222: Flink: Support inspecting table

2022-12-05 Thread GitBox


hililiwei commented on code in PR #6222:
URL: https://github.com/apache/iceberg/pull/6222#discussion_r1039789754


##
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/data/StructRowData.java:
##
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.data;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.OffsetDateTime;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.RowKind;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.ByteBuffers;
+
+public class StructRowData implements RowData {
+  private final Types.StructType type;
+  private RowKind kind;
+  private StructLike struct;
+
+  public StructRowData(Types.StructType type) {
+this(type, RowKind.INSERT);
+  }
+
+  public StructRowData(Types.StructType type, RowKind kind) {
+this(type, null, kind);
+  }
+
+  private StructRowData(Types.StructType type, StructLike struct) {
+this(type, struct, RowKind.INSERT);
+  }
+
+  private StructRowData(Types.StructType type, StructLike struct, RowKind 
kind) {
+this.type = type;
+this.struct = struct;
+this.kind = kind;
+  }
+
+  public StructRowData setStruct(StructLike newStruct) {
+this.struct = newStruct;
+return this;
+  }
+
+  @Override
+  public int getArity() {
+return struct.size();
+  }
+
+  @Override
+  public RowKind getRowKind() {
+return kind;
+  }
+
+  @Override
+  public void setRowKind(RowKind newKind) {
+Preconditions.checkNotNull(newKind, "kind can not be null");
+this.kind = newKind;
+  }
+
+  @Override
+  public boolean isNullAt(int pos) {
+return struct.get(pos, Object.class) == null;
+  }
+
+  @Override
+  public boolean getBoolean(int pos) {
+return struct.get(pos, Boolean.class);
+  }
+
+  @Override
+  public byte getByte(int pos) {
+return (byte) (int) struct.get(pos, Integer.class);
+  }
+
+  @Override
+  public short getShort(int pos) {
+return (short) (int) struct.get(pos, Integer.class);
+  }
+
+  @Override
+  public int getInt(int pos) {
+Object integer = struct.get(pos, Object.class);
+
+if (integer instanceof Integer) {
+  return (int) integer;
+} else if (integer instanceof LocalDate) {
+  return (int) ((LocalDate) integer).toEpochDay();
+} else {
+  throw new IllegalStateException(
+  "Unknown type for int field. Type name: " + 
integer.getClass().getName());
+}
+  }
+
+  @Override
+  public long getLong(int pos) {
+Object longVal = struct.get(pos, Object.class);
+
+if (longVal instanceof Long) {
+  return (long) longVal;
+} else if (longVal instanceof OffsetDateTime) {
+  return Duration.between(Instant.EPOCH, (OffsetDateTime) 
longVal).toNanos() / 1000;
+} else if (longVal instanceof LocalDate) {
+  return ((LocalDate) longVal).toEpochDay();
+} else {
+  throw new IllegalStateException(
+  "Unknown type for long field. Type name: " + 
longVal.getClass().getName());
+}
+  }
+
+  @Override
+  public float getFloat(int pos) {
+return struct.get(pos, Float.class);
+  }
+
+  @Override
+ 

[GitHub] [iceberg] ajantha-bhat commented on a diff in pull request #6250: Docs: Remove redundant configuration from spark docs

2022-12-05 Thread GitBox


ajantha-bhat commented on code in PR #6250:
URL: https://github.com/apache/iceberg/pull/6250#discussion_r1039831769


##
docs/spark-getting-started.md:
##
@@ -57,8 +57,6 @@ This command creates a path-based catalog named `local` for 
tables under `$PWD/w
 ```sh
 spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:{{% 
icebergVersion %}}\
 --conf 
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
 \
---conf 
spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \

Review Comment:
   which one can be better?
   a) Remove "built-in catalog" mentioned from the line 55 
   b) or change  `spark_catalog` type also to `Hadoop`
   c) or add some hive URI = xx in the example snippet 
   
   I don't think we can leave it as it is because the catalog-type is hive but 
without URI will just confuse the users. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6250: Docs: Remove redundant configuration from spark docs

2022-12-05 Thread GitBox


RussellSpitzer commented on code in PR #6250:
URL: https://github.com/apache/iceberg/pull/6250#discussion_r1039836905


##
docs/spark-getting-started.md:
##
@@ -57,8 +57,6 @@ This command creates a path-based catalog named `local` for 
tables under `$PWD/w
 ```sh
 spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:{{% 
icebergVersion %}}\
 --conf 
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
 \
---conf 
spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \

Review Comment:
   "spark_catalog" is the built in catalog for Spark. The one you get without 
specifying anything else. This configuration would work for any user who has 
already defined a HMS for Spark and would like it to also work with Iceberg 
Tables.
   
   I think it's fine to skip this usage though since all the examples use a 
separate "SparkCatalog"
   
   In this case I think we should probably add some more details 
SparkSessionCatalog in some different section perhaps?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6250: Docs: Remove redundant configuration from spark docs

2022-12-05 Thread GitBox


RussellSpitzer commented on code in PR #6250:
URL: https://github.com/apache/iceberg/pull/6250#discussion_r1039836905


##
docs/spark-getting-started.md:
##
@@ -57,8 +57,6 @@ This command creates a path-based catalog named `local` for 
tables under `$PWD/w
 ```sh
 spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:{{% 
icebergVersion %}}\
 --conf 
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
 \
---conf 
spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \

Review Comment:
   "spark_catalog" is the built in catalog for Spark. The one you get without 
specifying anything else. This configuration would work for any user who has 
already defined a HMS for Spark and would like it to also work with Iceberg 
Tables. So I would think this would be useful to most folks who are using 
Iceberg with an existing Spark Setup.
   
   I think it's fine to skip "spark_catalog" in this particular usage though 
since all the examples use a separate "local" SparkCatalog. If we remove it we 
should probably add some more details SparkSessionCatalog in some different 
section perhaps? Or maybe add another example.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6222: Flink: Support inspecting table

2022-12-05 Thread GitBox


stevenzwu commented on code in PR #6222:
URL: https://github.com/apache/iceberg/pull/6222#discussion_r1039841272


##
docs/flink-getting-started.md:
##
@@ -712,9 +712,188 @@ INSERT INTO tableName /*+ 
OPTIONS('upsert-enabled'='true') */
 | compression-strategy   | Table write.orc.compression-strategy   | 
Overrides this table's compression strategy for ORC tables for this write   
   |
 
 
-## Inspecting tables.
+## Inspecting tables
 
-Iceberg does not support inspecting table in flink sql now, we need to use 
[iceberg's Java API](../api) to read iceberg's meta data to get those table 
information.
+To inspect a table's history, snapshots, and other metadata, Iceberg supports 
metadata tables.
+
+Metadata tables are identified by adding the metadata table name after the 
original table name. For example, history for `db.table` is read using 
`db.table$history`.
+
+### History
+
+To show table history:
+
+```sql
+SELECT * FROM prod.db.table$history;
+```
+
+| made_current_at | snapshot_id | parent_id   | 
is_current_ancestor |
+| --- | --- | --- | 
--- |
+| 2019-02-08 03:29:51.215 | 5781947118336215154 | NULL| true   
 |
+| 2019-02-08 03:47:55.948 | 5179299526185056830 | 5781947118336215154 | true   
 |
+| 2019-02-09 16:24:30.13  | 296410040247533544  | 5179299526185056830 | false  
 |
+| 2019-02-09 16:32:47.336 | 2999875608062437330 | 5179299526185056830 | true   
 |
+| 2019-02-09 19:42:03.919 | 8924558786060583479 | 2999875608062437330 | true   
 |
+| 2019-02-09 19:49:16.343 | 6536733823181975045 | 8924558786060583479 | true   
 |
+
+{{< hint info >}}
+**This shows a commit that was rolled back.** The example has two snapshots 
with the same parent, and one is *not* an ancestor of the current table state.

Review Comment:
   maybe more explicit?
   
   ```
   In this example, snapshot 296410040247533544 and 2999875608062437330 have 
the same parent snapshot 5179299526185056830. Snapshot 296410040247533544 was 
rolled back and is *not* an ancestor of the current table state.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] ajantha-bhat commented on a diff in pull request #6250: Docs: Remove redundant configuration from spark docs

2022-12-05 Thread GitBox


ajantha-bhat commented on code in PR #6250:
URL: https://github.com/apache/iceberg/pull/6250#discussion_r1039848442


##
docs/spark-getting-started.md:
##
@@ -57,8 +57,6 @@ This command creates a path-based catalog named `local` for 
tables under `$PWD/w
 ```sh
 spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:{{% 
icebergVersion %}}\
 --conf 
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
 \
---conf 
spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \

Review Comment:
   
https://iceberg.apache.org/docs/latest/spark-configuration/#catalog-configuration
 already talks about the `SparkSessionCatalog`.  So, we can remove it from here?
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] pvary commented on a diff in pull request #6299: Flink: support split discovery throttling for streaming read

2022-12-05 Thread GitBox


pvary commented on code in PR #6299:
URL: https://github.com/apache/iceberg/pull/6299#discussion_r1039860822


##
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/EnumerationHistory.java:
##
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.enumerator;
+
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.flink.calcite.shaded.com.google.common.collect.EvictingQueue;
+
+/**
+ * This enumeration history is used for split discovery throttling. It wraps 
Guava {@link
+ * EvictingQueue} to provide thread safety.
+ */
+@ThreadSafe
+class EnumerationHistory {
+
+  // EvictingQueue is not thread safe.
+  private final EvictingQueue enumerationSplitCountHistory;
+
+  EnumerationHistory(int maxHistorySize) {
+this.enumerationSplitCountHistory = EvictingQueue.create(maxHistorySize);
+  }
+
+  /** Add the split count from the last enumeration result. */
+  synchronized void add(int splitCount) {
+enumerationSplitCountHistory.add(splitCount);
+  }
+
+  /** @return true if split discovery should pause because assigner has too 
many splits already. */
+  synchronized boolean shouldPauseSplitDiscovery(int 
pendingSplitCountFromAssigner) {
+if (enumerationSplitCountHistory.remainingCapacity() > 0) {
+  // only check throttling when full history is obtained.

Review Comment:
   Thx



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] ajantha-bhat opened a new pull request, #6360: Docs: Update Zorder spark support versions.

2022-12-05 Thread GitBox


ajantha-bhat opened a new pull request, #6360:
URL: https://github.com/apache/iceberg/pull/6360

   Some users are using Zorder with spark-3.1 and facing a confusing error 
message. Hence, updating the document.
   
   Also thought about updating the code to throw the unsupported exception.  
But maybe not required to modify the code as it is only the older versions.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] rdblue commented on a diff in pull request #3231: GCM encryption stream

2022-12-05 Thread GitBox


rdblue commented on code in PR #3231:
URL: https://github.com/apache/iceberg/pull/3231#discussion_r1039891570


##
core/src/main/java/org/apache/iceberg/encryption/AesGcmInputFile.java:
##
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.encryption;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.SeekableInputStream;
+
+public class AesGcmInputFile implements InputFile {

Review Comment:
   You're right. This would be the result of the `decrypt` method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] rdblue commented on a diff in pull request #3231: GCM encryption stream

2022-12-05 Thread GitBox


rdblue commented on code in PR #3231:
URL: https://github.com/apache/iceberg/pull/3231#discussion_r1039893751


##
core/src/main/java/org/apache/iceberg/encryption/AesGcmInputFile.java:
##
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.encryption;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.SeekableInputStream;
+
+public class AesGcmInputFile implements InputFile {
+  private final InputFile sourceFile;
+  private final byte[] dataKey;
+  private long plaintextLength;
+
+  public AesGcmInputFile(InputFile sourceFile, byte[] dataKey) {
+this.sourceFile = sourceFile;
+this.dataKey = dataKey;
+this.plaintextLength = -1;
+  }
+
+  @Override
+  public long getLength() {
+if (plaintextLength == -1) {
+  try {
+this.newStream().close();

Review Comment:
   We should be able to know the length without opening the stream, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] rdblue merged pull request #4925: API: Add view interfaces

2022-12-05 Thread GitBox


rdblue merged PR #4925:
URL: https://github.com/apache/iceberg/pull/4925


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] rdblue commented on pull request #4925: API: Add view interfaces

2022-12-05 Thread GitBox


rdblue commented on PR #4925:
URL: https://github.com/apache/iceberg/pull/4925#issuecomment-1337860939

   Merge! Let me know where the implementation PR is and I'll start looking at 
that!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] rdblue commented on a diff in pull request #3231: GCM encryption stream

2022-12-05 Thread GitBox


rdblue commented on code in PR #3231:
URL: https://github.com/apache/iceberg/pull/3231#discussion_r1039912960


##
core/src/main/java/org/apache/iceberg/encryption/AesGcmInputStream.java:
##
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.encryption;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import org.apache.iceberg.io.SeekableInputStream;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class AesGcmInputStream extends SeekableInputStream {
+  private final SeekableInputStream sourceStream;
+  private final boolean emptyCipherStream;
+  private final long netSourceFileSize;
+  private final Ciphers.AesGcmDecryptor gcmDecryptor;
+  private final byte[] ciphertextBlockBuffer;
+  private final int cipherBlockSize;
+  private final int plainBlockSize;
+  private final int numberOfBlocks;
+  private final int lastCipherBlockSize;
+  private final long plainStreamSize;
+  private final byte[] fileAadPrefix;
+
+  private long plainStreamPosition;
+  private int currentBlockIndex;
+  private int currentOffsetInPlainBlock;
+
+  AesGcmInputStream(SeekableInputStream sourceStream, long sourceLength,
+byte[] aesKey, byte[] fileAadPrefix) throws IOException {
+this.netSourceFileSize = sourceLength - Ciphers.GCM_STREAM_PREFIX_LENGTH;
+Preconditions.checkArgument(netSourceFileSize >= 0,
+"Source length " + sourceLength + " is shorter than GCM prefix. File 
is not encrypted");
+
+this.emptyCipherStream = (0 == netSourceFileSize);
+this.sourceStream = sourceStream;
+byte[] prefixBytes = new byte[Ciphers.GCM_STREAM_PREFIX_LENGTH];

Review Comment:
   Can we call this a header? We use `prefix` in several places and I think it 
would be more clear to call it a header since it is only at the start of the 
file.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6360: Docs: Update Zorder spark support versions.

2022-12-05 Thread GitBox


RussellSpitzer commented on code in PR #6360:
URL: https://github.com/apache/iceberg/pull/6360#discussion_r1039913771


##
docs/spark-procedures.md:
##
@@ -271,7 +271,7 @@ Iceberg can compact data files in parallel using Spark with 
the `rewriteDataFile
 |---|---|--|-|
 | `table`   | ✔️  | string | Name of the table to update |
 | `strategy`|| string | Name of the strategy - binpack or sort. 
Defaults to binpack strategy |
-| `sort_order`  || string | If Zorder, then comma separated column names 
within zorder() text. Example: zorder(c1,c2,c3). Else, Comma separated 
sort_order_column. Where sort_order_column is a space separated sort order info 
per column (ColumnName SortDirection NullOrder).  SortDirection can be ASC 
or DESC. NullOrder can be NULLS FIRST or NULLS LAST |
+| `sort_order`  || string | If Zorder(supported from Spark-3.2 and above), 
then comma separated column names within zorder() text. Example: 
zorder(c1,c2,c3). Else, Comma separated sort_order_column. Where 
sort_order_column is a space separated sort order info per column (ColumnName 
SortDirection NullOrder).  SortDirection can be ASC or DESC. NullOrder can 
be NULLS FIRST or NULLS LAST |

Review Comment:
   I think either (Supported in Spark 3.2 and above) or (Supported from Spark 
3.2) but the mix of "from" and "and above" is a little confusing



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] szehon-ho closed issue #4362: Expose human-readable metrics in metadata tables

2022-12-05 Thread GitBox


szehon-ho closed issue #4362: Expose human-readable metrics in metadata tables
URL: https://github.com/apache/iceberg/issues/4362


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] szehon-ho merged pull request #5376: Core: Add readable metrics columns to files metadata tables

2022-12-05 Thread GitBox


szehon-ho merged PR #5376:
URL: https://github.com/apache/iceberg/pull/5376


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] szehon-ho commented on pull request #5376: Core: Add readable metrics columns to files metadata tables

2022-12-05 Thread GitBox


szehon-ho commented on PR #5376:
URL: https://github.com/apache/iceberg/pull/5376#issuecomment-1337877894

   Thanks @RussellSpitzer @aokolnychyi @chenjunjiedada for detailed reviews


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] rdblue commented on a diff in pull request #3231: GCM encryption stream

2022-12-05 Thread GitBox


rdblue commented on code in PR #3231:
URL: https://github.com/apache/iceberg/pull/3231#discussion_r1039937969


##
core/src/main/java/org/apache/iceberg/encryption/AesGcmInputStream.java:
##
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.encryption;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import org.apache.iceberg.io.SeekableInputStream;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class AesGcmInputStream extends SeekableInputStream {
+  private final SeekableInputStream sourceStream;
+  private final boolean emptyCipherStream;
+  private final long netSourceFileSize;
+  private final Ciphers.AesGcmDecryptor gcmDecryptor;
+  private final byte[] ciphertextBlockBuffer;
+  private final int cipherBlockSize;
+  private final int plainBlockSize;
+  private final int numberOfBlocks;
+  private final int lastCipherBlockSize;
+  private final long plainStreamSize;
+  private final byte[] fileAadPrefix;
+
+  private long plainStreamPosition;
+  private int currentBlockIndex;
+  private int currentOffsetInPlainBlock;
+
+  AesGcmInputStream(SeekableInputStream sourceStream, long sourceLength,
+byte[] aesKey, byte[] fileAadPrefix) throws IOException {
+this.netSourceFileSize = sourceLength - Ciphers.GCM_STREAM_PREFIX_LENGTH;
+Preconditions.checkArgument(netSourceFileSize >= 0,
+"Source length " + sourceLength + " is shorter than GCM prefix. File 
is not encrypted");
+
+this.emptyCipherStream = (0 == netSourceFileSize);
+this.sourceStream = sourceStream;
+byte[] prefixBytes = new byte[Ciphers.GCM_STREAM_PREFIX_LENGTH];
+int fetched = sourceStream.read(prefixBytes);
+Preconditions.checkState(fetched == Ciphers.GCM_STREAM_PREFIX_LENGTH,
+"Insufficient read " + fetched +
+". The stream length should be at least " + 
Ciphers.GCM_STREAM_PREFIX_LENGTH);
+
+byte[] magic = new byte[Ciphers.GCM_STREAM_MAGIC_ARRAY.length];
+System.arraycopy(prefixBytes, 0, magic, 0, 
Ciphers.GCM_STREAM_MAGIC_ARRAY.length);
+Preconditions.checkState(Arrays.equals(Ciphers.GCM_STREAM_MAGIC_ARRAY, 
magic),
+"Cannot open encrypted file, it does not begin with magic string " + 
Ciphers.GCM_STREAM_MAGIC_STRING);
+
+if (!emptyCipherStream) {
+  this.plainStreamPosition = 0;
+  this.fileAadPrefix = fileAadPrefix;
+  gcmDecryptor = new Ciphers.AesGcmDecryptor(aesKey);
+  plainBlockSize = ByteBuffer.wrap(prefixBytes, 
Ciphers.GCM_STREAM_MAGIC_ARRAY.length, 4)
+  .order(ByteOrder.LITTLE_ENDIAN).getInt();
+  Preconditions.checkState(plainBlockSize > 0, "Wrong plainBlockSize " + 
plainBlockSize);
+
+  cipherBlockSize = plainBlockSize + Ciphers.NONCE_LENGTH + 
Ciphers.GCM_TAG_LENGTH;
+  this.ciphertextBlockBuffer = new byte[cipherBlockSize];
+  this.currentBlockIndex = 0;
+  this.currentOffsetInPlainBlock = 0;
+
+  int numberOfFullBlocks = Math.toIntExact(netSourceFileSize / 
cipherBlockSize);
+  int cipherBytesInLastBlock = Math.toIntExact(netSourceFileSize - 
numberOfFullBlocks * cipherBlockSize);
+  boolean fullBlocksOnly = (0 == cipherBytesInLastBlock);
+  numberOfBlocks = fullBlocksOnly ? numberOfFullBlocks : 
numberOfFullBlocks + 1;
+  lastCipherBlockSize = fullBlocksOnly ? cipherBlockSize : 
cipherBytesInLastBlock; // never 0
+  int plainBytesInLastBlock = fullBlocksOnly ? 0 :
+  (cipherBytesInLastBlock - Ciphers.NONCE_LENGTH - 
Ciphers.GCM_TAG_LENGTH);
+  plainStreamSize = numberOfFullBlocks * plainBlockSize + 
plainBytesInLastBlock;
+} else {
+  plainStreamSize = 0;
+
+  gcmDecryptor = null;
+  ciphertextBlockBuffer = null;
+  cipherBlockSize = -1;
+  plainBlockSize = -1;
+  numberOfBlocks = -1;
+  lastCipherBlockSize = -1;
+  this.fileAadPrefix = null;
+}
+  }
+
+  public long plaintextStreamSize() {
+return plainStreamSize;
+  }
+
+  @Override
+  public int available() throws IOException {
+long maxAvailable = plainStreamSize - plainStreamPositio

[GitHub] [iceberg] rdblue commented on a diff in pull request #3231: GCM encryption stream

2022-12-05 Thread GitBox


rdblue commented on code in PR #3231:
URL: https://github.com/apache/iceberg/pull/3231#discussion_r1039939654


##
core/src/main/java/org/apache/iceberg/encryption/AesGcmInputStream.java:
##
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.encryption;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import org.apache.iceberg.io.SeekableInputStream;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class AesGcmInputStream extends SeekableInputStream {
+  private final SeekableInputStream sourceStream;
+  private final boolean emptyCipherStream;
+  private final long netSourceFileSize;
+  private final Ciphers.AesGcmDecryptor gcmDecryptor;
+  private final byte[] ciphertextBlockBuffer;
+  private final int cipherBlockSize;
+  private final int plainBlockSize;
+  private final int numberOfBlocks;
+  private final int lastCipherBlockSize;
+  private final long plainStreamSize;
+  private final byte[] fileAadPrefix;
+
+  private long plainStreamPosition;
+  private int currentBlockIndex;
+  private int currentOffsetInPlainBlock;
+
+  AesGcmInputStream(SeekableInputStream sourceStream, long sourceLength,
+byte[] aesKey, byte[] fileAadPrefix) throws IOException {
+this.netSourceFileSize = sourceLength - Ciphers.GCM_STREAM_PREFIX_LENGTH;
+Preconditions.checkArgument(netSourceFileSize >= 0,
+"Source length " + sourceLength + " is shorter than GCM prefix. File 
is not encrypted");
+
+this.emptyCipherStream = (0 == netSourceFileSize);
+this.sourceStream = sourceStream;
+byte[] prefixBytes = new byte[Ciphers.GCM_STREAM_PREFIX_LENGTH];
+int fetched = sourceStream.read(prefixBytes);
+Preconditions.checkState(fetched == Ciphers.GCM_STREAM_PREFIX_LENGTH,
+"Insufficient read " + fetched +
+". The stream length should be at least " + 
Ciphers.GCM_STREAM_PREFIX_LENGTH);
+
+byte[] magic = new byte[Ciphers.GCM_STREAM_MAGIC_ARRAY.length];
+System.arraycopy(prefixBytes, 0, magic, 0, 
Ciphers.GCM_STREAM_MAGIC_ARRAY.length);
+Preconditions.checkState(Arrays.equals(Ciphers.GCM_STREAM_MAGIC_ARRAY, 
magic),
+"Cannot open encrypted file, it does not begin with magic string " + 
Ciphers.GCM_STREAM_MAGIC_STRING);
+
+if (!emptyCipherStream) {
+  this.plainStreamPosition = 0;
+  this.fileAadPrefix = fileAadPrefix;
+  gcmDecryptor = new Ciphers.AesGcmDecryptor(aesKey);
+  plainBlockSize = ByteBuffer.wrap(prefixBytes, 
Ciphers.GCM_STREAM_MAGIC_ARRAY.length, 4)
+  .order(ByteOrder.LITTLE_ENDIAN).getInt();
+  Preconditions.checkState(plainBlockSize > 0, "Wrong plainBlockSize " + 
plainBlockSize);
+
+  cipherBlockSize = plainBlockSize + Ciphers.NONCE_LENGTH + 
Ciphers.GCM_TAG_LENGTH;
+  this.ciphertextBlockBuffer = new byte[cipherBlockSize];
+  this.currentBlockIndex = 0;
+  this.currentOffsetInPlainBlock = 0;
+
+  int numberOfFullBlocks = Math.toIntExact(netSourceFileSize / 
cipherBlockSize);
+  int cipherBytesInLastBlock = Math.toIntExact(netSourceFileSize - 
numberOfFullBlocks * cipherBlockSize);
+  boolean fullBlocksOnly = (0 == cipherBytesInLastBlock);
+  numberOfBlocks = fullBlocksOnly ? numberOfFullBlocks : 
numberOfFullBlocks + 1;
+  lastCipherBlockSize = fullBlocksOnly ? cipherBlockSize : 
cipherBytesInLastBlock; // never 0
+  int plainBytesInLastBlock = fullBlocksOnly ? 0 :
+  (cipherBytesInLastBlock - Ciphers.NONCE_LENGTH - 
Ciphers.GCM_TAG_LENGTH);
+  plainStreamSize = numberOfFullBlocks * plainBlockSize + 
plainBytesInLastBlock;
+} else {
+  plainStreamSize = 0;
+
+  gcmDecryptor = null;
+  ciphertextBlockBuffer = null;
+  cipherBlockSize = -1;
+  plainBlockSize = -1;
+  numberOfBlocks = -1;
+  lastCipherBlockSize = -1;
+  this.fileAadPrefix = null;
+}
+  }
+
+  public long plaintextStreamSize() {
+return plainStreamSize;
+  }
+
+  @Override
+  public int available() throws IOException {
+long maxAvailable = plainStreamSize - plainStreamPositio

[GitHub] [iceberg] rdblue commented on a diff in pull request #3231: GCM encryption stream

2022-12-05 Thread GitBox


rdblue commented on code in PR #3231:
URL: https://github.com/apache/iceberg/pull/3231#discussion_r1039941138


##
core/src/main/java/org/apache/iceberg/encryption/AesGcmInputStream.java:
##
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.encryption;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import org.apache.iceberg.io.SeekableInputStream;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class AesGcmInputStream extends SeekableInputStream {
+  private final SeekableInputStream sourceStream;
+  private final boolean emptyCipherStream;
+  private final long netSourceFileSize;
+  private final Ciphers.AesGcmDecryptor gcmDecryptor;
+  private final byte[] ciphertextBlockBuffer;
+  private final int cipherBlockSize;
+  private final int plainBlockSize;
+  private final int numberOfBlocks;
+  private final int lastCipherBlockSize;
+  private final long plainStreamSize;
+  private final byte[] fileAadPrefix;
+
+  private long plainStreamPosition;
+  private int currentBlockIndex;
+  private int currentOffsetInPlainBlock;
+
+  AesGcmInputStream(SeekableInputStream sourceStream, long sourceLength,
+byte[] aesKey, byte[] fileAadPrefix) throws IOException {
+this.netSourceFileSize = sourceLength - Ciphers.GCM_STREAM_PREFIX_LENGTH;
+Preconditions.checkArgument(netSourceFileSize >= 0,
+"Source length " + sourceLength + " is shorter than GCM prefix. File 
is not encrypted");
+
+this.emptyCipherStream = (0 == netSourceFileSize);
+this.sourceStream = sourceStream;
+byte[] prefixBytes = new byte[Ciphers.GCM_STREAM_PREFIX_LENGTH];
+int fetched = sourceStream.read(prefixBytes);
+Preconditions.checkState(fetched == Ciphers.GCM_STREAM_PREFIX_LENGTH,
+"Insufficient read " + fetched +
+". The stream length should be at least " + 
Ciphers.GCM_STREAM_PREFIX_LENGTH);
+
+byte[] magic = new byte[Ciphers.GCM_STREAM_MAGIC_ARRAY.length];
+System.arraycopy(prefixBytes, 0, magic, 0, 
Ciphers.GCM_STREAM_MAGIC_ARRAY.length);
+Preconditions.checkState(Arrays.equals(Ciphers.GCM_STREAM_MAGIC_ARRAY, 
magic),
+"Cannot open encrypted file, it does not begin with magic string " + 
Ciphers.GCM_STREAM_MAGIC_STRING);
+
+if (!emptyCipherStream) {
+  this.plainStreamPosition = 0;
+  this.fileAadPrefix = fileAadPrefix;
+  gcmDecryptor = new Ciphers.AesGcmDecryptor(aesKey);

Review Comment:
   Pass AAD?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] rdblue commented on a diff in pull request #3231: GCM encryption stream

2022-12-05 Thread GitBox


rdblue commented on code in PR #3231:
URL: https://github.com/apache/iceberg/pull/3231#discussion_r1039944476


##
core/src/main/java/org/apache/iceberg/encryption/AesGcmInputStream.java:
##
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.encryption;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import org.apache.iceberg.io.SeekableInputStream;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class AesGcmInputStream extends SeekableInputStream {
+  private final SeekableInputStream sourceStream;
+  private final boolean emptyCipherStream;
+  private final long netSourceFileSize;
+  private final Ciphers.AesGcmDecryptor gcmDecryptor;
+  private final byte[] ciphertextBlockBuffer;
+  private final int cipherBlockSize;
+  private final int plainBlockSize;
+  private final int numberOfBlocks;
+  private final int lastCipherBlockSize;
+  private final long plainStreamSize;
+  private final byte[] fileAadPrefix;
+
+  private long plainStreamPosition;
+  private int currentBlockIndex;
+  private int currentOffsetInPlainBlock;
+
+  AesGcmInputStream(SeekableInputStream sourceStream, long sourceLength,
+byte[] aesKey, byte[] fileAadPrefix) throws IOException {
+this.netSourceFileSize = sourceLength - Ciphers.GCM_STREAM_PREFIX_LENGTH;
+Preconditions.checkArgument(netSourceFileSize >= 0,
+"Source length " + sourceLength + " is shorter than GCM prefix. File 
is not encrypted");
+
+this.emptyCipherStream = (0 == netSourceFileSize);
+this.sourceStream = sourceStream;
+byte[] prefixBytes = new byte[Ciphers.GCM_STREAM_PREFIX_LENGTH];
+int fetched = sourceStream.read(prefixBytes);
+Preconditions.checkState(fetched == Ciphers.GCM_STREAM_PREFIX_LENGTH,
+"Insufficient read " + fetched +
+". The stream length should be at least " + 
Ciphers.GCM_STREAM_PREFIX_LENGTH);
+
+byte[] magic = new byte[Ciphers.GCM_STREAM_MAGIC_ARRAY.length];
+System.arraycopy(prefixBytes, 0, magic, 0, 
Ciphers.GCM_STREAM_MAGIC_ARRAY.length);
+Preconditions.checkState(Arrays.equals(Ciphers.GCM_STREAM_MAGIC_ARRAY, 
magic),
+"Cannot open encrypted file, it does not begin with magic string " + 
Ciphers.GCM_STREAM_MAGIC_STRING);
+
+if (!emptyCipherStream) {
+  this.plainStreamPosition = 0;
+  this.fileAadPrefix = fileAadPrefix;
+  gcmDecryptor = new Ciphers.AesGcmDecryptor(aesKey);
+  plainBlockSize = ByteBuffer.wrap(prefixBytes, 
Ciphers.GCM_STREAM_MAGIC_ARRAY.length, 4)
+  .order(ByteOrder.LITTLE_ENDIAN).getInt();
+  Preconditions.checkState(plainBlockSize > 0, "Wrong plainBlockSize " + 
plainBlockSize);
+
+  cipherBlockSize = plainBlockSize + Ciphers.NONCE_LENGTH + 
Ciphers.GCM_TAG_LENGTH;
+  this.ciphertextBlockBuffer = new byte[cipherBlockSize];
+  this.currentBlockIndex = 0;
+  this.currentOffsetInPlainBlock = 0;
+
+  int numberOfFullBlocks = Math.toIntExact(netSourceFileSize / 
cipherBlockSize);
+  int cipherBytesInLastBlock = Math.toIntExact(netSourceFileSize - 
numberOfFullBlocks * cipherBlockSize);
+  boolean fullBlocksOnly = (0 == cipherBytesInLastBlock);
+  numberOfBlocks = fullBlocksOnly ? numberOfFullBlocks : 
numberOfFullBlocks + 1;
+  lastCipherBlockSize = fullBlocksOnly ? cipherBlockSize : 
cipherBytesInLastBlock; // never 0
+  int plainBytesInLastBlock = fullBlocksOnly ? 0 :
+  (cipherBytesInLastBlock - Ciphers.NONCE_LENGTH - 
Ciphers.GCM_TAG_LENGTH);

Review Comment:
   I thought this would be the block size, but it looks like the encrypted 
block size includes the nonce and tag. I think that's a bit confusing and we 
should make sure the behavior is specified in the spec.
   
   Since we're planning on making the block size constant, that works well. We 
can have a constant for this so it's easy.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-uns

[GitHub] [iceberg] rdblue commented on a diff in pull request #3231: GCM encryption stream

2022-12-05 Thread GitBox


rdblue commented on code in PR #3231:
URL: https://github.com/apache/iceberg/pull/3231#discussion_r1039945676


##
core/src/main/java/org/apache/iceberg/encryption/AesGcmInputStream.java:
##
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.encryption;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import org.apache.iceberg.io.SeekableInputStream;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class AesGcmInputStream extends SeekableInputStream {
+  private final SeekableInputStream sourceStream;
+  private final boolean emptyCipherStream;
+  private final long netSourceFileSize;
+  private final Ciphers.AesGcmDecryptor gcmDecryptor;
+  private final byte[] ciphertextBlockBuffer;
+  private final int cipherBlockSize;
+  private final int plainBlockSize;
+  private final int numberOfBlocks;
+  private final int lastCipherBlockSize;
+  private final long plainStreamSize;
+  private final byte[] fileAadPrefix;
+
+  private long plainStreamPosition;
+  private int currentBlockIndex;
+  private int currentOffsetInPlainBlock;
+
+  AesGcmInputStream(SeekableInputStream sourceStream, long sourceLength,
+byte[] aesKey, byte[] fileAadPrefix) throws IOException {
+this.netSourceFileSize = sourceLength - Ciphers.GCM_STREAM_PREFIX_LENGTH;
+Preconditions.checkArgument(netSourceFileSize >= 0,
+"Source length " + sourceLength + " is shorter than GCM prefix. File 
is not encrypted");

Review Comment:
   If the unencrypted file length is 0, we should return an in-memory input 
stream with 0 bytes instead of a decrypting one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] InvisibleProgrammer closed pull request #6337: Docs: Update Iceberg Hive documentation

2022-12-05 Thread GitBox


InvisibleProgrammer closed pull request #6337: Docs: Update Iceberg Hive 
documentation
URL: https://github.com/apache/iceberg/pull/6337


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] InvisibleProgrammer commented on pull request #6337: Docs: Update Iceberg Hive documentation

2022-12-05 Thread GitBox


InvisibleProgrammer commented on PR #6337:
URL: https://github.com/apache/iceberg/pull/6337#issuecomment-1337926448

   Hi, @pvary !
   
   First of all, thank you for the review and approval. 
   Unfortunately, I just learned the `Close` button on an approved PR doesn't 
mean close and merge, it just closes. So I reopened it. 
   I assume, as a first-time committer, I have no permission to merge. 
   
   Could you please merge it? 
   
   Thank you,
   Zsolt


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] InvisibleProgrammer commented on pull request #6337: Docs: Update Iceberg Hive documentation

2022-12-05 Thread GitBox


InvisibleProgrammer commented on PR #6337:
URL: https://github.com/apache/iceberg/pull/6337#issuecomment-1337938679

   Hi, @pvary !
   
   First of all, thank you for the review and approval. 
   Unfortunately, I just learned the `Close` button on an approved PR doesn't 
mean close and merge, it just closes. So I reopened it. 
   I assume, as a first-time committer, I have no permission to merge. 
   
   Could you please merge it? 
   
   Thank you,
   Zsolt


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] tprelle commented on pull request #6327: ORC: Fix error when projecting nested indentity partition column

2022-12-05 Thread GitBox


tprelle commented on PR #6327:
URL: https://github.com/apache/iceberg/pull/6327#issuecomment-1338004394

   hi @shardulm94, it's seems less intrusive and better than 
https://github.com/apache/iceberg/pull/4599


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] Fokko opened a new issue, #6361: Python: Ignore home folder when running tests

2022-12-05 Thread GitBox


Fokko opened a new issue, #6361:
URL: https://github.com/apache/iceberg/issues/6361

   ### Feature Request / Improvement
   
   When you run tests on your local machine, and you have a `~/.pyiceberg.yaml` 
around, there is a possibility that the `test_missing_uri` will fail because it 
will pick up the URI from the configuration.
   
   Instead, for the tests, we could create a fixture that will create a 
temporary directory, and point the home directory to that freshly created 
folder. This way the tests are more isolated from the environment it runs in.
   
   ### Query engine
   
   None


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] Fokko opened a new pull request, #6362: Python: Fix PyArrow import

2022-12-05 Thread GitBox


Fokko opened a new pull request, #6362:
URL: https://github.com/apache/iceberg/pull/6362

   Tested this in a fresh docker container:
   
   ```
   ➜  python git:(fd-fix-pyarrow-import) docker run -v `pwd`:/vo/ -t -i 
python:3.9 bash
   
   root@1252c09f932c:/vo# cd /vo/
   root@1252c09f932c:/vo# pip3 install -e ".[s3fs]"
   Obtaining file:///vo
 Installing build dependencies ... done
 Checking if build backend supports build_editable ... done
 Getting requirements to build editable ... done
 Preparing editable metadata (pyproject.toml) ... done
   Collecting mmhash3==3.0.1
 Downloading 
mmhash3-3.0.1-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl (38 kB)
   Collecting pyyaml==6.0.0
 Downloading 
PyYAML-6.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl (731 kB)
 731.1/731.1 KB 11.0 MB/s eta 
0:00:00
   Collecting rich==12.6.0
 Downloading rich-12.6.0-py3-none-any.whl (237 kB)
 237.5/237.5 KB 9.7 MB/s eta 
0:00:00
   Requirement already satisfied: requests==2.28.1 in 
/usr/local/lib/python3.9/site-packages (from pyiceberg==0.2.0) (2.28.1)
   Collecting pydantic==1.10.2
 Downloading pydantic-1.10.2-py3-none-any.whl (154 kB)
 154.6/154.6 KB 11.9 MB/s eta 
0:00:00
   Collecting zstandard==0.19.0
 Downloading 
zstandard-0.19.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl 
(2.3 MB)
 2.3/2.3 MB 18.3 MB/s eta 
0:00:00
   Collecting fsspec==2022.10.0
 Downloading fsspec-2022.10.0-py3-none-any.whl (138 kB)
 138.8/138.8 KB 8.7 MB/s eta 
0:00:00
   Collecting click==8.1.3
 Downloading click-8.1.3-py3-none-any.whl (96 kB)
 96.6/96.6 KB 13.8 MB/s eta 
0:00:00
   Collecting s3fs==2022.10.0
 Downloading s3fs-2022.10.0-py3-none-any.whl (27 kB)
   Collecting typing-extensions>=4.1.0
 Downloading typing_extensions-4.4.0-py3-none-any.whl (26 kB)
   Requirement already satisfied: idna<4,>=2.5 in 
/usr/local/lib/python3.9/site-packages (from 
requests==2.28.1->pyiceberg==0.2.0) (3.4)
   Requirement already satisfied: charset-normalizer<3,>=2 in 
/usr/local/lib/python3.9/site-packages (from 
requests==2.28.1->pyiceberg==0.2.0) (2.1.1)
   Requirement already satisfied: urllib3<1.27,>=1.21.1 in 
/usr/local/lib/python3.9/site-packages (from 
requests==2.28.1->pyiceberg==0.2.0) (1.26.13)
   Requirement already satisfied: certifi>=2017.4.17 in 
/usr/local/lib/python3.9/site-packages (from 
requests==2.28.1->pyiceberg==0.2.0) (2022.9.24)
   Collecting pygments<3.0.0,>=2.6.0
 Downloading Pygments-2.13.0-py3-none-any.whl (1.1 MB)
 1.1/1.1 MB 18.6 MB/s eta 
0:00:00
   Collecting commonmark<0.10.0,>=0.9.0
 Downloading commonmark-0.9.1-py2.py3-none-any.whl (51 kB)
 51.1/51.1 KB 11.3 MB/s eta 
0:00:00
   Collecting aiohttp!=4.0.0a0,!=4.0.0a1
 Downloading 
aiohttp-3.8.3-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl (1.0 
MB)
 1.0/1.0 MB 18.1 MB/s eta 
0:00:00
   Collecting aiobotocore~=2.4.0
 Downloading aiobotocore-2.4.1-py3-none-any.whl (66 kB)
 66.8/66.8 KB 9.0 MB/s eta 
0:00:00
   Collecting botocore<1.27.60,>=1.27.59
 Downloading botocore-1.27.59-py3-none-any.whl (9.1 MB)
 9.1/9.1 MB 9.3 MB/s eta 0:00:00
   Collecting wrapt>=1.10.10
 Downloading 
wrapt-1.14.1-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl (77 kB)
 77.9/77.9 KB 6.4 MB/s eta 
0:00:00
   Collecting aioitertools>=0.5.1
 Downloading aioitertools-0.11.0-py3-none-any.whl (23 kB)
   Collecting aiosignal>=1.1.2
 Downloading aiosignal-1.3.1-py3-none-any.whl (7.6 kB)
   Collecting frozenlist>=1.1.1
 Downloading 
frozenlist-1.3.3-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl 
(159 kB)
 159.2/159.2 KB 9.0 MB/s eta 
0:00:00
   Collecting async-timeout<5.0,>=4.0.0a3
 Downloading async_timeout-4.0.2-py3-none-any.whl (5.8 kB)
   Collecting yarl<2.0,>=1.0
 Downloading 
yarl-1.8.2-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl (261 kB)
 261.1/261.1 KB 15.8 MB/s eta 
0:00:00
   Requirement already satisfied: attrs>=17.3.0 in 
/usr/local/lib/python3.9/site-packages (from 
aiohttp!=4.0.0a0,!=4.0.0a1->s3fs==2022.10.0->pyiceberg==0.2.0) (22.1.0)
   Collecting multidict<7.0,>=4.5
 Downloading 
multidict-6.0.3-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl (116 
kB)
 116.3/116.3 KB 16.2 MB/

issues@iceberg.apache.org

2022-12-05 Thread GitBox


danielcweeks commented on code in PR #6324:
URL: https://github.com/apache/iceberg/pull/6324#discussion_r1040055218


##
hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java:
##
@@ -567,8 +569,13 @@ Database convertToDatabase(Namespace namespace, 
Map meta) {
 });
 
 if (database.getOwnerName() == null) {
-  database.setOwnerName(System.getProperty("user.name"));
-  database.setOwnerType(PrincipalType.USER);
+  try {
+
database.setOwnerName(UserGroupInformation.getCurrentUser().getUserName());
+database.setOwnerType(PrincipalType.USER);
+  } catch (IOException e) {
+throw new RuntimeException(

Review Comment:
   Looking through the `UGI` path, it looks like the only likely failure is 
that security (e.g. kerberos) is enabled, but something is misconfigured or the 
user isn't logged in.  It probably makes sense to just throw here, but we 
should use the java built-in `UncheckedIOException` since we have an IO cause 
(Iceberg's `RuntimeIOException` is probably best for when we don't have an IO 
cause).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] stevenzwu merged pull request #6299: Flink: support split discovery throttling for streaming read

2022-12-05 Thread GitBox


stevenzwu merged PR #6299:
URL: https://github.com/apache/iceberg/pull/6299


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] stevenzwu commented on pull request #6299: Flink: support split discovery throttling for streaming read

2022-12-05 Thread GitBox


stevenzwu commented on PR #6299:
URL: https://github.com/apache/iceberg/pull/6299#issuecomment-1338169362

   Thanks @pvary and @hililiwei for the review


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6360: Docs: Update Zorder spark support versions.

2022-12-05 Thread GitBox


RussellSpitzer commented on code in PR #6360:
URL: https://github.com/apache/iceberg/pull/6360#discussion_r1040172058


##
docs/spark-procedures.md:
##
@@ -271,7 +271,7 @@ Iceberg can compact data files in parallel using Spark with 
the `rewriteDataFile
 |---|---|--|-|
 | `table`   | ✔️  | string | Name of the table to update |
 | `strategy`|| string | Name of the strategy - binpack or sort. 
Defaults to binpack strategy |
-| `sort_order`  || string | If Zorder, then comma separated column names 
within zorder() text. Example: zorder(c1,c2,c3). Else, Comma separated 
sort_order_column. Where sort_order_column is a space separated sort order info 
per column (ColumnName SortDirection NullOrder).  SortDirection can be ASC 
or DESC. NullOrder can be NULLS FIRST or NULLS LAST |
+| `sort_order`  || string | If Zorder(supported in Spark 3.2 and above), 
then comma separated column names within zorder() text. Example: 
zorder(c1,c2,c3). Else, Comma separated sort_order_column. Where 
sort_order_column is a space separated sort order info per column (ColumnName 
SortDirection NullOrder).  SortDirection can be ASC or DESC. NullOrder can 
be NULLS FIRST or NULLS LAST |

Review Comment:
   One more comment here, instead of "if Zorder", maybe "For Zorder use a comma 
separated list of columns within zorder(). (Supported in Spark 3.2 and Above) 
Example:  "
   
   "Else, Comma separated sort orders in the format (ColumnName SortDirection 
NullOrder) where  
   Defaults to the table's sort order."
   
   Just reading over the whole doc it is a little confusing



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] stevenzwu opened a new pull request, #6363: Flink: backport split discovery throttling for FLIP-27 source to 1.14…

2022-12-05 Thread GitBox


stevenzwu opened a new pull request, #6363:
URL: https://github.com/apache/iceberg/pull/6363

   … and 1.15


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] stevenzwu commented on a diff in pull request #5967: Flink: Support read options in flink source

2022-12-05 Thread GitBox


stevenzwu commented on code in PR #5967:
URL: https://github.com/apache/iceberg/pull/5967#discussion_r1040195457


##
docs/flink-getting-started.md:
##
@@ -683,7 +683,58 @@ env.execute("Test Iceberg DataStream");
 OVERWRITE and UPSERT can't be set together. In UPSERT mode, if the table is 
partitioned, the partition fields should be included in equality fields.
 {{< /hint >}}
 
-## Write options
+## Options
+### Read options
+
+Flink read options are passed when configuring the Flink IcebergSource, like 
this:
+
+```
+IcebergSource.forRowData()
+.tableLoader(TableLoader.fromCatalog(...))
+.assignerFactory(new SimpleSplitAssignerFactory())
+.streaming(true)
+
.streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
+.startSnapshotId(3821550127947089987L)
+.monitorInterval(Duration.ofMillis(10L)) // or .set("monitor-interval", 
"10s")
+.build()
+```
+For Flink SQL, read options can be passed in via SQL hints like this:
+```
+SELECT * FROM tableName /*+ OPTIONS('monitor-interval'='10s') */
+...
+```
+
+Options can be passed in via Flink configuration, which will be applied to 
current session. Note that not all options support this mode.
+
+```
+env.getConfig()
+.getConfiguration()
+.set(FlinkReadOptions.SPLIT_FILE_OPEN_COST, 1000L);
+...
+```
+
+`Read option` has the highest priority, followed by `Flink configuration` and 
then `Table property`.
+
+| Read option   | Flink configuration  
 | Table property   | Default   
 | Description  |
+| - | 
- |  | 
-- | 
 |
+| snapshot-id   | N/A  
 | N/A  | N/A   
 | For time travel in batch mode. Read data from the specified snapshot-id. 
|

Review Comment:
   Default should be `null` (not `N/A`)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] stevenzwu commented on a diff in pull request #5967: Flink: Support read options in flink source

2022-12-05 Thread GitBox


stevenzwu commented on code in PR #5967:
URL: https://github.com/apache/iceberg/pull/5967#discussion_r1040196996


##
docs/flink-getting-started.md:
##
@@ -683,7 +683,58 @@ env.execute("Test Iceberg DataStream");
 OVERWRITE and UPSERT can't be set together. In UPSERT mode, if the table is 
partitioned, the partition fields should be included in equality fields.
 {{< /hint >}}
 
-## Write options
+## Options
+### Read options
+
+Flink read options are passed when configuring the Flink IcebergSource, like 
this:
+
+```
+IcebergSource.forRowData()
+.tableLoader(TableLoader.fromCatalog(...))
+.assignerFactory(new SimpleSplitAssignerFactory())
+.streaming(true)
+
.streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
+.startSnapshotId(3821550127947089987L)
+.monitorInterval(Duration.ofMillis(10L)) // or .set("monitor-interval", 
"10s")
+.build()
+```
+For Flink SQL, read options can be passed in via SQL hints like this:
+```
+SELECT * FROM tableName /*+ OPTIONS('monitor-interval'='10s') */
+...
+```
+
+Options can be passed in via Flink configuration, which will be applied to 
current session. Note that not all options support this mode.
+
+```
+env.getConfig()
+.getConfiguration()
+.set(FlinkReadOptions.SPLIT_FILE_OPEN_COST, 1000L);
+...
+```
+
+`Read option` has the highest priority, followed by `Flink configuration` and 
then `Table property`.
+
+| Read option   | Flink configuration  
 | Table property   | Default   
 | Description  |
+| - | 
- |  | 
-- | 
 |
+| snapshot-id   | N/A  
 | N/A  | N/A   
 | For time travel in batch mode. Read data from the specified snapshot-id. 
|
+| case-sensitive| case-sensitive   
 | N/A  | false 
 | If true, match column name in a case sensitive way.  |
+| as-of-timestamp   | N/A  
 | N/A  | N/A   
 | For time travel in batch mode. Read data from the most recent snapshot 
as of the given time in milliseconds. |
+| connector.iceberg.starting-strategy   | 
connector.iceberg.starting-strategy   | N/A  | 
INCREMENTAL_FROM_LATEST_SNAPSHOT   | Starting strategy for streaming execution. 
TABLE_SCAN_THEN_INCREMENTAL: Do a regular table scan then switch to the 
incremental mode. The incremental mode starts from the current snapshot 
exclusive. INCREMENTAL_FROM_LATEST_SNAPSHOT: Start incremental mode from the 
latest snapshot inclusive. If it is an empty map, all future append snapshots 
should be discovered. INCREMENTAL_FROM_EARLIEST_SNAPSHOT: Start incremental 
mode from the earliest snapshot inclusive. If it is an empty map, all future 
append snapshots should be discovered. INCREMENTAL_FROM_SNAPSHOT_ID: Start 
incremental mode from a snapshot with a specific id inclusive. 
INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP: Start incremental mode from a snapshot 
with a specific timestamp inclusive. If the timestamp is between two snapshots, 
it should start from the snapshot after th
 e timestamp. Just for FIP27 Source. |
+| start-snapshot-timestamp  | N/A  
 | N/A  | N/A   
 | Start to read data from the most recent snapshot as of the given time in 
milliseconds. |
+| start-snapshot-id | N/A  
 | N/A  | N/A   
 | Start to read data from the specified snapshot-id.   |
+| end-snapshot-id   | N/A  
 | N/A  | The latest snapshot id
 | Specifies the end snapshot.  |
+| connector.iceberg.split-size  | connector.iceberg.split-size 
 | read.split.target-size   | Table read.split.target-size  
 | Target size when combining data input splits.|

Review Comment:
   hint option shouldn't need the prefix for consistency, as there is no naming 
collision concern.
   
   Default should be the default value from `ScanContext`, not `Table 
read.split.target-size `



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please lo

[GitHub] [iceberg] stevenzwu commented on a diff in pull request #5967: Flink: Support read options in flink source

2022-12-05 Thread GitBox


stevenzwu commented on code in PR #5967:
URL: https://github.com/apache/iceberg/pull/5967#discussion_r1040196996


##
docs/flink-getting-started.md:
##
@@ -683,7 +683,58 @@ env.execute("Test Iceberg DataStream");
 OVERWRITE and UPSERT can't be set together. In UPSERT mode, if the table is 
partitioned, the partition fields should be included in equality fields.
 {{< /hint >}}
 
-## Write options
+## Options
+### Read options
+
+Flink read options are passed when configuring the Flink IcebergSource, like 
this:
+
+```
+IcebergSource.forRowData()
+.tableLoader(TableLoader.fromCatalog(...))
+.assignerFactory(new SimpleSplitAssignerFactory())
+.streaming(true)
+
.streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
+.startSnapshotId(3821550127947089987L)
+.monitorInterval(Duration.ofMillis(10L)) // or .set("monitor-interval", 
"10s")
+.build()
+```
+For Flink SQL, read options can be passed in via SQL hints like this:
+```
+SELECT * FROM tableName /*+ OPTIONS('monitor-interval'='10s') */
+...
+```
+
+Options can be passed in via Flink configuration, which will be applied to 
current session. Note that not all options support this mode.
+
+```
+env.getConfig()
+.getConfiguration()
+.set(FlinkReadOptions.SPLIT_FILE_OPEN_COST, 1000L);
+...
+```
+
+`Read option` has the highest priority, followed by `Flink configuration` and 
then `Table property`.
+
+| Read option   | Flink configuration  
 | Table property   | Default   
 | Description  |
+| - | 
- |  | 
-- | 
 |
+| snapshot-id   | N/A  
 | N/A  | N/A   
 | For time travel in batch mode. Read data from the specified snapshot-id. 
|
+| case-sensitive| case-sensitive   
 | N/A  | false 
 | If true, match column name in a case sensitive way.  |
+| as-of-timestamp   | N/A  
 | N/A  | N/A   
 | For time travel in batch mode. Read data from the most recent snapshot 
as of the given time in milliseconds. |
+| connector.iceberg.starting-strategy   | 
connector.iceberg.starting-strategy   | N/A  | 
INCREMENTAL_FROM_LATEST_SNAPSHOT   | Starting strategy for streaming execution. 
TABLE_SCAN_THEN_INCREMENTAL: Do a regular table scan then switch to the 
incremental mode. The incremental mode starts from the current snapshot 
exclusive. INCREMENTAL_FROM_LATEST_SNAPSHOT: Start incremental mode from the 
latest snapshot inclusive. If it is an empty map, all future append snapshots 
should be discovered. INCREMENTAL_FROM_EARLIEST_SNAPSHOT: Start incremental 
mode from the earliest snapshot inclusive. If it is an empty map, all future 
append snapshots should be discovered. INCREMENTAL_FROM_SNAPSHOT_ID: Start 
incremental mode from a snapshot with a specific id inclusive. 
INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP: Start incremental mode from a snapshot 
with a specific timestamp inclusive. If the timestamp is between two snapshots, 
it should start from the snapshot after th
 e timestamp. Just for FIP27 Source. |
+| start-snapshot-timestamp  | N/A  
 | N/A  | N/A   
 | Start to read data from the most recent snapshot as of the given time in 
milliseconds. |
+| start-snapshot-id | N/A  
 | N/A  | N/A   
 | Start to read data from the specified snapshot-id.   |
+| end-snapshot-id   | N/A  
 | N/A  | The latest snapshot id
 | Specifies the end snapshot.  |
+| connector.iceberg.split-size  | connector.iceberg.split-size 
 | read.split.target-size   | Table read.split.target-size  
 | Target size when combining data input splits.|

Review Comment:
   hint option shouldn't need the prefix for consistency, as there is no naming 
collision concern.
   
   Default should be the default value from `ScanContext`, not `Table 
read.split.target-size `.
   
   nit: `combining data input splits` -> `combining input splits`.



-- 
This is an automated m

[GitHub] [iceberg] stevenzwu commented on a diff in pull request #5967: Flink: Support read options in flink source

2022-12-05 Thread GitBox


stevenzwu commented on code in PR #5967:
URL: https://github.com/apache/iceberg/pull/5967#discussion_r1040196996


##
docs/flink-getting-started.md:
##
@@ -683,7 +683,58 @@ env.execute("Test Iceberg DataStream");
 OVERWRITE and UPSERT can't be set together. In UPSERT mode, if the table is 
partitioned, the partition fields should be included in equality fields.
 {{< /hint >}}
 
-## Write options
+## Options
+### Read options
+
+Flink read options are passed when configuring the Flink IcebergSource, like 
this:
+
+```
+IcebergSource.forRowData()
+.tableLoader(TableLoader.fromCatalog(...))
+.assignerFactory(new SimpleSplitAssignerFactory())
+.streaming(true)
+
.streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
+.startSnapshotId(3821550127947089987L)
+.monitorInterval(Duration.ofMillis(10L)) // or .set("monitor-interval", 
"10s")
+.build()
+```
+For Flink SQL, read options can be passed in via SQL hints like this:
+```
+SELECT * FROM tableName /*+ OPTIONS('monitor-interval'='10s') */
+...
+```
+
+Options can be passed in via Flink configuration, which will be applied to 
current session. Note that not all options support this mode.
+
+```
+env.getConfig()
+.getConfiguration()
+.set(FlinkReadOptions.SPLIT_FILE_OPEN_COST, 1000L);
+...
+```
+
+`Read option` has the highest priority, followed by `Flink configuration` and 
then `Table property`.
+
+| Read option   | Flink configuration  
 | Table property   | Default   
 | Description  |
+| - | 
- |  | 
-- | 
 |
+| snapshot-id   | N/A  
 | N/A  | N/A   
 | For time travel in batch mode. Read data from the specified snapshot-id. 
|
+| case-sensitive| case-sensitive   
 | N/A  | false 
 | If true, match column name in a case sensitive way.  |
+| as-of-timestamp   | N/A  
 | N/A  | N/A   
 | For time travel in batch mode. Read data from the most recent snapshot 
as of the given time in milliseconds. |
+| connector.iceberg.starting-strategy   | 
connector.iceberg.starting-strategy   | N/A  | 
INCREMENTAL_FROM_LATEST_SNAPSHOT   | Starting strategy for streaming execution. 
TABLE_SCAN_THEN_INCREMENTAL: Do a regular table scan then switch to the 
incremental mode. The incremental mode starts from the current snapshot 
exclusive. INCREMENTAL_FROM_LATEST_SNAPSHOT: Start incremental mode from the 
latest snapshot inclusive. If it is an empty map, all future append snapshots 
should be discovered. INCREMENTAL_FROM_EARLIEST_SNAPSHOT: Start incremental 
mode from the earliest snapshot inclusive. If it is an empty map, all future 
append snapshots should be discovered. INCREMENTAL_FROM_SNAPSHOT_ID: Start 
incremental mode from a snapshot with a specific id inclusive. 
INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP: Start incremental mode from a snapshot 
with a specific timestamp inclusive. If the timestamp is between two snapshots, 
it should start from the snapshot after th
 e timestamp. Just for FIP27 Source. |
+| start-snapshot-timestamp  | N/A  
 | N/A  | N/A   
 | Start to read data from the most recent snapshot as of the given time in 
milliseconds. |
+| start-snapshot-id | N/A  
 | N/A  | N/A   
 | Start to read data from the specified snapshot-id.   |
+| end-snapshot-id   | N/A  
 | N/A  | The latest snapshot id
 | Specifies the end snapshot.  |
+| connector.iceberg.split-size  | connector.iceberg.split-size 
 | read.split.target-size   | Table read.split.target-size  
 | Target size when combining data input splits.|

Review Comment:
   hint option shouldn't need the prefix for consistency, as there is no naming 
collision concern. Yes, it means that we need to handle the difference in code.
   
   Default should be the default value from `ScanContext`, not `Table 
read.split.target-size `.
   
   nit: `combining data input split

[GitHub] [iceberg] github-actions[bot] closed issue #4822: ParallelIterator is using too much memory

2022-12-05 Thread GitBox


github-actions[bot] closed issue #4822: ParallelIterator is using too much 
memory
URL: https://github.com/apache/iceberg/issues/4822


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] github-actions[bot] commented on issue #4822: ParallelIterator is using too much memory

2022-12-05 Thread GitBox


github-actions[bot] commented on issue #4822:
URL: https://github.com/apache/iceberg/issues/4822#issuecomment-1338428008

   This issue has been closed because it has not received any activity in the 
last 14 days since being marked as 'stale'


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] stevenzwu commented on a diff in pull request #5967: Flink: Support read options in flink source

2022-12-05 Thread GitBox


stevenzwu commented on code in PR #5967:
URL: https://github.com/apache/iceberg/pull/5967#discussion_r1040205639


##
docs/flink-getting-started.md:
##
@@ -683,7 +683,58 @@ env.execute("Test Iceberg DataStream");
 OVERWRITE and UPSERT can't be set together. In UPSERT mode, if the table is 
partitioned, the partition fields should be included in equality fields.
 {{< /hint >}}
 
-## Write options
+## Options
+### Read options
+
+Flink read options are passed when configuring the Flink IcebergSource, like 
this:
+
+```
+IcebergSource.forRowData()
+.tableLoader(TableLoader.fromCatalog(...))
+.assignerFactory(new SimpleSplitAssignerFactory())
+.streaming(true)
+
.streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
+.startSnapshotId(3821550127947089987L)
+.monitorInterval(Duration.ofMillis(10L)) // or .set("monitor-interval", 
"10s")
+.build()
+```
+For Flink SQL, read options can be passed in via SQL hints like this:
+```
+SELECT * FROM tableName /*+ OPTIONS('monitor-interval'='10s') */
+...
+```
+
+Options can be passed in via Flink configuration, which will be applied to 
current session. Note that not all options support this mode.
+
+```
+env.getConfig()
+.getConfiguration()
+.set(FlinkReadOptions.SPLIT_FILE_OPEN_COST, 1000L);
+...
+```
+
+`Read option` has the highest priority, followed by `Flink configuration` and 
then `Table property`.
+
+| Read option   | Flink configuration  
 | Table property   | Default   
 | Description  |
+| - | 
- |  | 
-- | 
 |
+| snapshot-id   | N/A  
 | N/A  | N/A   
 | For time travel in batch mode. Read data from the specified snapshot-id. 
|
+| case-sensitive| case-sensitive   
 | N/A  | false 
 | If true, match column name in a case sensitive way.  |
+| as-of-timestamp   | N/A  
 | N/A  | N/A   
 | For time travel in batch mode. Read data from the most recent snapshot 
as of the given time in milliseconds. |
+| connector.iceberg.starting-strategy   | 
connector.iceberg.starting-strategy   | N/A  | 
INCREMENTAL_FROM_LATEST_SNAPSHOT   | Starting strategy for streaming execution. 
TABLE_SCAN_THEN_INCREMENTAL: Do a regular table scan then switch to the 
incremental mode. The incremental mode starts from the current snapshot 
exclusive. INCREMENTAL_FROM_LATEST_SNAPSHOT: Start incremental mode from the 
latest snapshot inclusive. If it is an empty map, all future append snapshots 
should be discovered. INCREMENTAL_FROM_EARLIEST_SNAPSHOT: Start incremental 
mode from the earliest snapshot inclusive. If it is an empty map, all future 
append snapshots should be discovered. INCREMENTAL_FROM_SNAPSHOT_ID: Start 
incremental mode from a snapshot with a specific id inclusive. 
INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP: Start incremental mode from a snapshot 
with a specific timestamp inclusive. If the timestamp is between two snapshots, 
it should start from the snapshot after th
 e timestamp. Just for FIP27 Source. |
+| start-snapshot-timestamp  | N/A  
 | N/A  | N/A   
 | Start to read data from the most recent snapshot as of the given time in 
milliseconds. |
+| start-snapshot-id | N/A  
 | N/A  | N/A   
 | Start to read data from the specified snapshot-id.   |
+| end-snapshot-id   | N/A  
 | N/A  | The latest snapshot id
 | Specifies the end snapshot.  |
+| connector.iceberg.split-size  | connector.iceberg.split-size 
 | read.split.target-size   | Table read.split.target-size  
 | Target size when combining data input splits.|
+| connector.iceberg.split-lookback  | 
connector.iceberg.split-file-open-cost| read.split.planning-lookback | 
Table read.split.planning-lookback | Number of bins to consider when combining 
input splits.  |
+| connector.iceberg.split-file-open-cost| 
connector.iceberg.split-file-open

[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #6352: AWS: Fix inconsistent behavior of naming S3 location between read and write operations by allowing only s3 bucket name

2022-12-05 Thread GitBox


amogh-jahagirdar commented on code in PR #6352:
URL: https://github.com/apache/iceberg/pull/6352#discussion_r1040279885


##
aws/src/main/java/org/apache/iceberg/aws/s3/S3URI.java:
##
@@ -74,17 +74,14 @@ class S3URI {
 this.scheme = schemeSplit[0];
 
 String[] authoritySplit = schemeSplit[1].split(PATH_DELIM, 2);
-ValidationException.check(
-authoritySplit.length == 2, "Invalid S3 URI, cannot determine bucket: 
%s", location);
-ValidationException.check(
-!authoritySplit[1].trim().isEmpty(), "Invalid S3 URI, path is empty: 
%s", location);
+

Review Comment:
   Never realized this class was package private. So that does give us the 
flexibility to change the behavior without impacting users. I think it makes 
sense then, AmazonS3URI in the AWS SDK has the same semantics for allowing 
bucket by itself as a URI. I do think we should confirm with @danielcweeks on 
if this was intentionally implemented this way in Iceberg. 



##
aws/src/main/java/org/apache/iceberg/aws/s3/S3URI.java:
##
@@ -74,17 +74,14 @@ class S3URI {
 this.scheme = schemeSplit[0];
 
 String[] authoritySplit = schemeSplit[1].split(PATH_DELIM, 2);
-ValidationException.check(
-authoritySplit.length == 2, "Invalid S3 URI, cannot determine bucket: 
%s", location);
-ValidationException.check(
-!authoritySplit[1].trim().isEmpty(), "Invalid S3 URI, path is empty: 
%s", location);
+
 this.bucket =
 bucketToAccessPointMapping == null
 ? authoritySplit[0]
 : bucketToAccessPointMapping.getOrDefault(authoritySplit[0], 
authoritySplit[0]);
 
 // Strip query and fragment if they exist
-String path = authoritySplit[1];
+String path = authoritySplit.length > 1 ? authoritySplit[1] : "";

Review Comment:
   Nit: If we go down this route, could we have key be an optional?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #6352: AWS: Fix inconsistent behavior of naming S3 location between read and write operations by allowing only s3 bucket name

2022-12-05 Thread GitBox


amogh-jahagirdar commented on code in PR #6352:
URL: https://github.com/apache/iceberg/pull/6352#discussion_r1040280304


##
aws/src/main/java/org/apache/iceberg/aws/s3/S3URI.java:
##
@@ -74,17 +74,14 @@ class S3URI {
 this.scheme = schemeSplit[0];
 
 String[] authoritySplit = schemeSplit[1].split(PATH_DELIM, 2);
-ValidationException.check(
-authoritySplit.length == 2, "Invalid S3 URI, cannot determine bucket: 
%s", location);
-ValidationException.check(
-!authoritySplit[1].trim().isEmpty(), "Invalid S3 URI, path is empty: 
%s", location);
+
 this.bucket =
 bucketToAccessPointMapping == null
 ? authoritySplit[0]
 : bucketToAccessPointMapping.getOrDefault(authoritySplit[0], 
authoritySplit[0]);
 
 // Strip query and fragment if they exist
-String path = authoritySplit[1];
+String path = authoritySplit.length > 1 ? authoritySplit[1] : "";

Review Comment:
   Nit: If we go down this route, could we have keys/paths be an 
optional?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6222: Flink: Support inspecting table

2022-12-05 Thread GitBox


stevenzwu commented on code in PR #6222:
URL: https://github.com/apache/iceberg/pull/6222#discussion_r1039841272


##
docs/flink-getting-started.md:
##
@@ -712,9 +712,188 @@ INSERT INTO tableName /*+ 
OPTIONS('upsert-enabled'='true') */
 | compression-strategy   | Table write.orc.compression-strategy   | 
Overrides this table's compression strategy for ORC tables for this write   
   |
 
 
-## Inspecting tables.
+## Inspecting tables
 
-Iceberg does not support inspecting table in flink sql now, we need to use 
[iceberg's Java API](../api) to read iceberg's meta data to get those table 
information.
+To inspect a table's history, snapshots, and other metadata, Iceberg supports 
metadata tables.
+
+Metadata tables are identified by adding the metadata table name after the 
original table name. For example, history for `db.table` is read using 
`db.table$history`.
+
+### History
+
+To show table history:
+
+```sql
+SELECT * FROM prod.db.table$history;
+```
+
+| made_current_at | snapshot_id | parent_id   | 
is_current_ancestor |
+| --- | --- | --- | 
--- |
+| 2019-02-08 03:29:51.215 | 5781947118336215154 | NULL| true   
 |
+| 2019-02-08 03:47:55.948 | 5179299526185056830 | 5781947118336215154 | true   
 |
+| 2019-02-09 16:24:30.13  | 296410040247533544  | 5179299526185056830 | false  
 |
+| 2019-02-09 16:32:47.336 | 2999875608062437330 | 5179299526185056830 | true   
 |
+| 2019-02-09 19:42:03.919 | 8924558786060583479 | 2999875608062437330 | true   
 |
+| 2019-02-09 19:49:16.343 | 6536733823181975045 | 8924558786060583479 | true   
 |
+
+{{< hint info >}}
+**This shows a commit that was rolled back.** The example has two snapshots 
with the same parent, and one is *not* an ancestor of the current table state.

Review Comment:
   maybe more explicit?
   
   In this example, snapshot 296410040247533544 and 2999875608062437330 have 
the same parent snapshot 5179299526185056830. Snapshot 296410040247533544 was 
rolled back and is *not* an ancestor of the current table state.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6222: Flink: Support inspecting table

2022-12-05 Thread GitBox


stevenzwu commented on code in PR #6222:
URL: https://github.com/apache/iceberg/pull/6222#discussion_r1040359570


##
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/data/StructRowData.java:
##
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.data;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.OffsetDateTime;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.RowKind;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.ByteBuffers;
+
+public class StructRowData implements RowData {
+  private final Types.StructType type;
+  private RowKind kind;
+  private StructLike struct;
+
+  public StructRowData(Types.StructType type) {
+this(type, RowKind.INSERT);
+  }
+
+  public StructRowData(Types.StructType type, RowKind kind) {
+this(type, null, kind);
+  }
+
+  private StructRowData(Types.StructType type, StructLike struct) {
+this(type, struct, RowKind.INSERT);
+  }
+
+  private StructRowData(Types.StructType type, StructLike struct, RowKind 
kind) {
+this.type = type;
+this.struct = struct;
+this.kind = kind;
+  }
+
+  public StructRowData setStruct(StructLike newStruct) {
+this.struct = newStruct;
+return this;
+  }
+
+  @Override
+  public int getArity() {
+return struct.size();
+  }
+
+  @Override
+  public RowKind getRowKind() {
+return kind;
+  }
+
+  @Override
+  public void setRowKind(RowKind newKind) {
+Preconditions.checkNotNull(newKind, "kind can not be null");
+this.kind = newKind;
+  }
+
+  @Override
+  public boolean isNullAt(int pos) {
+return struct.get(pos, Object.class) == null;
+  }
+
+  @Override
+  public boolean getBoolean(int pos) {
+return struct.get(pos, Boolean.class);
+  }
+
+  @Override
+  public byte getByte(int pos) {
+return (byte) (int) struct.get(pos, Integer.class);
+  }
+
+  @Override
+  public short getShort(int pos) {
+return (short) (int) struct.get(pos, Integer.class);
+  }
+
+  @Override
+  public int getInt(int pos) {
+Object integer = struct.get(pos, Object.class);
+
+if (integer instanceof Integer) {
+  return (int) integer;
+} else if (integer instanceof LocalDate) {
+  return (int) ((LocalDate) integer).toEpochDay();
+} else {
+  throw new IllegalStateException(
+  "Unknown type for int field. Type name: " + 
integer.getClass().getName());
+}
+  }
+
+  @Override
+  public long getLong(int pos) {
+Object longVal = struct.get(pos, Object.class);
+
+if (longVal instanceof Long) {
+  return (long) longVal;
+} else if (longVal instanceof OffsetDateTime) {
+  return Duration.between(Instant.EPOCH, (OffsetDateTime) 
longVal).toNanos() / 1000;
+} else if (longVal instanceof LocalDate) {
+  return ((LocalDate) longVal).toEpochDay();
+} else {
+  throw new IllegalStateException(
+  "Unknown type for long field. Type name: " + 
longVal.getClass().getName());
+}
+  }
+
+  @Override
+  public float getFloat(int pos) {
+return struct.get(pos, Float.class);
+  }
+
+  @Override
+ 

[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6222: Flink: Support inspecting table

2022-12-05 Thread GitBox


stevenzwu commented on code in PR #6222:
URL: https://github.com/apache/iceberg/pull/6222#discussion_r1040360338


##
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/data/StructRowData.java:
##
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.data;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.OffsetDateTime;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.RowKind;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.ByteBuffers;
+
+public class StructRowData implements RowData {
+  private final Types.StructType type;
+  private RowKind kind;
+  private StructLike struct;
+
+  public StructRowData(Types.StructType type) {
+this(type, RowKind.INSERT);
+  }
+
+  public StructRowData(Types.StructType type, RowKind kind) {
+this(type, null, kind);
+  }
+
+  private StructRowData(Types.StructType type, StructLike struct) {
+this(type, struct, RowKind.INSERT);
+  }
+
+  private StructRowData(Types.StructType type, StructLike struct, RowKind 
kind) {
+this.type = type;
+this.struct = struct;
+this.kind = kind;
+  }
+
+  public StructRowData setStruct(StructLike newStruct) {
+this.struct = newStruct;
+return this;
+  }
+
+  @Override
+  public int getArity() {
+return struct.size();
+  }
+
+  @Override
+  public RowKind getRowKind() {
+return kind;
+  }
+
+  @Override
+  public void setRowKind(RowKind newKind) {
+Preconditions.checkNotNull(newKind, "kind can not be null");
+this.kind = newKind;
+  }
+
+  @Override
+  public boolean isNullAt(int pos) {
+return struct.get(pos, Object.class) == null;
+  }
+
+  @Override
+  public boolean getBoolean(int pos) {
+return struct.get(pos, Boolean.class);
+  }
+
+  @Override
+  public byte getByte(int pos) {
+return (byte) (int) struct.get(pos, Integer.class);
+  }
+
+  @Override
+  public short getShort(int pos) {
+return (short) (int) struct.get(pos, Integer.class);
+  }
+
+  @Override
+  public int getInt(int pos) {
+Object integer = struct.get(pos, Object.class);
+
+if (integer instanceof Integer) {
+  return (int) integer;
+} else if (integer instanceof LocalDate) {
+  return (int) ((LocalDate) integer).toEpochDay();
+} else {
+  throw new IllegalStateException(
+  "Unknown type for int field. Type name: " + 
integer.getClass().getName());
+}
+  }
+
+  @Override
+  public long getLong(int pos) {
+Object longVal = struct.get(pos, Object.class);
+
+if (longVal instanceof Long) {
+  return (long) longVal;
+} else if (longVal instanceof OffsetDateTime) {
+  return Duration.between(Instant.EPOCH, (OffsetDateTime) 
longVal).toNanos() / 1000;
+} else if (longVal instanceof LocalDate) {
+  return ((LocalDate) longVal).toEpochDay();
+} else {
+  throw new IllegalStateException(
+  "Unknown type for long field. Type name: " + 
longVal.getClass().getName());
+}
+  }
+
+  @Override
+  public float getFloat(int pos) {
+return struct.get(pos, Float.class);
+  }
+
+  @Override
+ 

[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6222: Flink: Support inspecting table

2022-12-05 Thread GitBox


stevenzwu commented on code in PR #6222:
URL: https://github.com/apache/iceberg/pull/6222#discussion_r1040359570


##
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/data/StructRowData.java:
##
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.data;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.OffsetDateTime;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.RowKind;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.ByteBuffers;
+
+public class StructRowData implements RowData {
+  private final Types.StructType type;
+  private RowKind kind;
+  private StructLike struct;
+
+  public StructRowData(Types.StructType type) {
+this(type, RowKind.INSERT);
+  }
+
+  public StructRowData(Types.StructType type, RowKind kind) {
+this(type, null, kind);
+  }
+
+  private StructRowData(Types.StructType type, StructLike struct) {
+this(type, struct, RowKind.INSERT);
+  }
+
+  private StructRowData(Types.StructType type, StructLike struct, RowKind 
kind) {
+this.type = type;
+this.struct = struct;
+this.kind = kind;
+  }
+
+  public StructRowData setStruct(StructLike newStruct) {
+this.struct = newStruct;
+return this;
+  }
+
+  @Override
+  public int getArity() {
+return struct.size();
+  }
+
+  @Override
+  public RowKind getRowKind() {
+return kind;
+  }
+
+  @Override
+  public void setRowKind(RowKind newKind) {
+Preconditions.checkNotNull(newKind, "kind can not be null");
+this.kind = newKind;
+  }
+
+  @Override
+  public boolean isNullAt(int pos) {
+return struct.get(pos, Object.class) == null;
+  }
+
+  @Override
+  public boolean getBoolean(int pos) {
+return struct.get(pos, Boolean.class);
+  }
+
+  @Override
+  public byte getByte(int pos) {
+return (byte) (int) struct.get(pos, Integer.class);
+  }
+
+  @Override
+  public short getShort(int pos) {
+return (short) (int) struct.get(pos, Integer.class);
+  }
+
+  @Override
+  public int getInt(int pos) {
+Object integer = struct.get(pos, Object.class);
+
+if (integer instanceof Integer) {
+  return (int) integer;
+} else if (integer instanceof LocalDate) {
+  return (int) ((LocalDate) integer).toEpochDay();
+} else {
+  throw new IllegalStateException(
+  "Unknown type for int field. Type name: " + 
integer.getClass().getName());
+}
+  }
+
+  @Override
+  public long getLong(int pos) {
+Object longVal = struct.get(pos, Object.class);
+
+if (longVal instanceof Long) {
+  return (long) longVal;
+} else if (longVal instanceof OffsetDateTime) {
+  return Duration.between(Instant.EPOCH, (OffsetDateTime) 
longVal).toNanos() / 1000;
+} else if (longVal instanceof LocalDate) {
+  return ((LocalDate) longVal).toEpochDay();
+} else {
+  throw new IllegalStateException(
+  "Unknown type for long field. Type name: " + 
longVal.getClass().getName());
+}
+  }
+
+  @Override
+  public float getFloat(int pos) {
+return struct.get(pos, Float.class);
+  }
+
+  @Override
+ 

[GitHub] [iceberg] rbalamohan opened a new issue, #6364: Optimise POS reads

2022-12-05 Thread GitBox


rbalamohan opened a new issue, #6364:
URL: https://github.com/apache/iceberg/issues/6364

   ### Apache Iceberg version
   
   0.14.1
   
   ### Query engine
   
   Spark
   
   ### Please describe the bug 🐞
   
   Currently combinedFileTask can have more than 1 file. Depending on the 
nature of workload, it can even have 30-50+ files in single split. When there 
are 4+ POS files, it takes lot longer time to process "select" queries. This is 
due to the fact,
   that every file needs to process POS file and it leads to read 
amplification. 
   
   Request is to optimise the way POS file reading is done.
   - Optimise parquet reader with cached filestatus and footer
 - Optimise within combinedFileTask in a single task in a single executor. 
This can have more than 1 file in single split. Typically there can 10-50+ 
files depending on the size of the files.
 - For simplicity, let us start with 1 POS file. This POS file can have 
delete information about all the 50+ files in the combined task
   - Currently, for every file it opens, it needs "delete row positions". 
So it invokes "DeleteFilter::deletedRowPositions". This opens the POS file, 
reads the footer and reads the snippet for specific file path.
   - Above step happens for all the 50+ files in sequential order.
   - Internally, it opens and reads the footer information 50+ times which 
is not needed.
   - Need a lightweight parquet reader, which can accept readerConfs etc 
and take up footer information as argument. Basically cache footer details, 
file status details to reduce turn around with object stores.
 - Otherwise pass the POS reader during data reading, such that it 
doesn't need to reopen and read the footers again.
- Optimise on reading POS
  - Though path is dictionary encoded, it ends up materializing the path 
again and again. Need a way to optimise this to reduce CPU burn when reading 
POS files.
  - Covered in https://github.com/apache/iceberg/issues/5863
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] stevenzwu commented on pull request #6222: Flink: Support inspecting table

2022-12-05 Thread GitBox


stevenzwu commented on PR #6222:
URL: https://github.com/apache/iceberg/pull/6222#issuecomment-1338683010

   @hililiwei we should add comprehensive unit test for `StructRowData`. 
   
   I have some internal DataGenerators for unit test code with very 
comprehensive coverage all field types (including complex nested types). Maybe 
I will submit a separate PR for that, which will cover Flink `RowData` and 
Iceberg `GenericRecord`. You can expand it with support for Iceberg 
`StructLike`. With that, we can write unit test/assertions compare the actual 
and expected.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6222: Flink: Support inspecting table

2022-12-05 Thread GitBox


stevenzwu commented on code in PR #6222:
URL: https://github.com/apache/iceberg/pull/6222#discussion_r1040393766


##
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java:
##
@@ -295,6 +299,161 @@ private static void assertEquals(
 }
   }
 
+  public static void assertEqualsSafe(
+  Schema schema, List recs, List rows) {

Review Comment:
   why do we need the helper methods with Avro generic record?



##
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/data/StructRowData.java:
##
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.data;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.OffsetDateTime;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.RowKind;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.ByteBuffers;
+
+public class StructRowData implements RowData {
+  private final Types.StructType type;
+  private RowKind kind;
+  private StructLike struct;
+
+  public StructRowData(Types.StructType type) {
+this(type, RowKind.INSERT);
+  }
+
+  public StructRowData(Types.StructType type, RowKind kind) {
+this(type, null, kind);
+  }
+
+  private StructRowData(Types.StructType type, StructLike struct) {
+this(type, struct, RowKind.INSERT);
+  }
+
+  private StructRowData(Types.StructType type, StructLike struct, RowKind 
kind) {
+this.type = type;
+this.struct = struct;
+this.kind = kind;
+  }
+
+  public StructRowData setStruct(StructLike newStruct) {
+this.struct = newStruct;
+return this;
+  }
+
+  @Override
+  public int getArity() {
+return struct.size();
+  }
+
+  @Override
+  public RowKind getRowKind() {
+return kind;
+  }
+
+  @Override
+  public void setRowKind(RowKind newKind) {
+Preconditions.checkNotNull(newKind, "kind can not be null");
+this.kind = newKind;
+  }
+
+  @Override
+  public boolean isNullAt(int pos) {
+return struct.get(pos, Object.class) == null;
+  }
+
+  @Override
+  public boolean getBoolean(int pos) {
+return struct.get(pos, Boolean.class);
+  }
+
+  @Override
+  public byte getByte(int pos) {
+return (byte) (int) struct.get(pos, Integer.class);
+  }
+
+  @Override
+  public short getShort(int pos) {
+return (short) (int) struct.get(pos, Integer.class);
+  }
+
+  @Override
+  public int getInt(int pos) {
+Object integer = struct.get(pos, Object.class);
+
+if (integer instanceof Integer) {
+  return (int) integer;
+} else if (integer instanceof LocalDate) {
+  return (int) ((LocalDate) integer).toEpochDay();
+} else {
+  throw new IllegalStateException(
+  "Unknown type for int field. Type name: " + 
integer.getClass().getName());
+}
+  }
+
+  @Override
+  public long getLong(int pos) {
+Object longVal = struct.get(pos, Object.class);
+
+if (longVal instanceof Long) {
+  return (long) longVal;
+} else if (longVal instanceof OffsetDateTime) {
+  return Duration.between(Instant.EPOCH, (OffsetDateTime) 
longVal).toNanos() / 1000;
+} else if (longVal instanc

[GitHub] [iceberg] chenjunjiedada commented on a diff in pull request #6313: Flink: use correct metric config for position deletes

2022-12-05 Thread GitBox


chenjunjiedada commented on code in PR #6313:
URL: https://github.com/apache/iceberg/pull/6313#discussion_r1040420644


##
core/src/main/java/org/apache/iceberg/MetricsConfig.java:
##
@@ -107,6 +107,30 @@ public static MetricsConfig forPositionDelete(Table table) 
{
 return new MetricsConfig(columnModes.build(), defaultMode);
   }
 
+  /**
+   * Creates a metrics config for a position delete file.
+   *
+   * @param props table configuration
+   */
+  public static MetricsConfig forPositionDelete(Map props) {

Review Comment:
   @stevenzwu I recheck the metric config, the schema and sort order are for 
the data row. Change to use `forTable` now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] chenjunjiedada commented on a diff in pull request #6313: Flink: use correct metric config for position deletes

2022-12-05 Thread GitBox


chenjunjiedada commented on code in PR #6313:
URL: https://github.com/apache/iceberg/pull/6313#discussion_r1040421576


##
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java:
##
@@ -160,7 +184,8 @@ public EqualityDeleteWriter newEqDeleteWriter(
 eqDeleteRowSchema,
 "Equality delete row schema shouldn't be null when creating 
equality-delete writer");
 
-MetricsConfig metricsConfig = MetricsConfig.fromProperties(props);
+MetricsConfig metricsConfig =
+table != null ? MetricsConfig.forTable(table) : 
MetricsConfig.fromProperties(props);

Review Comment:
   The data row in equality delete should share the same metrics config as the 
table, so I change this as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6313: Flink: use correct metric config for position deletes

2022-12-05 Thread GitBox


stevenzwu commented on code in PR #6313:
URL: https://github.com/apache/iceberg/pull/6313#discussion_r1040423889


##
core/src/main/java/org/apache/iceberg/MetricsConfig.java:
##
@@ -107,6 +107,30 @@ public static MetricsConfig forPositionDelete(Table table) 
{
 return new MetricsConfig(columnModes.build(), defaultMode);
   }
 
+  /**
+   * Creates a metrics config for a position delete file.
+   *
+   * @param props table configuration
+   */
+  public static MetricsConfig forPositionDelete(Map props) {

Review Comment:
   @chenjunjiedada hmm. I think we should revert the last commit. if we are 
going to pass in the `Table` object, it should be non-null. Then we can 
consistently use `forTable` everywhere.
   
   I would also be open with the `forProperties` approach, as you were saying 
that schema and sort order are for data rows/files.  I had some reservations 
because `MetricConfig` deprecated `forProperties` for data rows/files. It is a 
little weird to introduce it back for position delete. Flink can use 
`forPositionDelete(Table table)` API by passing a valid `Table` object to 
`FlinkAppenderFactory`. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6313: Flink: use correct metric config for position deletes

2022-12-05 Thread GitBox


stevenzwu commented on code in PR #6313:
URL: https://github.com/apache/iceberg/pull/6313#discussion_r1040425132


##
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java:
##
@@ -160,7 +184,8 @@ public EqualityDeleteWriter newEqDeleteWriter(
 eqDeleteRowSchema,
 "Equality delete row schema shouldn't be null when creating 
equality-delete writer");
 
-MetricsConfig metricsConfig = MetricsConfig.fromProperties(props);
+MetricsConfig metricsConfig =
+table != null ? MetricsConfig.forTable(table) : 
MetricsConfig.fromProperties(props);

Review Comment:
   this actually makes a stronger case that we should pass in a valid `Table` 
object to the `FlinkAppenderFactory` as I mentioned in the other comment. we 
should also use `forTable` for position delete although it doesn't need schema 
or SortOrder as you said.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6313: Flink: use correct metric config for position deletes

2022-12-05 Thread GitBox


stevenzwu commented on code in PR #6313:
URL: https://github.com/apache/iceberg/pull/6313#discussion_r1040423889


##
core/src/main/java/org/apache/iceberg/MetricsConfig.java:
##
@@ -107,6 +107,30 @@ public static MetricsConfig forPositionDelete(Table table) 
{
 return new MetricsConfig(columnModes.build(), defaultMode);
   }
 
+  /**
+   * Creates a metrics config for a position delete file.
+   *
+   * @param props table configuration
+   */
+  public static MetricsConfig forPositionDelete(Map props) {

Review Comment:
   @chenjunjiedada hmm. I think we should revert the last commit. if we are 
going to pass in the `Table` object, it should be non-null. Then we can 
consistently use `forTable` everywhere.
   
   I would also be open with the `forProperties` approach, as you were saying 
that schema and sort order are for data rows/files.  I had some reservations 
because `MetricConfig` deprecated `forProperties` for data rows/files. It is a 
little weird to introduce it back for position delete. Flink can use 
`forPositionDelete(Table table)` API by passing a valid `SerializableTable` 
object to `FlinkAppenderFactory`. With `Table` object, we can remove the 
`Schema` and `PartitionSpec` from the current `FlinkAppenderFactory` constructor



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] szehon-ho opened a new pull request, #6365: Core: Add position deletes metadata table

2022-12-05 Thread GitBox


szehon-ho opened a new pull request, #6365:
URL: https://github.com/apache/iceberg/pull/6365

   This breaks up the pr https://github.com/apache/iceberg/pull/4812 , and is 
just the part to add the table PositionDeletesTable.
   
   It is based on @aokolnychyi 's newly-added BatchScan interface, so the scan 
is free to not return FileScanTask.  It returns a custom ScanTask thatl scan 
DeleteFiles rather than DataFiles.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] RussellSpitzer commented on issue #6364: Optimise POS reads

2022-12-05 Thread GitBox


RussellSpitzer commented on issue #6364:
URL: https://github.com/apache/iceberg/issues/6364#issuecomment-1338783816

   What is a POS file? I’m not familiar with the acronym Sent from my iPhoneOn 
Dec 5, 2022, at 9:00 PM, rbalamohan ***@***.***> wrote:
   Apache Iceberg version
   0.14.1
   Query engine
   Spark
   Please describe the bug 🐞
   Currently combinedFileTask can have more than 1 file. Depending on the 
nature of workload, it can even have 30-50+ files in single split. When there 
are 4+ POS files, it takes lot longer time to process "select" queries. This is 
due to the fact,
   that every file needs to process POS file and it leads to read amplification.
   Request is to optimise the way POS file reading is done.
   
   Optimise parquet reader with cached filestatus and footer
   
   Optimise within combinedFileTask in a single task in a single executor. This 
can have more than 1 file in single split. Typically there can 10-50+ files 
depending on the size of the files.
   For simplicity, let us start with 1 POS file. This POS file can have delete 
information about all the 50+ files in the combined task
   
   Currently, for every file it opens, it needs "delete row positions". So it 
invokes "DeleteFilter::deletedRowPositions". This opens the POS file, reads the 
footer and reads the snippet for specific file path.
   Above step happens for all the 50+ files in sequential order.
   Internally, it opens and reads the footer information 50+ times which is not 
needed.
   Need a lightweight parquet reader, which can accept readerConfs etc and take 
up footer information as argument. Basically cache footer details, file status 
details to reduce turn around with object stores.
   
   Otherwise pass the POS reader during data reading, such that it doesn't need 
to reopen and read the footers again.
   
   
   
   
   
   
   Optimise on reading POS
   
   Though path is dictionary encoded, it ends up materializing the path again 
and again. Need a way to optimise this to reduce CPU burn when reading POS 
files.
   Covered in #5863
   
   
   
   
   —Reply to this email directly, view it on GitHub, or unsubscribe.You are 
receiving this because you are subscribed to this thread.Message ID: 
***@***.***>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] hililiwei commented on a diff in pull request #6222: Flink: Support inspecting table

2022-12-05 Thread GitBox


hililiwei commented on code in PR #6222:
URL: https://github.com/apache/iceberg/pull/6222#discussion_r1040547304


##
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java:
##
@@ -295,6 +299,161 @@ private static void assertEquals(
 }
   }
 
+  public static void assertEqualsSafe(
+  Schema schema, List recs, List rows) {

Review Comment:
   The metadata files are stored in avro format, and when we read them 
directly, we get the Avro generic record.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] hililiwei commented on a diff in pull request #6222: Flink: Support inspecting table

2022-12-05 Thread GitBox


hililiwei commented on code in PR #6222:
URL: https://github.com/apache/iceberg/pull/6222#discussion_r1040554968


##
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/data/StructRowData.java:
##
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.data;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.OffsetDateTime;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.RowKind;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.ByteBuffers;
+
+public class StructRowData implements RowData {
+  private final Types.StructType type;
+  private RowKind kind;
+  private StructLike struct;
+
+  public StructRowData(Types.StructType type) {
+this(type, RowKind.INSERT);
+  }
+
+  public StructRowData(Types.StructType type, RowKind kind) {
+this(type, null, kind);
+  }
+
+  private StructRowData(Types.StructType type, StructLike struct) {
+this(type, struct, RowKind.INSERT);
+  }
+
+  private StructRowData(Types.StructType type, StructLike struct, RowKind 
kind) {
+this.type = type;
+this.struct = struct;
+this.kind = kind;
+  }
+
+  public StructRowData setStruct(StructLike newStruct) {
+this.struct = newStruct;
+return this;
+  }
+
+  @Override
+  public int getArity() {
+return struct.size();
+  }
+
+  @Override
+  public RowKind getRowKind() {
+return kind;
+  }
+
+  @Override
+  public void setRowKind(RowKind newKind) {
+Preconditions.checkNotNull(newKind, "kind can not be null");
+this.kind = newKind;
+  }
+
+  @Override
+  public boolean isNullAt(int pos) {
+return struct.get(pos, Object.class) == null;
+  }
+
+  @Override
+  public boolean getBoolean(int pos) {
+return struct.get(pos, Boolean.class);
+  }
+
+  @Override
+  public byte getByte(int pos) {
+return (byte) (int) struct.get(pos, Integer.class);
+  }
+
+  @Override
+  public short getShort(int pos) {
+return (short) (int) struct.get(pos, Integer.class);
+  }
+
+  @Override
+  public int getInt(int pos) {
+Object integer = struct.get(pos, Object.class);
+
+if (integer instanceof Integer) {
+  return (int) integer;
+} else if (integer instanceof LocalDate) {
+  return (int) ((LocalDate) integer).toEpochDay();
+} else {
+  throw new IllegalStateException(
+  "Unknown type for int field. Type name: " + 
integer.getClass().getName());
+}
+  }
+
+  @Override
+  public long getLong(int pos) {
+Object longVal = struct.get(pos, Object.class);
+
+if (longVal instanceof Long) {
+  return (long) longVal;
+} else if (longVal instanceof OffsetDateTime) {
+  return Duration.between(Instant.EPOCH, (OffsetDateTime) 
longVal).toNanos() / 1000;
+} else if (longVal instanceof LocalDate) {
+  return ((LocalDate) longVal).toEpochDay();
+} else {
+  throw new IllegalStateException(
+  "Unknown type for long field. Type name: " + 
longVal.getClass().getName());
+}
+  }
+
+  @Override
+  public float getFloat(int pos) {
+return struct.get(pos, Float.class);
+  }
+
+  @Override
+ 

[GitHub] [iceberg] nastra commented on pull request #6355: Build: Bump org.eclipse.jgit from 5.13.1.202206130422-r to 6.4.0.202211300538-r

2022-12-05 Thread GitBox


nastra commented on PR #6355:
URL: https://github.com/apache/iceberg/pull/6355#issuecomment-1338859690

   we can't upgrade because the latest version that supports JDK8 is 
5.13.1.202206130422-r


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] nastra closed pull request #6355: Build: Bump org.eclipse.jgit from 5.13.1.202206130422-r to 6.4.0.202211300538-r

2022-12-05 Thread GitBox


nastra closed pull request #6355: Build: Bump org.eclipse.jgit from 
5.13.1.202206130422-r to 6.4.0.202211300538-r
URL: https://github.com/apache/iceberg/pull/6355


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] dependabot[bot] commented on pull request #6355: Build: Bump org.eclipse.jgit from 5.13.1.202206130422-r to 6.4.0.202211300538-r

2022-12-05 Thread GitBox


dependabot[bot] commented on PR #6355:
URL: https://github.com/apache/iceberg/pull/6355#issuecomment-1338859714

   OK, I won't notify you again about this release, but will get in touch when 
a new version is available. If you'd rather skip all updates until the next 
major or minor version, let me know by commenting `@dependabot ignore this 
major version` or `@dependabot ignore this minor version`. You can also ignore 
all major, minor, or patch releases for a dependency by adding an [`ignore` 
condition](https://docs.github.com/en/code-security/supply-chain-security/configuration-options-for-dependency-updates#ignore)
 with the desired `update_types` to your config file.
   
   If you change your mind, just re-open this PR and I'll resolve any conflicts 
on it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] jaehyeon-kim commented on issue #4977: Support Kafka Connect within Iceberg

2022-12-05 Thread GitBox


jaehyeon-kim commented on issue #4977:
URL: https://github.com/apache/iceberg/issues/4977#issuecomment-1338869374

   +1 for Kafka connect. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] hililiwei commented on a diff in pull request #6222: Flink: Support inspecting table

2022-12-05 Thread GitBox


hililiwei commented on code in PR #6222:
URL: https://github.com/apache/iceberg/pull/6222#discussion_r1040596892


##
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/data/StructRowData.java:
##
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.data;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.OffsetDateTime;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.RowKind;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.ByteBuffers;
+
+public class StructRowData implements RowData {
+  private final Types.StructType type;
+  private RowKind kind;
+  private StructLike struct;
+
+  public StructRowData(Types.StructType type) {
+this(type, RowKind.INSERT);
+  }
+
+  public StructRowData(Types.StructType type, RowKind kind) {
+this(type, null, kind);
+  }
+
+  private StructRowData(Types.StructType type, StructLike struct) {
+this(type, struct, RowKind.INSERT);
+  }
+
+  private StructRowData(Types.StructType type, StructLike struct, RowKind 
kind) {
+this.type = type;
+this.struct = struct;
+this.kind = kind;
+  }
+
+  public StructRowData setStruct(StructLike newStruct) {
+this.struct = newStruct;
+return this;
+  }
+
+  @Override
+  public int getArity() {
+return struct.size();
+  }
+
+  @Override
+  public RowKind getRowKind() {
+return kind;
+  }
+
+  @Override
+  public void setRowKind(RowKind newKind) {
+Preconditions.checkNotNull(newKind, "kind can not be null");
+this.kind = newKind;
+  }
+
+  @Override
+  public boolean isNullAt(int pos) {
+return struct.get(pos, Object.class) == null;
+  }
+
+  @Override
+  public boolean getBoolean(int pos) {
+return struct.get(pos, Boolean.class);
+  }
+
+  @Override
+  public byte getByte(int pos) {
+return (byte) (int) struct.get(pos, Integer.class);
+  }
+
+  @Override
+  public short getShort(int pos) {
+return (short) (int) struct.get(pos, Integer.class);
+  }
+
+  @Override
+  public int getInt(int pos) {
+Object integer = struct.get(pos, Object.class);
+
+if (integer instanceof Integer) {
+  return (int) integer;
+} else if (integer instanceof LocalDate) {
+  return (int) ((LocalDate) integer).toEpochDay();
+} else {
+  throw new IllegalStateException(
+  "Unknown type for int field. Type name: " + 
integer.getClass().getName());
+}
+  }
+
+  @Override
+  public long getLong(int pos) {
+Object longVal = struct.get(pos, Object.class);
+
+if (longVal instanceof Long) {
+  return (long) longVal;
+} else if (longVal instanceof OffsetDateTime) {
+  return Duration.between(Instant.EPOCH, (OffsetDateTime) 
longVal).toNanos() / 1000;
+} else if (longVal instanceof LocalDate) {
+  return ((LocalDate) longVal).toEpochDay();
+} else {
+  throw new IllegalStateException(
+  "Unknown type for long field. Type name: " + 
longVal.getClass().getName());
+}
+  }
+
+  @Override
+  public float getFloat(int pos) {
+return struct.get(pos, Float.class);
+  }
+
+  @Override
+ 

[GitHub] [iceberg] zstraw commented on issue #4550: the snapshot file is lost when write iceberg using flink Failed to open input stream for file File does not exist

2022-12-05 Thread GitBox


zstraw commented on issue #4550:
URL: https://github.com/apache/iceberg/issues/4550#issuecomment-1338908066

   After deeping into iceberg code and the log, I can reproduce it in debugging 
locally.
   
   The scenario may happens in the process of Flink cancelling.
   1. IcebergFileCommitter is going to commit file. In the step of **rename** 
metadata.json(org.apache.iceberg.hadoop.HadoopTableOperations#renameToFinal), 
org.apache.hadoop.ipc.Client.call encounters **InterruptedIOException**. I 
suspect it comes from Flink task cancelling. On the other hand, **Hdfs has 
renamed the metada.json file sucessfully**.
   2. After rename fails, it's supposed to retry. But the thread encounters 
InterruptedException in 
sleeping(org.apache.iceberg.util.Tasks#runTaskWithRetry). Then it will throw a 
RuntimeException. And the version-hint will not be updated.
   3. The RuntimeException leads to **rollback** in 
org.apache.iceberg.BaseTransaction(#cleanUpOnCommitFailure), which will delete 
manifest list (snap-XXX).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] hililiwei commented on pull request #6222: Flink: Support inspecting table

2022-12-05 Thread GitBox


hililiwei commented on PR #6222:
URL: https://github.com/apache/iceberg/pull/6222#issuecomment-1338913731

   > @hililiwei we should add comprehensive unit test for `StructRowData`.
   > 
   > I have some internal DataGenerators for unit test code with very 
comprehensive coverage all field types (including complex nested types). Maybe 
I will submit a separate PR for that, which will cover Flink `RowData` and 
Iceberg `GenericRecord`. You can expand it with support for Iceberg 
`StructLike`. With that, we can write unit test/assertions compare the actual 
and expected.
   
   That would be great. I'm also thinking of adding UT for it. So i'll do it 
based on your code.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] hililiwei commented on a diff in pull request #6222: Flink: Support inspecting table

2022-12-05 Thread GitBox


hililiwei commented on code in PR #6222:
URL: https://github.com/apache/iceberg/pull/6222#discussion_r1040606681


##
docs/flink-getting-started.md:
##
@@ -712,9 +712,188 @@ INSERT INTO tableName /*+ 
OPTIONS('upsert-enabled'='true') */
 | compression-strategy   | Table write.orc.compression-strategy   | 
Overrides this table's compression strategy for ORC tables for this write   
   |
 
 
-## Inspecting tables.
+## Inspecting tables
 
-Iceberg does not support inspecting table in flink sql now, we need to use 
[iceberg's Java API](../api) to read iceberg's meta data to get those table 
information.
+To inspect a table's history, snapshots, and other metadata, Iceberg supports 
metadata tables.
+
+Metadata tables are identified by adding the metadata table name after the 
original table name. For example, history for `db.table` is read using 
`db.table$history`.
+
+### History
+
+To show table history:
+
+```sql
+SELECT * FROM prod.db.table$history;
+```
+
+| made_current_at | snapshot_id | parent_id   | 
is_current_ancestor |
+| --- | --- | --- | 
--- |
+| 2019-02-08 03:29:51.215 | 5781947118336215154 | NULL| true   
 |
+| 2019-02-08 03:47:55.948 | 5179299526185056830 | 5781947118336215154 | true   
 |
+| 2019-02-09 16:24:30.13  | 296410040247533544  | 5179299526185056830 | false  
 |
+| 2019-02-09 16:32:47.336 | 2999875608062437330 | 5179299526185056830 | true   
 |
+| 2019-02-09 19:42:03.919 | 8924558786060583479 | 2999875608062437330 | true   
 |
+| 2019-02-09 19:49:16.343 | 6536733823181975045 | 8924558786060583479 | true   
 |
+
+{{< hint info >}}
+**This shows a commit that was rolled back.** The example has two snapshots 
with the same parent, and one is *not* an ancestor of the current table state.

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org