[GitHub] [iceberg] nastra commented on issue #6415: Vectorized Read Issue

2022-12-14 Thread GitBox


nastra commented on issue #6415:
URL: https://github.com/apache/iceberg/issues/6415#issuecomment-1350603108

   See 
   https://github.com/apache/iceberg/blob/c07f2aabc0a1d02f068ecf1514d2479c0fbdd3b0/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/ArrowReaderTest.java#L149-L210
   for some background on why the issue currently exists.
   I opened https://github.com/apache/iceberg/pull/3024 a while ago, but it 
   didn't get any attention.





[GitHub] [iceberg] nastra commented on issue #6415: Vectorized Read Issue

2022-12-14 Thread GitBox


nastra commented on issue #6415:
URL: https://github.com/apache/iceberg/issues/6415#issuecomment-1350604548

   @rdblue thoughts on getting the above issue fixed?





[GitHub] [iceberg] nastra opened a new issue, #6423: Run JMH Benchmarks weekly / Visualize benchmark results

2022-12-14 Thread GitBox


nastra opened a new issue, #6423:
URL: https://github.com/apache/iceberg/issues/6423

   ### Feature Request / Improvement
   
   Currently we have a way to run JMH benchmarks on forks. The goal here is 
   for the JMH benchmarks to be executed on a weekly (or other) cadence via a 
   GitHub action.
   
   Additionally, it would be great if we could visualize the results and 
   immediately see if there is an x% deviation from previous benchmark runs.
   I've found https://github.com/jzillmann/gradle-jmh-report, which might help 
   with that part.
   
   ### Query engine
   
   None





[GitHub] [iceberg] ahshahid opened a new issue, #6424: The size estimation formula for spark task is incorrect

2022-12-14 Thread GitBox


ahshahid opened a new issue, #6424:
URL: https://github.com/apache/iceberg/issues/6424

   ### Apache Iceberg version
   
   main (development)
   
   ### Query engine
   
   Spark
   
   ### Please describe the bug 🐞
   
   The size estimation formula for non-partition columns, as seen in 
   ContentScanTask, is presently:
   
   default long estimatedRowsCount() {
     double scannedFileFraction = ((double) length()) / file().fileSizeInBytes();
     return (long) (scannedFileFraction * file().recordCount());
   }
   
   IMO it should be
   
   (file().fileSizeInBytes() * file().recordCount()) / length()
   
   We are estimating the full row count from scanning part of the file and the 
   rows contained in it.
   
   The current formula is wrong, because scannedFileFraction is bound to be <= 1,
   so the full row count has to be >= file().recordCount();
   but full row count = scannedFileFraction * file().recordCount() implies that
   full row count <= file().recordCount(),
   which is incorrect.
   
   I have a bug test which shows that, because of this, inefficient broadcast 
   hash joins are getting created.
   Will create a PR & bug test tomorrow.
   
   





[GitHub] [iceberg] pvary commented on a diff in pull request #3337: Fixed issue #3336: Best efforts to release hive table lock

2022-12-14 Thread GitBox


pvary commented on code in PR #3337:
URL: https://github.com/apache/iceberg/pull/3337#discussion_r1048287741


##
hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java:
##
@@ -499,11 +499,21 @@ private void unlock(Optional lockId) {
   }
 
   @VisibleForTesting
-  void doUnlock(long lockId) throws TException, InterruptedException {
-metaClients.run(client -> {
-  client.unlock(lockId);
-  return null;
-});
+  void doUnlock(long lockId) throws TException {
+boolean released = false;
+while (!released) {
+  try {
+released = metaClients.run(client -> {

Review Comment:
   @dmgcodevil: I have missed that. Thanks.
   
   @fengguangyuan: Still a bit concerned about the possible inifinite loop.
   
   Could we just do a single retry instead?
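   
   For illustration, a single-retry variant might look roughly like the sketch 
   below. This is only a sketch: it assumes the metaClients pool and LOG field 
   from the quoted diff, and that metaClients.run returns the lambda's result; 
   it is not the PR's actual code.
   
    @VisibleForTesting
    void doUnlock(long lockId) throws TException, InterruptedException {
      boolean released = false;
      // attempt the unlock at most twice (original call plus one retry) instead of looping forever
      for (int attempt = 0; attempt < 2 && !released; attempt++) {
        try {
          released = metaClients.run(client -> {
            client.unlock(lockId);
            return true;
          });
        } catch (TException e) {
          if (attempt == 1) {
            throw e;  // give up after the single retry
          }
          LOG.warn("Failed to unlock lock {}, retrying once", lockId, e);
        }
      }
    }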






[GitHub] [iceberg] nazq commented on issue #6415: Vectorized Read Issue

2022-12-14 Thread GitBox


nazq commented on issue #6415:
URL: https://github.com/apache/iceberg/issues/6415#issuecomment-1351547529

   Happy to create a PR @rdblue, just want to make sure we're on the right 
   track here.





[GitHub] [iceberg] rubenvdg commented on issue #6361: Python: Ignore home folder when running tests

2022-12-14 Thread GitBox


rubenvdg commented on issue #6361:
URL: https://github.com/apache/iceberg/issues/6361#issuecomment-1351585780

   Happy to take this one on, if nobody is working on it atm. 





[GitHub] [iceberg] RussellSpitzer commented on issue #6424: The size estimation formula for spark task is incorrect

2022-12-14 Thread GitBox


RussellSpitzer commented on issue #6424:
URL: https://github.com/apache/iceberg/issues/6424#issuecomment-1351621767

   The current code is slightly different from this:
   
   https://github.com/apache/iceberg/blob/33217abf7f88c6c22a8c43b320f9de48de998b94/api/src/main/java/org/apache/iceberg/ContentScanTask.java#L65-L70
   
   but I'm not sure I follow your math.
   
   Why would the full row count have to be greater than the record count for a 
   split? If we scan only part of a file, it should contribute only a portion of 
   the rows in the file. When we sum over all of our partial scans we should get 
   the full amount (if the full file is read).
   
   





[GitHub] [iceberg] nastra commented on issue #6415: Vectorized Read Issue

2022-12-14 Thread GitBox


nastra commented on issue #6415:
URL: https://github.com/apache/iceberg/issues/6415#issuecomment-1351641875

   @nazq just FYI, there's already #3024 that addresses this issue.





[GitHub] [iceberg] nazq commented on issue #6415: Vectorized Read Issue

2022-12-14 Thread GitBox


nazq commented on issue #6415:
URL: https://github.com/apache/iceberg/issues/6415#issuecomment-1351656081

   Excellent. Thanks for the pointer @nastra 





[GitHub] [iceberg] nastra commented on issue #6420: Iceberg Materialized View Spec

2022-12-14 Thread GitBox


nastra commented on issue #6420:
URL: https://github.com/apache/iceberg/issues/6420#issuecomment-1351656857

   @JanKaul I think it would be great to get this out to the DEV mailing list 
to get more attention and input from people





[GitHub] [iceberg] ahshahid commented on issue #6424: The size estimation formula for spark task is incorrect

2022-12-14 Thread GitBox


ahshahid commented on issue #6424:
URL: https://github.com/apache/iceberg/issues/6424#issuecomment-1351671855

   @RussellSpitzer  Right, I missed the modification of " - splitOffset".
   
   Though the bug, which I think is in the formula, still remains.
   
   My reasoning is as follows:
   the function estimatedRowsCount() has to estimate the total row count of a 
   split/file (or a single file) by analyzing a fraction of the split (file),
   which means that
   total row count of a split/file >= scanned fraction row count (which is what 
   we call record count).
   
   Now, if total row count of a split/file = (scannedFileFraction * file().recordCount())
   and the scanned fraction is <= 1,
   this would result in total row count <= the fraction's record count.
   
   The change I proposed is based on this ratio/proportion:
   
   when the scanned file/split size is length(), the rows are file().recordCount(),
   so when the total size of the file/split is (file().fileSizeInBytes() - splitOffset), 
   the total count X = ?
   
   X = (file().recordCount() * (file().fileSizeInBytes() - splitOffset)) / length()
   
   Do you think my understanding of the objective of the function is correct?
   
   





[GitHub] [iceberg] stevenzwu merged pull request #6313: Flink: use correct metric config for position deletes

2022-12-14 Thread GitBox


stevenzwu merged PR #6313:
URL: https://github.com/apache/iceberg/pull/6313





[GitHub] [iceberg] stevenzwu commented on pull request #6313: Flink: use correct metric config for position deletes

2022-12-14 Thread GitBox


stevenzwu commented on PR #6313:
URL: https://github.com/apache/iceberg/pull/6313#issuecomment-1351737859

   thanks @chenjunjiedada for the contribution





[GitHub] [iceberg] RussellSpitzer commented on issue #6424: The size estimation formula for spark task is incorrect

2022-12-14 Thread GitBox


RussellSpitzer commented on issue #6424:
URL: https://github.com/apache/iceberg/issues/6424#issuecomment-1351768402

   > now if total row count of a split/file = (scannedFileFraction * file().recordCount())
   
   This is, I think, the confusion: we are attempting to determine how many rows 
   are in this split specifically, because we are summing over all splits.
   
   https://github.com/apache/iceberg/blob/7fd9ded0a119c050746d765bd90c59fef93506b1/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java#L143
   
   A file may be turned into multiple FileScanTasks, so we can only count the 
   number of rows contributed by the tasks we are currently looking at. Let's say 
   we do a scan which only ends up touching File A, but File A is divided into 
   multiple FileScanTasks (or now ContentScanTasks) for read parallelism: A1, A2, and A3.
   
   If they have lengths 2/5 A, 2/5 A and 1/5 A, then we do the following math:
   Sum over all tasks = (2/5 * Total A Rows) + (2/5 * Total A Rows) + (1/5 * Total A Rows) = Total A Rows
   This should be the total number of rows in the scan.
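   
   As a back-of-the-envelope check (a sketch with made-up numbers, not the actual 
   ContentScanTask or SparkScan code), the per-task estimates are proportional to 
   each task's length, so they sum back to the file's record count:
   
    public class RowCountEstimateCheck {
      public static void main(String[] args) {
        long fileSizeInBytes = 100L * 1024 * 1024;  // hypothetical 100 MB data file
        long recordCount = 1000L;                   // hypothetical file-level record count
        // three tasks covering 2/5, 2/5 and 1/5 of the file
        long[] taskLengths = {
            fileSizeInBytes * 2 / 5, fileSizeInBytes * 2 / 5, fileSizeInBytes / 5};
    
        long total = 0;
        for (long length : taskLengths) {
          // same shape as estimatedRowsCount(): fraction of the file times the file's rows
          double scannedFileFraction = ((double) length) / fileSizeInBytes;
          total += (long) (scannedFileFraction * recordCount);
        }
        System.out.println(total);  // prints 1000: the per-task estimates sum to the file total
      }
    }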
   
   





[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6378: Spark: Extend Timeout During Partial Progress Rewrites

2022-12-14 Thread GitBox


RussellSpitzer commented on code in PR #6378:
URL: https://github.com/apache/iceberg/pull/6378#discussion_r1048728860


##
core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java:
##
@@ -225,25 +225,40 @@ public void close() {
   LOG.info("Closing commit service for {}", table);
   committerService.shutdown();
 
+  boolean timeout = false;
+  int waitTime;
   try {
 // All rewrites have completed and all new files have been created, we 
are now waiting for
 // the commit
-// pool to finish doing it's commits to Iceberg State. In the case of 
partial progress this
+// pool to finish doing its commits to Iceberg State. In the case of 
partial progress this
 // should
 // have been occurring simultaneously with rewrites, if not there 
should be only a single
 // commit operation.
-// In either case this should take much less than 10 minutes to 
actually complete.
-if (!committerService.awaitTermination(10, TimeUnit.MINUTES)) {
+// We will wait 10 minutes plus 5 more minutes for each commit left to 
perform due to the
+// time required for writing manifests
+waitTime = 10 + (completedRewrites.size() / rewritesPerCommit) * 5;

Review Comment:
   I was just thinking that 5 minutes is plenty of time to write a manifest 
   file. In our use case the biggest problem is the amount of time it takes to 
   write the manifests; rewriting the JSON portion is fast.
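   
   As a rough illustration of how the new bound scales (hypothetical numbers, 
   mirroring the waitTime formula from the diff):
   
    public class WaitTimeExample {
      public static void main(String[] args) {
        int completedRewrites = 40;  // hypothetical: file groups rewritten but not yet committed
        int rewritesPerCommit = 10;  // hypothetical: groups folded into each partial-progress commit
        int waitTime = 10 + (completedRewrites / rewritesPerCommit) * 5;
        System.out.println(waitTime + " minutes");  // 10 + 4 * 5 = 30 minutes
      }
    }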






[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6350: Query changelog table with a timestamp range

2022-12-14 Thread GitBox


RussellSpitzer commented on code in PR #6350:
URL: https://github.com/apache/iceberg/pull/6350#discussion_r1048731625


##
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java:
##
@@ -308,6 +339,17 @@ public Scan buildChangelogScan() {
 return new SparkChangelogScan(spark, table, scan, readConf, 
expectedSchema, filterExpressions);
   }
 
+  private Long getStartSnapshotId(Long startTimestamp) {
+Snapshot oldestSnapshotAfter = SnapshotUtil.oldestAncestorAfter(table, 
startTimestamp);
+Preconditions.checkArgument(

Review Comment:
   I think you are correct @flyrain, I would only really want the error for 1.






[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6350: Query changelog table with a timestamp range

2022-12-14 Thread GitBox


RussellSpitzer commented on code in PR #6350:
URL: https://github.com/apache/iceberg/pull/6350#discussion_r1048750874


##
spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java:
##
@@ -137,6 +138,64 @@ public void testOverwrites() {
 changelogRecords(snap2, snap3));
   }
 
+  @Test
+  public void testQueryWithTimeRange() {
+sql(
+"CREATE TABLE %s (id INT, data STRING) "
++ "USING iceberg "
++ "PARTITIONED BY (data) "
++ "TBLPROPERTIES ( "
++ " '%s' = '%d' "
++ ")",
+tableName, FORMAT_VERSION, formatVersion);
+
+sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+
+Table table = validationCatalog.loadTable(tableIdent);
+
+Snapshot snap2 = table.currentSnapshot();
+
+sql("INSERT OVERWRITE %s VALUES (-2, 'b')", tableName);
+
+table.refresh();
+
+Snapshot snap3 = table.currentSnapshot();
+
+assertEquals(
+"Should have expected changed rows only from snapshot 3",
+ImmutableList.of(

Review Comment:
   Not a big deal if we aren't using this a lot. If we end up making more tests 
   like this in the future, though, I think it would be a good utility.






[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6344: Spark 3.3: Introduce the changelog iterator

2022-12-14 Thread GitBox


RussellSpitzer commented on code in PR #6344:
URL: https://github.com/apache/iceberg/pull/6344#discussion_r104878


##
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/ChangelogIterator.java:
##
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.iceberg.ChangelogOperation;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
+
+/**
+ * An iterator that transforms rows from changelog tables within a single 
Spark task.
+ *
+ * It marks the carry-over rows to null to for filtering out later. 
Carry-over rows are unchanged
+ * rows in a snapshot but showed as delete-rows and insert-rows in a changelog 
table due to the
+ * copy-on-write(COW) mechanism. For example, there are row1 (id=1, data='a') 
and row2 (id=2,
+ * data='b') in a data file, if we only delete row2, the COW will copy row1 to 
a new data file and
+ * delete the whole old data file. The changelog table will have two 
delete-rows(row1 and row2), and
+ * one insert-row(row1). Row1 is a carry-over row.
+ *
+ * The iterator marks the delete-row and insert-row to be the update-rows. 
For example, these two
+ * rows
+ *
+ * 
+ *   (id=1, data='a', op='DELETE')
+ *   (id=1, data='b', op='INSERT')
+ * 
+ *
+ * will be marked as update-rows:
+ *
+ * 
+ *   (id=1, data='a', op='UPDATE_BEFORE')
+ *   (id=1, data='b', op='UPDATE_AFTER')
+ * 
+ */
+public class ChangelogIterator implements Iterator, Serializable {
+  private static final String DELETE = ChangelogOperation.DELETE.name();
+  private static final String INSERT = ChangelogOperation.INSERT.name();
+  private static final String UPDATE_BEFORE = 
ChangelogOperation.UPDATE_BEFORE.name();
+  private static final String UPDATE_AFTER = 
ChangelogOperation.UPDATE_AFTER.name();
+
+  private final Iterator rowIterator;
+  private final int changeTypeIndex;
+  private final List partitionIdx;
+
+  private Row cachedRow = null;
+
+  public ChangelogIterator(
+  Iterator rowIterator, int changeTypeIndex, List 
partitionIdx) {
+this.rowIterator = rowIterator;
+this.changeTypeIndex = changeTypeIndex;
+this.partitionIdx = partitionIdx;
+  }
+
+  @Override
+  public boolean hasNext() {
+if (cachedRow != null) {
+  return true;
+}
+return rowIterator.hasNext();
+  }
+
+  @Override
+  public Row next() {
+// if there is an updated cached row, return it directly
+if (updated(cachedRow)) {
+  Row row = cachedRow;
+  cachedRow = null;
+  return row;
+}
+
+Row currentRow = currentRow();
+
+if (rowIterator.hasNext()) {
+  GenericRowWithSchema nextRow = (GenericRowWithSchema) rowIterator.next();
+  cachedRow = nextRow;
+
+  if (updateOrCarryoverRecord(currentRow, nextRow)) {
+Row[] rows = update((GenericRowWithSchema) currentRow, nextRow);
+
+currentRow = rows[0];
+cachedRow = rows[1];
+  }
+}
+
+return currentRow;
+  }
+
+  private Row[] update(GenericRowWithSchema currentRow, GenericRowWithSchema 
nextRow) {
+GenericInternalRow deletedRow = new 
GenericInternalRow(currentRow.values());
+GenericInternalRow insertedRow = new GenericInternalRow(nextRow.values());
+
+if (isCarryoverRecord(deletedRow, insertedRow)) {
+  // set carry-over rows to null for filtering out later
+  return new Row[] {null, null};

Review Comment:
   This seems like a very expensive way to pass back 2 nulls




[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6344: Spark 3.3: Introduce the changelog iterator

2022-12-14 Thread GitBox


RussellSpitzer commented on code in PR #6344:
URL: https://github.com/apache/iceberg/pull/6344#discussion_r1048780674


##
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/ChangelogIterator.java:
##
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.iceberg.ChangelogOperation;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
+
+/**
+ * An iterator that transforms rows from changelog tables within a single 
Spark task.
+ *
+ * It marks the carry-over rows to null to for filtering out later. 
Carry-over rows are unchanged
+ * rows in a snapshot but showed as delete-rows and insert-rows in a changelog 
table due to the
+ * copy-on-write(COW) mechanism. For example, there are row1 (id=1, data='a') 
and row2 (id=2,
+ * data='b') in a data file, if we only delete row2, the COW will copy row1 to 
a new data file and
+ * delete the whole old data file. The changelog table will have two 
delete-rows(row1 and row2), and
+ * one insert-row(row1). Row1 is a carry-over row.
+ *
+ * The iterator marks the delete-row and insert-row to be the update-rows. 
For example, these two
+ * rows
+ *
+ * 
+ *   (id=1, data='a', op='DELETE')
+ *   (id=1, data='b', op='INSERT')
+ * 
+ *
+ * will be marked as update-rows:
+ *
+ * 
+ *   (id=1, data='a', op='UPDATE_BEFORE')
+ *   (id=1, data='b', op='UPDATE_AFTER')
+ * 
+ */
+public class ChangelogIterator implements Iterator, Serializable {
+  private static final String DELETE = ChangelogOperation.DELETE.name();
+  private static final String INSERT = ChangelogOperation.INSERT.name();
+  private static final String UPDATE_BEFORE = 
ChangelogOperation.UPDATE_BEFORE.name();
+  private static final String UPDATE_AFTER = 
ChangelogOperation.UPDATE_AFTER.name();
+
+  private final Iterator rowIterator;
+  private final int changeTypeIndex;
+  private final List partitionIdx;
+
+  private Row cachedRow = null;
+
+  public ChangelogIterator(
+  Iterator rowIterator, int changeTypeIndex, List 
partitionIdx) {
+this.rowIterator = rowIterator;
+this.changeTypeIndex = changeTypeIndex;
+this.partitionIdx = partitionIdx;
+  }
+
+  @Override
+  public boolean hasNext() {
+if (cachedRow != null) {
+  return true;
+}
+return rowIterator.hasNext();
+  }
+
+  @Override
+  public Row next() {
+// if there is an updated cached row, return it directly
+if (updated(cachedRow)) {
+  Row row = cachedRow;
+  cachedRow = null;
+  return row;
+}
+
+Row currentRow = currentRow();
+
+if (rowIterator.hasNext()) {
+  GenericRowWithSchema nextRow = (GenericRowWithSchema) rowIterator.next();
+  cachedRow = nextRow;
+
+  if (updateOrCarryoverRecord(currentRow, nextRow)) {
+Row[] rows = update((GenericRowWithSchema) currentRow, nextRow);
+
+currentRow = rows[0];
+cachedRow = rows[1];
+  }
+}
+
+return currentRow;
+  }
+
+  private Row[] update(GenericRowWithSchema currentRow, GenericRowWithSchema 
nextRow) {
+GenericInternalRow deletedRow = new 
GenericInternalRow(currentRow.values());
+GenericInternalRow insertedRow = new GenericInternalRow(nextRow.values());
+
+if (isCarryoverRecord(deletedRow, insertedRow)) {
+  // set carry-over rows to null for filtering out later
+  return new Row[] {null, null};

Review Comment:
   I would probably just return "null" and do an "if null { currentRow = null; cachedRow = null }".
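   
   A rough sketch of that shape (a fragment against the fields in the quoted 
   ChangelogIterator diff, not the PR's final code):
   
    // in update(...): signal a carry-over pair with a single null instead of new Row[] {null, null}
    if (isCarryoverRecord(deletedRow, insertedRow)) {
      return null;
    }
    
    // in next(): treat a null result as "drop both rows"
    Row[] rows = update((GenericRowWithSchema) currentRow, nextRow);
    if (rows == null) {
      currentRow = null;
      cachedRow = null;
    } else {
      currentRow = rows[0];
      cachedRow = rows[1];
    }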




[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6344: Spark 3.3: Introduce the changelog iterator

2022-12-14 Thread GitBox


RussellSpitzer commented on code in PR #6344:
URL: https://github.com/apache/iceberg/pull/6344#discussion_r1048789305


##
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/ChangelogIterator.java:
##
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.iceberg.ChangelogOperation;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
+
+/**
+ * An iterator that transforms rows from changelog tables within a single 
Spark task.
+ *
+ * It marks the carry-over rows to null to for filtering out later. 
Carry-over rows are unchanged
+ * rows in a snapshot but showed as delete-rows and insert-rows in a changelog 
table due to the
+ * copy-on-write(COW) mechanism. For example, there are row1 (id=1, data='a') 
and row2 (id=2,
+ * data='b') in a data file, if we only delete row2, the COW will copy row1 to 
a new data file and
+ * delete the whole old data file. The changelog table will have two 
delete-rows(row1 and row2), and
+ * one insert-row(row1). Row1 is a carry-over row.
+ *
+ * The iterator marks the delete-row and insert-row to be the update-rows. 
For example, these two
+ * rows
+ *
+ * 
+ *   (id=1, data='a', op='DELETE')
+ *   (id=1, data='b', op='INSERT')
+ * 
+ *
+ * will be marked as update-rows:
+ *
+ * 
+ *   (id=1, data='a', op='UPDATE_BEFORE')
+ *   (id=1, data='b', op='UPDATE_AFTER')
+ * 
+ */
+public class ChangelogIterator implements Iterator, Serializable {
+  private static final String DELETE = ChangelogOperation.DELETE.name();
+  private static final String INSERT = ChangelogOperation.INSERT.name();
+  private static final String UPDATE_BEFORE = 
ChangelogOperation.UPDATE_BEFORE.name();
+  private static final String UPDATE_AFTER = 
ChangelogOperation.UPDATE_AFTER.name();
+
+  private final Iterator rowIterator;
+  private final int changeTypeIndex;
+  private final List partitionIdx;
+
+  private Row cachedRow = null;
+
+  public ChangelogIterator(
+  Iterator rowIterator, int changeTypeIndex, List 
partitionIdx) {
+this.rowIterator = rowIterator;
+this.changeTypeIndex = changeTypeIndex;
+this.partitionIdx = partitionIdx;
+  }
+
+  @Override
+  public boolean hasNext() {
+if (cachedRow != null) {
+  return true;
+}
+return rowIterator.hasNext();
+  }
+
+  @Override
+  public Row next() {
+// if there is an updated cached row, return it directly
+if (updated(cachedRow)) {
+  Row row = cachedRow;
+  cachedRow = null;
+  return row;
+}
+
+Row currentRow = currentRow();
+
+if (rowIterator.hasNext()) {
+  GenericRowWithSchema nextRow = (GenericRowWithSchema) rowIterator.next();
+  cachedRow = nextRow;
+
+  if (updateOrCarryoverRecord(currentRow, nextRow)) {
+Row[] rows = update((GenericRowWithSchema) currentRow, nextRow);
+
+currentRow = rows[0];
+cachedRow = rows[1];
+  }
+}
+
+return currentRow;
+  }
+
+  private Row[] update(GenericRowWithSchema currentRow, GenericRowWithSchema 
nextRow) {
+GenericInternalRow deletedRow = new 
GenericInternalRow(currentRow.values());
+GenericInternalRow insertedRow = new GenericInternalRow(nextRow.values());
+
+if (isCarryoverRecord(deletedRow, insertedRow)) {
+  // set carry-over rows to null for filtering out later
+  return new Row[] {null, null};
+} else {
+  deletedRow.update(changeTypeIndex, UPDATE_BEFORE);
+  insertedRow.update(changeTypeIndex, UPDATE_AFTER);
+
+  return new Row[] {
+RowFactory.create(deletedRow.values()), 
RowFactory.create(insertedRow.values())

Review Comment:
   Why are we making new rows here? Can we not just use our GenericInternalRows 
here?




[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6344: Spark 3.3: Introduce the changelog iterator

2022-12-14 Thread GitBox


RussellSpitzer commented on code in PR #6344:
URL: https://github.com/apache/iceberg/pull/6344#discussion_r1048790825


##
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/ChangelogIterator.java:
##
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.iceberg.ChangelogOperation;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
+
+/**
+ * An iterator that transforms rows from changelog tables within a single 
Spark task.
+ *
+ * It marks the carry-over rows to null to for filtering out later. 
Carry-over rows are unchanged
+ * rows in a snapshot but showed as delete-rows and insert-rows in a changelog 
table due to the
+ * copy-on-write(COW) mechanism. For example, there are row1 (id=1, data='a') 
and row2 (id=2,
+ * data='b') in a data file, if we only delete row2, the COW will copy row1 to 
a new data file and
+ * delete the whole old data file. The changelog table will have two 
delete-rows(row1 and row2), and
+ * one insert-row(row1). Row1 is a carry-over row.
+ *
+ * The iterator marks the delete-row and insert-row to be the update-rows. 
For example, these two
+ * rows
+ *
+ * 
+ *   (id=1, data='a', op='DELETE')
+ *   (id=1, data='b', op='INSERT')
+ * 
+ *
+ * will be marked as update-rows:
+ *
+ * 
+ *   (id=1, data='a', op='UPDATE_BEFORE')
+ *   (id=1, data='b', op='UPDATE_AFTER')
+ * 
+ */
+public class ChangelogIterator implements Iterator, Serializable {
+  private static final String DELETE = ChangelogOperation.DELETE.name();
+  private static final String INSERT = ChangelogOperation.INSERT.name();
+  private static final String UPDATE_BEFORE = 
ChangelogOperation.UPDATE_BEFORE.name();
+  private static final String UPDATE_AFTER = 
ChangelogOperation.UPDATE_AFTER.name();
+
+  private final Iterator rowIterator;
+  private final int changeTypeIndex;
+  private final List partitionIdx;
+
+  private Row cachedRow = null;
+
+  public ChangelogIterator(
+  Iterator rowIterator, int changeTypeIndex, List 
partitionIdx) {
+this.rowIterator = rowIterator;
+this.changeTypeIndex = changeTypeIndex;
+this.partitionIdx = partitionIdx;
+  }
+
+  @Override
+  public boolean hasNext() {
+if (cachedRow != null) {
+  return true;
+}
+return rowIterator.hasNext();
+  }
+
+  @Override
+  public Row next() {
+// if there is an updated cached row, return it directly
+if (updated(cachedRow)) {
+  Row row = cachedRow;
+  cachedRow = null;
+  return row;
+}
+
+Row currentRow = currentRow();
+
+if (rowIterator.hasNext()) {
+  GenericRowWithSchema nextRow = (GenericRowWithSchema) rowIterator.next();
+  cachedRow = nextRow;
+
+  if (updateOrCarryoverRecord(currentRow, nextRow)) {
+Row[] rows = update((GenericRowWithSchema) currentRow, nextRow);
+
+currentRow = rows[0];
+cachedRow = rows[1];
+  }
+}
+
+return currentRow;
+  }
+
+  private Row[] update(GenericRowWithSchema currentRow, GenericRowWithSchema 
nextRow) {
+GenericInternalRow deletedRow = new 
GenericInternalRow(currentRow.values());
+GenericInternalRow insertedRow = new GenericInternalRow(nextRow.values());
+
+if (isCarryoverRecord(deletedRow, insertedRow)) {
+  // set carry-over rows to null for filtering out later
+  return new Row[] {null, null};
+} else {
+  deletedRow.update(changeTypeIndex, UPDATE_BEFORE);
+  insertedRow.update(changeTypeIndex, UPDATE_AFTER);
+
+  return new Row[] {
+RowFactory.create(deletedRow.values()), 
RowFactory.create(insertedRow.values())
+  };
+}
+  }
+
+  private boolean isCarryoverRecord(GenericInternalRow deletedRow, 
GenericInternalRow insertedRow) {
+// set the change_type to the same value
+deletedRow.update(changeTypeIndex, "");

Review Comment:
   I would just define an equals method that ignores ChangeTypeIndex
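   
   A minimal sketch of that idea (assuming changeTypeIndex is the change_type 
   column position, as in the quoted diff) compares every column except the 
   change-type column instead of overwriting the values first:
   
    private boolean isCarryoverRecord(Row deletedRow, Row insertedRow) {
      for (int i = 0; i < deletedRow.size(); i++) {
        if (i == changeTypeIndex) {
          continue;  // ignore the change_type column
        }
        if (!java.util.Objects.equals(deletedRow.get(i), insertedRow.get(i))) {
          return false;
        }
      }
      return true;
    }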




[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6344: Spark 3.3: Introduce the changelog iterator

2022-12-14 Thread GitBox


RussellSpitzer commented on code in PR #6344:
URL: https://github.com/apache/iceberg/pull/6344#discussion_r1048792877


##
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/ChangelogIterator.java:
##
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.iceberg.ChangelogOperation;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
+
+/**
+ * An iterator that transforms rows from changelog tables within a single 
Spark task.
+ *
+ * It marks the carry-over rows to null to for filtering out later. 
Carry-over rows are unchanged
+ * rows in a snapshot but showed as delete-rows and insert-rows in a changelog 
table due to the
+ * copy-on-write(COW) mechanism. For example, there are row1 (id=1, data='a') 
and row2 (id=2,
+ * data='b') in a data file, if we only delete row2, the COW will copy row1 to 
a new data file and
+ * delete the whole old data file. The changelog table will have two 
delete-rows(row1 and row2), and
+ * one insert-row(row1). Row1 is a carry-over row.
+ *
+ * The iterator marks the delete-row and insert-row to be the update-rows. 
For example, these two
+ * rows
+ *
+ * 
+ *   (id=1, data='a', op='DELETE')
+ *   (id=1, data='b', op='INSERT')
+ * 
+ *
+ * will be marked as update-rows:
+ *
+ * 
+ *   (id=1, data='a', op='UPDATE_BEFORE')
+ *   (id=1, data='b', op='UPDATE_AFTER')
+ * 
+ */
+public class ChangelogIterator implements Iterator, Serializable {
+  private static final String DELETE = ChangelogOperation.DELETE.name();
+  private static final String INSERT = ChangelogOperation.INSERT.name();
+  private static final String UPDATE_BEFORE = 
ChangelogOperation.UPDATE_BEFORE.name();
+  private static final String UPDATE_AFTER = 
ChangelogOperation.UPDATE_AFTER.name();
+
+  private final Iterator rowIterator;
+  private final int changeTypeIndex;
+  private final List partitionIdx;
+
+  private Row cachedRow = null;
+
+  public ChangelogIterator(
+  Iterator rowIterator, int changeTypeIndex, List 
partitionIdx) {
+this.rowIterator = rowIterator;
+this.changeTypeIndex = changeTypeIndex;
+this.partitionIdx = partitionIdx;
+  }
+
+  @Override
+  public boolean hasNext() {
+if (cachedRow != null) {
+  return true;
+}
+return rowIterator.hasNext();
+  }
+
+  @Override
+  public Row next() {
+// if there is an updated cached row, return it directly
+if (updated(cachedRow)) {
+  Row row = cachedRow;
+  cachedRow = null;
+  return row;
+}
+
+Row currentRow = currentRow();
+
+if (rowIterator.hasNext()) {
+  GenericRowWithSchema nextRow = (GenericRowWithSchema) rowIterator.next();
+  cachedRow = nextRow;
+
+  if (updateOrCarryoverRecord(currentRow, nextRow)) {
+Row[] rows = update((GenericRowWithSchema) currentRow, nextRow);
+
+currentRow = rows[0];
+cachedRow = rows[1];
+  }
+}
+
+return currentRow;
+  }
+
+  private Row[] update(GenericRowWithSchema currentRow, GenericRowWithSchema 
nextRow) {
+GenericInternalRow deletedRow = new 
GenericInternalRow(currentRow.values());
+GenericInternalRow insertedRow = new GenericInternalRow(nextRow.values());
+
+if (isCarryoverRecord(deletedRow, insertedRow)) {
+  // set carry-over rows to null for filtering out later
+  return new Row[] {null, null};
+} else {
+  deletedRow.update(changeTypeIndex, UPDATE_BEFORE);
+  insertedRow.update(changeTypeIndex, UPDATE_AFTER);
+
+  return new Row[] {
+RowFactory.create(deletedRow.values()), 
RowFactory.create(insertedRow.values())
+  };
+}
+  }
+
+  private boolean isCarryoverRecord(GenericInternalRow deletedRow, 
GenericInternalRow insertedRow) {
+// set the change_type to the same value
+deletedRow.update(changeTypeIndex, "");
+insertedRow.update(changeTypeIndex, "");
+return deletedRow.equals(insertedRow);
+  }
+
+  private boolean updated(Row 

[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6344: Spark 3.3: Introduce the changelog iterator

2022-12-14 Thread GitBox


RussellSpitzer commented on code in PR #6344:
URL: https://github.com/apache/iceberg/pull/6344#discussion_r1048795379


##
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/ChangelogIterator.java:
##
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.iceberg.ChangelogOperation;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
+
+public class ChangelogIterator implements Iterator, Serializable {
+  private static final String DELETE = ChangelogOperation.DELETE.name();
+  private static final String INSERT = ChangelogOperation.INSERT.name();
+
+  private final Iterator rowIterator;
+  private final int changeTypeIndex;
+  private final List partitionIdx;
+  private final boolean markUpdatedRows;
+
+  private Row cachedRow = null;
+
+  public ChangelogIterator(
+  Iterator rowIterator,
+  int changeTypeIndex,
+  List partitionIdx,
+  boolean markUpdatedRows) {
+this.rowIterator = rowIterator;
+this.changeTypeIndex = changeTypeIndex;
+this.partitionIdx = partitionIdx;
+this.markUpdatedRows = markUpdatedRows;
+  }
+
+  @Override
+  public boolean hasNext() {
+if (cachedRow != null) {
+  return true;
+}
+return rowIterator.hasNext();
+  }
+
+  @Override
+  public Row next() {
+// if there is a processed cached row, return it directly
+if (cachedRow != null
+&& !cachedRow.getString(changeTypeIndex).equals(DELETE)
+&& !cachedRow.getString(changeTypeIndex).equals(INSERT)) {
+  Row row = cachedRow;
+  cachedRow = null;
+  return row;
+}
+
+Row currentRow = currentRow();
+
+if (rowIterator.hasNext()) {
+  GenericRowWithSchema nextRow = (GenericRowWithSchema) rowIterator.next();
+
+  if (withinPartition(currentRow, nextRow)
+  && currentRow.getString(changeTypeIndex).equals(DELETE)
+  && nextRow.getString(changeTypeIndex).equals(INSERT)) {
+
+GenericInternalRow deletedRow =
+new GenericInternalRow(((GenericRowWithSchema) 
currentRow).values());
+GenericInternalRow insertedRow = new 
GenericInternalRow(nextRow.values());
+
+// set the change_type to the same value
+deletedRow.update(changeTypeIndex, "");
+insertedRow.update(changeTypeIndex, "");
+
+if (deletedRow.equals(insertedRow)) {
+  // remove two carry-over rows
+  currentRow = null;
+  this.cachedRow = null;
+} else if (markUpdatedRows) {
+  // mark the updated rows
+  deletedRow.update(changeTypeIndex, 
ChangelogOperation.UPDATE_BEFORE.name());
+  currentRow = RowFactory.create(deletedRow.values());
+
+  insertedRow.update(changeTypeIndex, 
ChangelogOperation.UPDATE_AFTER.name());
+  this.cachedRow = RowFactory.create(insertedRow.values());
+} else {
+  // recover the values of change type
+  deletedRow.update(changeTypeIndex, DELETE);
+  insertedRow.update(changeTypeIndex, INSERT);
+  this.cachedRow = nextRow;
+}
+
+  } else {
+this.cachedRow = nextRow;
+  }
+}
+
+return currentRow;
+  }
+
+  private Row currentRow() {
+if (cachedRow != null) {
+  Row row = cachedRow;
+  cachedRow = null;
+  return row;
+} else {
+  return rowIterator.next();
+}
+  }
+
+  private boolean withinPartition(Row currentRow, Row nextRow) {

Review Comment:
   I think something like "sameLogicalRow(Row currentRow, Row nextRow)"
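   
   A sketch of what that name would imply (assuming partitionIdx is a 
   List<Integer> of the identifying column positions, as in the quoted code):
   
    private boolean sameLogicalRow(Row currentRow, Row nextRow) {
      for (int idx : partitionIdx) {
        if (!java.util.Objects.equals(currentRow.get(idx), nextRow.get(idx))) {
          return false;  // different logical rows, so not an update or carry-over pair
        }
      }
      return true;
    }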




[GitHub] [iceberg] ahshahid commented on issue #6424: The size estimation formula for spark task is incorrect

2022-12-14 Thread GitBox


ahshahid commented on issue #6424:
URL: https://github.com/apache/iceberg/issues/6424#issuecomment-1351885991

   Right... That I agree. Maybe along with the split offset (which is the start 
   of the split), we need the end of the split.
   
   But still, please allow me to describe this simplified case, where the split 
   is the same as the file being considered, so that the split offset is 0, 
   and assume split size = file size.
   
   Now as per the current formula:
   splitOffset = 0
   so double scannedFileFraction = ((double) **length()**) / (file().fileSizeInBytes());
   here **length()** is the amount of bytes scanned (only partially read)
   and **file().recordCount()** is the number of records read in that partial 
   scan...
   right?
   Now scannedFileFraction is < 1 (assume that the total file is not scanned).
   Say scannedFileFraction = 0.5
   and numRowsInScan = recordCount = 50.
   
   So ideally the total estimated row count in that file should be 100.
   
   But with the formula we will get total row count = 0.5 * 50 = 25,
   
   but with the corrected formula it will be = 50 / 0.5 = 100,
   
   because in the corrected formula the fraction is (file().fileSizeInBytes()) / length() = 2.
   
   





[GitHub] [iceberg] rdblue commented on a diff in pull request #6405: API: Add Aggregate expression evaluation

2022-12-14 Thread GitBox


rdblue commented on code in PR #6405:
URL: https://github.com/apache/iceberg/pull/6405#discussion_r1048806645


##
api/src/main/java/org/apache/iceberg/expressions/CountStar.java:
##
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.expressions;
+
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.StructLike;
+
+public class CountStar extends CountAggregate {
+  protected CountStar(BoundTerm term) {
+super(Operation.COUNT_STAR, term);
+  }
+
+  @Override
+  protected Long countFor(StructLike row) {
+return 1L;
+  }
+
+  @Override
+  protected Long countFor(DataFile file) {
+long count = file.recordCount();
+if (count < 0) {
+  return null;
+}

Review Comment:
   Some imported Avro files had incorrect metadata several versions ago. I 
don't think it is widespread, but it is good to handle it.






[GitHub] [iceberg] RussellSpitzer commented on issue #6424: The size estimation formula for spark task is incorrect

2022-12-14 Thread GitBox


RussellSpitzer commented on issue #6424:
URL: https://github.com/apache/iceberg/issues/6424#issuecomment-1351901052

   > here length() is the amount of bytes scanned (only partially read)
   
   https://github.com/apache/iceberg/blob/33217abf7f88c6c22a8c43b320f9de48de998b94/api/src/main/java/org/apache/iceberg/ContentScanTask.java#L48-L53
   
   Your hypothetical has a contradiction.
   
   `length = splitSize`
   So
   `if (splitSize == fileSizeInBytes) then length == fileSizeInBytes`
   
   `If length == fileSizeInBytes/2 then splitSize = fileSizeInBytes/2`
   Therefore there must be one other task of length
   `fileSizeInBytes/2`, otherwise we wouldn't be reading the whole file. So 
   again the fraction here is correct at 50% for the given task. The other task 
   will count the other 50%.





[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6344: Spark 3.3: Introduce the changelog iterator

2022-12-14 Thread GitBox


RussellSpitzer commented on code in PR #6344:
URL: https://github.com/apache/iceberg/pull/6344#discussion_r1048813554


##
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/TestChangelogIterator.java:
##
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import static org.apache.iceberg.ChangelogOperation.DELETE;
+import static org.apache.iceberg.ChangelogOperation.INSERT;
+import static org.apache.iceberg.ChangelogOperation.UPDATE_AFTER;
+import static org.apache.iceberg.ChangelogOperation.UPDATE_BEFORE;
+
+import java.util.List;
+import java.util.Objects;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
+import org.junit.Test;
+
+public class TestChangelogIterator {
+  private final List<Row> rows =
+  Lists.newArrayList(
+  new GenericRowWithSchema(new Object[] {1, "a", "data", "DELETE"}, 
null),
+  new GenericRowWithSchema(new Object[] {1, "a", "new_data", 
"INSERT"}, null),
+  // next two rows belong to different partitions
+  new GenericRowWithSchema(new Object[] {2, "b", "data", "DELETE"}, 
null),
+  new GenericRowWithSchema(new Object[] {3, "c", "data", "INSERT"}, 
null),
+  new GenericRowWithSchema(new Object[] {4, "d", "data", "DELETE"}, 
null),
+  new GenericRowWithSchema(new Object[] {4, "d", "data", "INSERT"}, 
null));
+
+  private final int changeTypeIndex = 3;
+  private final List<Integer> partitionIdx = Lists.newArrayList(0, 1);
+
+  @Test
+  public void testUpdatedRows() {

Review Comment:
   I think we need more exhaustive tests here. I would probably permute our 4 
types of records here.
   
   Update - CarryOver - Insert - Delete 
   
   We should make sure this works in any permutation of ordering.
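   
   For instance, a small hedged sketch of generating those orderings (names here are 
illustrative, not the PR's API):
   
   ```java
   import java.util.List;
   import org.apache.iceberg.relocated.com.google.common.collect.Collections2;
   import org.apache.iceberg.relocated.com.google.common.collect.Lists;
   
   public class PermutationSketch {
     public static void main(String[] args) {
       // The four record kinds to permute; each of the 24 orderings would drive
       // one run of the changelog iterator in a test.
       List<String> kinds = Lists.newArrayList("UPDATE", "CARRY_OVER", "INSERT", "DELETE");
       Collections2.permutations(kinds).forEach(System.out::println);
     }
   }
   ```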
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] flyrain commented on pull request #6350: Query changelog table with a timestamp range

2022-12-14 Thread GitBox


flyrain commented on PR #6350:
URL: https://github.com/apache/iceberg/pull/6350#issuecomment-1351909259

   Thanks @RussellSpitzer. Hi @szehon-ho @hililiwei, please let me know if you 
have any comments. Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] dmgcodevil commented on a diff in pull request #3337: Fixed issue #3336: Best efforts to release hive table lock

2022-12-14 Thread GitBox


dmgcodevil commented on code in PR #3337:
URL: https://github.com/apache/iceberg/pull/3337#discussion_r1048815939


##
hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java:
##
@@ -499,11 +499,21 @@ private void unlock(Optional lockId) {
   }
 
   @VisibleForTesting
-  void doUnlock(long lockId) throws TException, InterruptedException {
-metaClients.run(client -> {
-  client.unlock(lockId);
-  return null;
-});
+  void doUnlock(long lockId) throws TException {
+boolean released = false;
+while (!released) {
+  try {
+released = metaClients.run(client -> {

Review Comment:
   there is no retry here. Also:
   
   ```
   catch (RuntimeException e) {
     throw e;
   }
   ```
   
   is redundant.
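   
   As a hedged sketch of the kind of bounded retry being asked for (the `unlockOnce` 
method is a placeholder, not the actual metastore client call):
   
   ```java
   public class UnlockRetrySketch {
     // Placeholder standing in for the metastore unlock call; illustrative only.
     private static boolean unlockOnce() {
       return true;
     }
   
     public static void unlockWithRetries() {
       int attempts = 0;
       boolean released = false;
       while (!released && attempts < 3) {
         attempts++;
         try {
           released = unlockOnce();
         } catch (RuntimeException e) {
           if (attempts >= 3) {
             throw e; // give up after a bounded number of attempts
           }
         }
       }
     }
   }
   ```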



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] ahshahid commented on issue #6424: The size estimation formula for spark task is incorrect

2022-12-14 Thread GitBox


ahshahid commented on issue #6424:
URL: https://github.com/apache/iceberg/issues/6424#issuecomment-1351924900

   Right.. I was also thinking that this is where I have a misunderstanding or 
a bug...
   The question is:
   whether recordCount represents the row count of the scanned fraction, or the 
total row count of the split/file.
   
   I will look into the code, but as I understand it, the recordCount has to be the 
partially scanned count.
   This is for 2 reasons:
   1) The estimated row count function needs to return the total row count in 
the split/file. If that were already available, these calculations should not even be 
needed.
   2) For a split which covers part of a single file, or which spans multiple 
files, the estimated row count of the total split needs to be calculated, and 
for that the recordCount should be something that refers to the partially scanned 
row count.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] RussellSpitzer commented on issue #6424: The size estimation formula for spark task is incorrect

2022-12-14 Thread GitBox


RussellSpitzer commented on issue #6424:
URL: https://github.com/apache/iceberg/issues/6424#issuecomment-1351938390

   Record count does not represent the scanned fraction. I linked you to the 
code; it's a representation of a row in a manifest file, which is the metadata 
for the entire file.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] ahshahid commented on issue #6424: The size estimation formula for spark task is incorrect

2022-12-14 Thread GitBox


ahshahid commented on issue #6424:
URL: https://github.com/apache/iceberg/issues/6424#issuecomment-1351939955

   I see. let me see if I can explain what I mean by test...


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] asheeshgarg commented on issue #6415: Vectorized Read Issue

2022-12-14 Thread GitBox


asheeshgarg commented on issue #6415:
URL: https://github.com/apache/iceberg/issues/6415#issuecomment-1351998029

   @nastra thanks for the references, that seems to be the case. @rdblue this really 
seems to be the case with a lot of datasets. Do we have any timeline for when 
https://github.com/apache/iceberg/pull/3024 will be merged?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] ahshahid commented on issue #6424: The size estimation formula for spark task is incorrect

2022-12-14 Thread GitBox


ahshahid commented on issue #6424:
URL: https://github.com/apache/iceberg/issues/6424#issuecomment-1352025232

   @RussellSpitzer : I see what you are saying about record count corresponding 
to total file size.
   Let me look into what is causing the problem in my join test.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] ahshahid commented on issue #6424: The size estimation formula for spark task is incorrect

2022-12-14 Thread GitBox


ahshahid commented on issue #6424:
URL: https://github.com/apache/iceberg/issues/6424#issuecomment-1352040671

   @RussellSpitzer : 
   apologies for bugging you, I was hoping for one more clarification on this aspect:
   
   long splitOffset = (file().splitOffsets() != null) ? file().splitOffsets().get(0) : 0L;
   double scannedFileFraction = ((double) length()) / (file().fileSizeInBytes() - splitOffset);
   return (long) (scannedFileFraction * file().recordCount());
   
   Given that file().recordCount() corresponds to file().fileSizeInBytes(),
   and length() is the length of the split (if my understanding is correct),
   the subtraction of splitOffset should not be needed as per my understanding...
   why is splitOffset being subtracted from fileSize?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] RussellSpitzer commented on issue #6424: The size estimation formula for spark task is incorrect

2022-12-14 Thread GitBox


RussellSpitzer commented on issue #6424:
URL: https://github.com/apache/iceberg/issues/6424#issuecomment-1352058762

   Parquet files have non-data metadata which is not scanned when we read the 
split. So if for example our first row-group starts at byte 1000, we don't want 
to count 1000 bytes of the total file size as part of the file size 
representing data.
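   
   For illustration, a minimal sketch (not Iceberg code) of the effect, assuming a 
10,000-byte file whose first row group starts at byte 1,000:
   
   ```java
   public class SplitOffsetSketch {
     public static void main(String[] args) {
       long fileSizeInBytes = 10_000L;
       long firstSplitOffset = 1_000L; // bytes before this are non-data metadata
       long recordCount = 100L;
       long taskLength = 9_000L;       // a single task reading all of the data bytes
   
       double withoutOffset = ((double) taskLength) / fileSizeInBytes;
       double withOffset = ((double) taskLength) / (fileSizeInBytes - firstSplitOffset);
   
       System.out.println((long) (withoutOffset * recordCount)); // 90, undercounts
       System.out.println((long) (withOffset * recordCount));    // 100, the whole file
     }
   }
   ```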


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] flyrain commented on pull request #6350: Query changelog table with a timestamp range

2022-12-14 Thread GitBox


flyrain commented on PR #6350:
URL: https://github.com/apache/iceberg/pull/6350#issuecomment-1352094105

   Thanks @szehon-ho for the review. I believe you are talking about case 2 
in https://github.com/apache/iceberg/pull/6350#discussion_r1044906141. I did 
try to return an empty set, but it seems a bit involved to make the change. 
Solutions would be:
   1. Add a new method to the `IncrementalScan` interface to return an empty set.
   2. Reuse `IncrementalScan::fromSnapshotExclusive` by passing the 
latest/current snapshot id. It is a workaround without an interface change. I 
tried this, but it needs to touch a bunch of code.
   
   Let me know if there is an easier way to do that. Otherwise, I will make the 
change in another PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] ahshahid commented on issue #6424: The size estimation formula for spark task is incorrect

2022-12-14 Thread GitBox


ahshahid commented on issue #6424:
URL: https://github.com/apache/iceberg/issues/6424#issuecomment-1352159451

   Thank you @RussellSpitzer.. I will close this.. maybe the issue I'm seeing is the 
conversion of double to long for a fractional value. Will update once I debug 
more.
   Sorry for the false alarm.. 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] ahshahid closed issue #6424: The size estimation formula for spark task is incorrect

2022-12-14 Thread GitBox


ahshahid closed issue #6424: The size estimation formula for spark task is 
incorrect
URL: https://github.com/apache/iceberg/issues/6424


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] ddrinka commented on issue #2768: Support fan-out reads in PyIceberg

2022-12-14 Thread GitBox


ddrinka commented on issue #2768:
URL: https://github.com/apache/iceberg/issues/2768#issuecomment-1352288435

   > We could do this with Spark or Dask or Ray depending on what's installed 
on the system.
   
   Perhaps consider [Modin](https://github.com/modin-project/modin) as well?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] stevenzwu commented on pull request #6426: Flink: add fixed field type for DataGenerators test util

2022-12-14 Thread GitBox


stevenzwu commented on PR #6426:
URL: https://github.com/apache/iceberg/pull/6426#issuecomment-1352289861

   @pvary regarding your other comments, I am not sure how to proceed yet.
   
   > Null values
   
   For null values, would optional primitive types at the top level be enough? 
   
   > Edge cases, like year differences based on TZ, or non valid timestamps 
based on TZ
   
   This part is not very clear to me.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] yegangy0718 commented on a diff in pull request #6382: Implement ShuffleOperator to collect data statistics

2022-12-14 Thread GitBox


yegangy0718 commented on code in PR #6382:
URL: https://github.com/apache/iceberg/pull/6382#discussion_r1048051789


##
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestShuffleOperator.java:
##
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collections;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.execution.Environment;
+import 
org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateInitializationContextImpl;
+import org.apache.flink.runtime.state.TestTaskStateManager;
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
+import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
+import org.apache.flink.streaming.util.MockOutput;
+import org.apache.flink.streaming.util.MockStreamConfig;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestShuffleOperator {
+  private ShuffleOperator operator;
+
+  private Environment getTestingEnvironment() {
+return new StreamMockEnvironment(
+new Configuration(),
+new Configuration(),
+new ExecutionConfig(),
+1L,
+new MockInputSplitProvider(),
+1,
+new TestTaskStateManager());
+  }
+
+  @Before
+  public void before() throws Exception {
+MockOperatorEventGateway mockGateway = new MockOperatorEventGateway();
+KeySelector<String, String> keySelector =
+new KeySelector<String, String>() {
+  private static final long serialVersionUID = 7662520075515707428L;
+
+  @Override
+  public String getKey(String value) {
+return value;
+  }
+};
+
+this.operator =
+new ShuffleOperator<>(keySelector, TypeInformation.of(String.class), 
mockGateway);
+Environment env = getTestingEnvironment();
+this.operator.setup(
+new OneInputStreamTask(env),
+new MockStreamConfig(new Configuration(), 1),
+new MockOutput<>(Lists.newArrayList()));
+  }
+
+  @After
+  public void clean() throws Exception {
+operator.close();
+  }
+
+  @Test
+  public void testInitializeState() throws Exception {
+StateInitializationContext stateContext = getStateContext();
+operator.initializeState(stateContext);
+
+assertNotNull(
+stateContext
+.getOperatorStateStore()
+
.getListState(operator.generateGlobalDataDistributionWeightDescriptor()));
+  }
+
+  @Test
+  public void testProcessElement() throws Exception {
+StateInitializationContext stateContext = getStateContext();
+operator.initializeState(stateContext);
+operator.processElement(new StreamRecord<>("a"));
+operator.processElement(new StreamRecord<>("a"));
+operator.processElement(new StreamRecord<>("b"));
+assertTrue(operator.localDataStatisticsMap().containsKey("a"));
+assertTrue(operator.localDataStatisticsMap().containsKey("b"));
+assertEquals(2L, (long) operator.localDataStatisticsMap().get("a"));
+assertEquals(1L, (long) operator.localDataStatisticsMap().get("b"));
+  }
+
+  //  helper method

[GitHub] [iceberg] flyrain merged pull request #6350: Spark 3.3: Time range query of changelog tables

2022-12-14 Thread GitBox


flyrain merged PR #6350:
URL: https://github.com/apache/iceberg/pull/6350


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] github-actions[bot] commented on issue #5071: Support UNSET of sortOrder from the SQL

2022-12-14 Thread GitBox


github-actions[bot] commented on issue #5071:
URL: https://github.com/apache/iceberg/issues/5071#issuecomment-1352388065

   This issue has been automatically marked as stale because it has been open 
for 180 days with no activity. It will be closed in next 14 days if no further 
activity occurs. To permanently prevent this issue from being considered stale, 
add the label 'not-stale', but commenting on the issue is preferred when 
possible.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] github-actions[bot] commented on issue #4948: Create a Github Action to automatically mark issues as stale and later close if inactive

2022-12-14 Thread GitBox


github-actions[bot] commented on issue #4948:
URL: https://github.com/apache/iceberg/issues/4948#issuecomment-1352388123

   This issue has been closed because it has not received any activity in the 
last 14 days since being marked as 'stale'


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] github-actions[bot] closed issue #4948: Create a Github Action to automatically mark issues as stale and later close if inactive

2022-12-14 Thread GitBox


github-actions[bot] closed issue #4948: Create a Github Action to automatically 
mark issues as stale and later close if inactive
URL: https://github.com/apache/iceberg/issues/4948


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] dennishuo opened a new pull request, #6428: Add new SnowflakeCatalog implementation to enable directly using Snowflake-managed Iceberg tables

2022-12-14 Thread GitBox


dennishuo opened a new pull request, #6428:
URL: https://github.com/apache/iceberg/pull/6428

   This read-only implementation of the Catalog interface, initially built on 
top of the [Snowflake JDBC 
driver](https://docs.snowflake.com/en/user-guide/jdbc.html) for the connection 
layer, enables engines like Spark using the Iceberg Java SDK to be able to 
consume [Snowflake-managed Iceberg 
Tables](https://www.snowflake.com/blog/iceberg-tables-powering-open-standards-with-snowflake-innovations/)
 via Iceberg Catalog interfaces.
   
   Example, assuming a Snowflake account with a database `iot_data` containing 
a schema `public` and a managed Iceberg table `sensor_test_results`:
   
   spark-shell \
 --conf spark.sql.catalog.snowlog=org.apache.iceberg.spark.SparkCatalog \
 --conf spark.sql.catalog.snowlog.catalog-impl=org.apache.iceberg.snowflake.SnowflakeCatalog \
 --conf spark.sql.catalog.snowlog.uri="jdbc:snowflake://$ACCOUNT.snowflakecomputing.com" \
   
   scala> spark.sessionState.catalogManager.setCurrentCatalog("snowlog");
   scala> spark.sql("show namespaces in iot_data").show(false);
   scala> spark.sql("select * from iot_data.public.sensor_test_results limit 10").show(false);
   
   Note that the involvement of a JDBC driver is only incidental, and 
functionality is different from the `JdbcCatalog` - here, Snowflake itself 
manages manifest/metadata files and table/snapshot metadata, and this catalog 
layer facilitates the coordination of metadata-file locations and discovery of 
the latest table snapshot versions without resorting to file-listing or 
"directory-name"-listing (for listTables or listNamespaces) like the 
HadoopCatalog.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] ahshahid commented on issue #6424: The size estimation formula for spark task is incorrect

2022-12-14 Thread GitBox


ahshahid commented on issue #6424:
URL: https://github.com/apache/iceberg/issues/6424#issuecomment-1352519479

   @RussellSpitzer, actually part of the issue I was seeing was related 
to a scannedFileFraction approximately equal to 1 with a record count of 1, which was 
resulting in net rows seen = 0 due to the double being cast to long.
   And that was happening because the splitOffset value was not being subtracted 
from the file size; as a result the calculation was 0.98... * 1, which resulted 
in 0 rows instead of 1. 
   After subtracting splitOffset, the ratio is now exactly equal to 1. 
   Thanks for the information..
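   
   For illustration, a tiny sketch (not Iceberg code) of that truncation effect:
   
   ```java
   public class TruncationSketch {
     public static void main(String[] args) {
       long recordCount = 1L;
       double withoutOffset = 0.98; // slightly under 1 when the offset is not subtracted
       double withOffset = 1.0;     // exactly 1 once the offset is subtracted
   
       System.out.println((long) (withoutOffset * recordCount)); // 0 rows
       System.out.println((long) (withOffset * recordCount));    // 1 row
     }
   }
   ```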


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] ahshahid commented on issue #6424: The size estimation formula for spark task is incorrect

2022-12-14 Thread GitBox


ahshahid commented on issue #6424:
URL: https://github.com/apache/iceberg/issues/6424#issuecomment-1352520299

   The other issue I was looking at was comparing the performance of Parquet with 
Iceberg, and there it seems that Iceberg, because of better size estimation compared 
to Parquet, results in joins which tend to be less performant than with Parquet. 
But this is an issue on the Spark side.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] xwmr-max commented on pull request #6412: Doc: Modify some options refer to Read-options in flink streaming rea…

2022-12-14 Thread GitBox


xwmr-max commented on PR #6412:
URL: https://github.com/apache/iceberg/pull/6412#issuecomment-1352605298

   @openinx 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] pvary commented on a diff in pull request #6382: Implement ShuffleOperator to collect data statistics

2022-12-14 Thread GitBox


pvary commented on code in PR #6382:
URL: https://github.com/apache/iceberg/pull/6382#discussion_r1049262265


##
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleOperator.java:
##
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.io.Serializable;
+import java.util.Map;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+/**
+ * Shuffle operator can help to improve data clustering based on the key.
+ *
+ * It collects data statistics information, sends it to the coordinator, and gets the
+ * global data distribution weight back from the coordinator. Then it ingests the
+ * weight into the data stream (wrapped by the class {@link ShuffleRecordWrapper})
+ * and sends it to the partitioner.
+ */
+@Internal
+public class ShuffleOperator
+extends AbstractStreamOperator>
+implements OneInputStreamOperator>, 
OperatorEventHandler {
+
+  private static final long serialVersionUID = 1L;
+
+  private final KeySelector keySelector;
+  // the type of the key to collect data statistics
+  private final TypeInformation keyType;
+  private final OperatorEventGateway operatorEventGateway;
+  // key is generated by applying KeySelector to record
+  // value is the times key occurs
+  private transient Map localDataStatisticsMap;
+  private transient Map globalDataDistributionWeightMap;
+  private transient ListState> globalDataDistributionWeightState;
+
+  public ShuffleOperator(
+  KeySelector keySelector,
+  TypeInformation keyType,
+  OperatorEventGateway operatorEventGateway) {
+this.keySelector = keySelector;
+this.keyType = keyType;
+this.operatorEventGateway = operatorEventGateway;
+  }
+
+  @VisibleForTesting
+  ListStateDescriptor> 
generateGlobalDataDistributionWeightDescriptor() {
+return new ListStateDescriptor<>(
+"globalDataDistributionWeight", new MapTypeInfo<>(keyType, 
TypeInformation.of(Long.class)));
+  }
+
+  @Override
+  public void initializeState(StateInitializationContext context) throws 
Exception {
+localDataStatisticsMap = Maps.newHashMap();
+globalDataDistributionWeightState =

Review Comment:
   Let's merge this change, and we can play around with the BroadcastState in a 
followup PR. WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] jiamin13579 commented on pull request #6419: Doc:Example of correcting the document add/drop partition truncate

2022-12-14 Thread GitBox


jiamin13579 commented on PR #6419:
URL: https://github.com/apache/iceberg/pull/6419#issuecomment-1352635716

   > Not sure its necessary, looks like for now width can be any of the 
arguments: 
https://github.com/apache/iceberg/blob/master/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java#L428
   
   I think the document will mislead users; many partitions will be generated.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] Fokko commented on a diff in pull request #6392: Python: Add adlfs support (Azure DataLake FileSystem)

2022-12-14 Thread GitBox


Fokko commented on code in PR #6392:
URL: https://github.com/apache/iceberg/pull/6392#discussion_r1049285936


##
python/Makefile:
##
@@ -26,14 +26,21 @@ lint:
poetry run pre-commit run --all-files
 
 test:
-   poetry run coverage run --source=pyiceberg/ -m pytest tests/ -m "not 
s3" ${PYTEST_ARGS}
+   poetry run coverage run --source=pyiceberg/ -m pytest tests/ -m "not s3 
and not adlfs" ${PYTEST_ARGS}

Review Comment:
   I've been looking at this in https://github.com/apache/iceberg/pull/6398/ as 
well. I've added:
   
   ```python
   def pytest_collection_modifyitems(items, config):
   for item in items:
   if not any(item.iter_markers()):
   item.add_marker("unmarked")
   ```
   
   Which will automatically add the `unmarked` mark to each of the tests that 
doesn't have a marker, putting them in their own group. WDYT of that?



##
python/tests/conftest.py:
##
@@ -76,13 +76,30 @@
 
 
 def pytest_addoption(parser: pytest.Parser) -> None:
+# S3 options
 parser.addoption(
 "--s3.endpoint", action="store", default="http://localhost:9000";, 
help="The S3 endpoint URL for tests marked as s3"
 )
 parser.addoption("--s3.access-key-id", action="store", default="admin", 
help="The AWS access key ID for tests marked as s3")
 parser.addoption(
 "--s3.secret-access-key", action="store", default="password", 
help="The AWS secret access key ID for tests marked as s3"
 )
+# ADLFS options
+parser.addoption(
+"--adlfs.endpoint",
+action="store",
+default="http://127.0.0.1:1";,
+help="The ADLS endpoint URL for tests marked as adlfs",
+)
+parser.addoption(
+"--adlfs.account-name", action="store", default="devstoreaccount1", 
help="The ADLS account key for tests marked as adlfs"
+)
+parser.addoption(
+"--adlfs.account-key",
+action="store",
+
default="Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==",

Review Comment:
   Maybe we should add this Github comment as a code comment, in case we start 
doing credential scanning one day.



##
python/pyiceberg/io/fsspec.py:
##
@@ -143,8 +153,15 @@ def open(self) -> InputStream:
 
 Returns:
 OpenFile: An fsspec compliant file-like object
+
+Raises:
+FileNotFoundError: If the file does not exist
 """
-return self._fs.open(self.location, "rb")
+try:
+return self._fs.open(self.location, "rb")
+except FileNotFoundError as e:
+# To have a consistent error handling experience, make sure 
exception contains missing file location.

Review Comment:
   This is perfect! We don't want to return implementation-specific errors



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] nastra commented on a diff in pull request #6428: Add new SnowflakeCatalog implementation to enable directly using Snowflake-managed Iceberg tables

2022-12-14 Thread GitBox


nastra commented on code in PR #6428:
URL: https://github.com/apache/iceberg/pull/6428#discussion_r1049312756


##
snowflake/src/test/java/org/apache/iceberg/snowflake/SnowflakeCatalogTest.java:
##
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.snowflake;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableMetadataParser;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.snowflake.entities.SnowflakeTableMetadata;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SnowflakeCatalogTest {

Review Comment:
   just curious, would it be possible to extend the existing `CatalogTests` 
class, as it already defines/tests a lot of common behavior?
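   
   As a rough, hedged sketch of the shape that could take (assuming `SnowflakeCatalog` 
satisfies the bounds `CatalogTests` expects; given the read-only catalog, many of the 
inherited write tests would likely need to be disabled or overridden):
   
   ```java
   // Hedged sketch only; wiring up the fake Snowflake client is omitted.
   public class TestSnowflakeCatalogViaCatalogTests extends CatalogTests<SnowflakeCatalog> {
     private SnowflakeCatalog catalog;
   
     @Override
     protected SnowflakeCatalog catalog() {
       return catalog; // would be initialized against a fake/mock Snowflake client
     }
   }
   ```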



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org