[GitHub] [iceberg] nastra commented on issue #6415: Vectorized Read Issue
nastra commented on issue #6415:

URL: https://github.com/apache/iceberg/issues/6415#issuecomment-1350603108

See https://github.com/apache/iceberg/blob/c07f2aabc0a1d02f068ecf1514d2479c0fbdd3b0/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/ArrowReaderTest.java#L149-L210 for some background on why the issue currently exists. I opened https://github.com/apache/iceberg/pull/3024 a while ago, but it didn't get attention.
[GitHub] [iceberg] nastra commented on issue #6415: Vectorized Read Issue
nastra commented on issue #6415:

URL: https://github.com/apache/iceberg/issues/6415#issuecomment-1350604548

@rdblue thoughts on getting the above issue fixed?
[GitHub] [iceberg] nastra opened a new issue, #6423: Run JMH Benchmarks weekly / Visualize benchmark results
nastra opened a new issue, #6423:

URL: https://github.com/apache/iceberg/issues/6423

### Feature Request / Improvement

Currently we have a way to run JMH benchmarks on forks. The goal here is to have the JMH benchmarks executed weekly (or on some other cadence) via a GitHub Action. Additionally, it would be great if we could visualize the results and immediately see if there's an x% deviation from previous benchmark runs. I've found https://github.com/jzillmann/gradle-jmh-report, which might help with that part.

### Query engine

None
[GitHub] [iceberg] ahshahid opened a new issue, #6424: The size estimation formula for spark task is incorrect
ahshahid opened a new issue, #6424:

URL: https://github.com/apache/iceberg/issues/6424

### Apache Iceberg version

main (development)

### Query engine

Spark

### Please describe the bug 🐞

The size estimation formula used for non-partition columns, as seen in ContentScanTask, is presently:

    default long estimatedRowsCount() {
      double scannedFileFraction = ((double) length()) / file().fileSizeInBytes();
      return (long) (scannedFileFraction * file().recordCount());
    }

IMO it should be:

    (file().fileSizeInBytes() * file().recordCount()) / length()

We are estimating the full row count from a scan of part of the file and the rows contained in it. The current formula is wrong because scannedFileFraction is bound to be <= 1, so the full row count has to be >= file().recordCount(); but full row count = scannedFileFraction * file().recordCount() implies that full row count <= file().recordCount(), which is incorrect. I have a bug test which shows that, because of this, inefficient broadcast hash joins are getting created. Will create a PR & bug test tomorrow.
[GitHub] [iceberg] pvary commented on a diff in pull request #3337: Fixed issue #3336: Best efforts to release hive table lock
pvary commented on code in PR #3337:

URL: https://github.com/apache/iceberg/pull/3337#discussion_r1048287741

## hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java: ##

    @@ -499,11 +499,21 @@ private void unlock(Optional<Long> lockId) {
       }

       @VisibleForTesting
    -  void doUnlock(long lockId) throws TException, InterruptedException {
    -    metaClients.run(client -> {
    -      client.unlock(lockId);
    -      return null;
    -    });
    +  void doUnlock(long lockId) throws TException {
    +    boolean released = false;
    +    while (!released) {
    +      try {
    +        released = metaClients.run(client -> {

Review Comment:

@dmgcodevil: I had missed that. Thanks.

@fengguangyuan: Still a bit concerned about the possible infinite loop. Could we just do a single retry instead?
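A minimal, self-contained sketch of the bounded-retry idea raised above. The helper name, the Callable-based signature, and the attempt count are illustrative only and are not the actual HiveTableOperations API:

    import java.util.concurrent.Callable;

    public class BoundedUnlock {

      // Attempts the unlock action at most maxAttempts times instead of looping until it succeeds.
      static void unlockWithBoundedRetry(Callable<Boolean> unlockAction, int maxAttempts) throws Exception {
        Exception lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
          try {
            if (unlockAction.call()) {
              return; // lock released
            }
          } catch (Exception e) {
            lastFailure = e; // remember the failure and try again, at most maxAttempts times
          }
        }
        if (lastFailure != null) {
          throw lastFailure;
        }
        throw new IllegalStateException("Lock not released after " + maxAttempts + " attempts");
      }

      public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Usage example: the first attempt reports failure, the single retry succeeds.
        unlockWithBoundedRetry(() -> ++calls[0] > 1, 2);
        System.out.println("released after " + calls[0] + " attempt(s)");
      }
    }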
[GitHub] [iceberg] nazq commented on issue #6415: Vectorized Read Issue
nazq commented on issue #6415:

URL: https://github.com/apache/iceberg/issues/6415#issuecomment-1351547529

Happy to create a PR @rdblue, just want to make sure we're on the right track here.
[GitHub] [iceberg] rubenvdg commented on issue #6361: Python: Ignore home folder when running tests
rubenvdg commented on issue #6361:

URL: https://github.com/apache/iceberg/issues/6361#issuecomment-1351585780

Happy to take this one on, if nobody is working on it atm.
[GitHub] [iceberg] RussellSpitzer commented on issue #6424: The size estimation formula for spark task is incorrect
RussellSpitzer commented on issue #6424:

URL: https://github.com/apache/iceberg/issues/6424#issuecomment-1351621767

The current code is slightly different than this, see https://github.com/apache/iceberg/blob/33217abf7f88c6c22a8c43b320f9de48de998b94/api/src/main/java/org/apache/iceberg/ContentScanTask.java#L65-L70, but I'm not sure I follow your math. Why would the full row count have to be greater than the record count for a split? If we scan only part of a file, it should only be a portion of the rows in the file. When we sum over all of our partial scans we should get the full amount (if the full file is read).
[GitHub] [iceberg] nastra commented on issue #6415: Vectorized Read Issue
nastra commented on issue #6415:

URL: https://github.com/apache/iceberg/issues/6415#issuecomment-1351641875

@nazq just FYI, there's already #3024 that addresses this issue.
[GitHub] [iceberg] nazq commented on issue #6415: Vectorized Read Issue
nazq commented on issue #6415:

URL: https://github.com/apache/iceberg/issues/6415#issuecomment-1351656081

Excellent. Thanks for the pointer @nastra.
[GitHub] [iceberg] nastra commented on issue #6420: Iceberg Materialized View Spec
nastra commented on issue #6420:

URL: https://github.com/apache/iceberg/issues/6420#issuecomment-1351656857

@JanKaul I think it would be great to get this out to the DEV mailing list to get more attention and input from people.
[GitHub] [iceberg] ahshahid commented on issue #6424: The size estimation formula for spark task is incorrect
ahshahid commented on issue #6424:

URL: https://github.com/apache/iceberg/issues/6424#issuecomment-1351671855

@RussellSpitzer Right, I missed the modification of "- splitOffset". Though the bug, which I think is in the formula, still remains. My reasoning is as follows: the function estimatedRowsCount has to estimate the total row count of a split/file (or a single file) by analyzing a fraction of the split (file), which means that the total row count of a split/file >= the scanned fraction's row count (which is what we call record count).

Now, if total row count of a split/file = (scannedFileFraction * file().recordCount()) and the scanned fraction is <= 1, this would result in total row count <= the fraction's record count.

The change I proposed is based on this ratio/proportion: when the scanned file/split size is length(), the rows are file().recordCount(), so when the total size of the file/split is (file().fileSizeInBytes() - splitOffset), the total count X is:

    X = (file().recordCount() * (file().fileSizeInBytes() - splitOffset)) / length()

Do you think my understanding of the objective of the function is correct?
[GitHub] [iceberg] stevenzwu merged pull request #6313: Flink: use correct metric config for position deletes
stevenzwu merged PR #6313:

URL: https://github.com/apache/iceberg/pull/6313
[GitHub] [iceberg] stevenzwu commented on pull request #6313: Flink: use correct metric config for position deletes
stevenzwu commented on PR #6313:

URL: https://github.com/apache/iceberg/pull/6313#issuecomment-1351737859

Thanks @chenjunjiedada for the contribution.
[GitHub] [iceberg] RussellSpitzer commented on issue #6424: The size estimation formula for spark task is incorrect
RussellSpitzer commented on issue #6424:

URL: https://github.com/apache/iceberg/issues/6424#issuecomment-1351768402

> now if total row count of a split/file = (scannedFileFraction * file().recordCount())

This is, I think, the confusion: we are attempting to determine how many rows are in this split specifically, because we are summing over all splits. See https://github.com/apache/iceberg/blob/7fd9ded0a119c050746d765bd90c59fef93506b1/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java#L143

A file may be turned into multiple FileScanTasks (now ContentScanTasks) for read parallelism, so we can only count the number of rows contributed by the tasks we are currently looking at. Let's say we do a scan which only ends up touching file A, but file A is divided into multiple tasks A1, A2, and A3. If they have lengths 2/5 A, 2/5 A, and 1/5 A, then we do the following math:

    Sum over all tasks = (2/5 A * total A rows) + (2/5 A * total A rows) + (1/5 A * total A rows) = total A rows

This should be the total number of rows in the scan.
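A small, self-contained illustration of that summation, using made-up sizes and a simplified version of the per-task estimate (the splitOffset term is omitted):

    public class SplitRowEstimateDemo {

      // Same shape as the per-task estimate discussed above (simplified: no splitOffset).
      static long estimatedRows(long taskLength, long fileSizeInBytes, long fileRecordCount) {
        double scannedFileFraction = ((double) taskLength) / fileSizeInBytes;
        return (long) (scannedFileFraction * fileRecordCount);
      }

      public static void main(String[] args) {
        long fileSizeInBytes = 1000L; // assumed file size
        long recordCount = 500L;      // assumed row count in the file's metadata
        long[] taskLengths = {400L, 400L, 200L}; // three tasks covering 2/5, 2/5 and 1/5 of the file

        long total = 0L;
        for (long length : taskLengths) {
          total += estimatedRows(length, fileSizeInBytes, recordCount);
        }
        // 200 + 200 + 100 = 500: summing the per-task estimates recovers the file's record count
        System.out.println(total);
      }
    }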
[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6378: Spark: Extend Timeout During Partial Progress Rewrites
RussellSpitzer commented on code in PR #6378:

URL: https://github.com/apache/iceberg/pull/6378#discussion_r1048728860

## core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java: ##

    @@ -225,25 +225,40 @@ public void close() {
         LOG.info("Closing commit service for {}", table);
         committerService.shutdown();

    +    boolean timeout = false;
    +    int waitTime;
         try {
           // All rewrites have completed and all new files have been created, we are now waiting for
           // the commit
    -      // pool to finish doing it's commits to Iceberg State. In the case of partial progress this
    +      // pool to finish doing its commits to Iceberg State. In the case of partial progress this
           // should
           // have been occurring simultaneously with rewrites, if not there should be only a single
           // commit operation.
    -      // In either case this should take much less than 10 minutes to actually complete.
    -      if (!committerService.awaitTermination(10, TimeUnit.MINUTES)) {
    +      // We will wait 10 minutes plus 5 more minutes for each commit left to perform due to the
    +      // time required for writing manifests
    +      waitTime = 10 + (completedRewrites.size() / rewritesPerCommit) * 5;

Review Comment:

I was just thinking that 5 minutes is plenty of time to write a manifest file. In our use case the biggest problem is the amount of time it takes to write the manifests. Rewriting the JSON portion is fast.
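Rough arithmetic for the quoted wait-time formula, with made-up values for the number of completed rewrites and the commit batch size:

    public class CommitWaitTimeDemo {
      public static void main(String[] args) {
        int completedRewrites = 200; // assumed number of completed file-group rewrites
        int rewritesPerCommit = 20;  // assumed partial-progress commit batch size
        int waitTimeMinutes = 10 + (completedRewrites / rewritesPerCommit) * 5;
        // 10 + (200 / 20) * 5 = 60 minutes before the committer service is treated as timed out
        System.out.println(waitTimeMinutes);
      }
    }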
[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6350: Query changelog table with a timestamp range
RussellSpitzer commented on code in PR #6350:

URL: https://github.com/apache/iceberg/pull/6350#discussion_r1048731625

## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java: ##

    @@ -308,6 +339,17 @@ public Scan buildChangelogScan() {
         return new SparkChangelogScan(spark, table, scan, readConf, expectedSchema, filterExpressions);
       }

    +  private Long getStartSnapshotId(Long startTimestamp) {
    +    Snapshot oldestSnapshotAfter = SnapshotUtil.oldestAncestorAfter(table, startTimestamp);
    +    Preconditions.checkArgument(

Review Comment:

I think you are correct @flyrain, I only really would want the error for 1.
[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6350: Query changelog table with a timestamp range
RussellSpitzer commented on code in PR #6350:

URL: https://github.com/apache/iceberg/pull/6350#discussion_r1048750874

## spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java: ##

    @@ -137,6 +138,64 @@ public void testOverwrites() {
             changelogRecords(snap2, snap3));
       }

    +  @Test
    +  public void testQueryWithTimeRange() {
    +    sql(
    +        "CREATE TABLE %s (id INT, data STRING) "
    +            + "USING iceberg "
    +            + "PARTITIONED BY (data) "
    +            + "TBLPROPERTIES ( "
    +            + "  '%s' = '%d' "
    +            + ")",
    +        tableName, FORMAT_VERSION, formatVersion);
    +
    +    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
    +    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
    +
    +    Table table = validationCatalog.loadTable(tableIdent);
    +
    +    Snapshot snap2 = table.currentSnapshot();
    +
    +    sql("INSERT OVERWRITE %s VALUES (-2, 'b')", tableName);
    +
    +    table.refresh();
    +
    +    Snapshot snap3 = table.currentSnapshot();
    +
    +    assertEquals(
    +        "Should have expected changed rows only from snapshot 3",
    +        ImmutableList.of(

Review Comment:

Not a big deal if we aren't using this a lot. If we end up making more tests like this in the future though, I think it would be a good utility.
[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6344: Spark 3.3: Introduce the changelog iterator
RussellSpitzer commented on code in PR #6344:

URL: https://github.com/apache/iceberg/pull/6344#discussion_r104878

## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/ChangelogIterator.java: ##

    private Row[] update(GenericRowWithSchema currentRow, GenericRowWithSchema nextRow) {
      GenericInternalRow deletedRow = new GenericInternalRow(currentRow.values());
      GenericInternalRow insertedRow = new GenericInternalRow(nextRow.values());

      if (isCarryoverRecord(deletedRow, insertedRow)) {
        // set carry-over rows to null for filtering out later
        return new Row[] {null, null};

Review Comment:

This seems like a very expensive way to pass back 2 nulls.
[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6344: Spark 3.3: Introduce the changelog iterator
RussellSpitzer commented on code in PR #6344:

URL: https://github.com/apache/iceberg/pull/6344#discussion_r1048780674

## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/ChangelogIterator.java: ##

    if (isCarryoverRecord(deletedRow, insertedRow)) {
      // set carry-over rows to null for filtering out later
      return new Row[] {null, null};

Review Comment:

I would probably just return null and do an "if null { currentRow = null, cachedRow = null }".
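A minimal sketch of that alternative, using plain Object[] values as stand-ins for Spark rows; the names and shapes here are illustrative only, not the PR's actual code:

    import java.util.Arrays;

    public class CarryOverNullSignalDemo {

      // Returns null when the pair is a carry-over (identical rows), otherwise the row pair to emit.
      static Object[][] update(Object[] currentRow, Object[] nextRow) {
        if (Arrays.equals(currentRow, nextRow)) {
          return null; // carry-over: nothing to emit
        }
        return new Object[][] {currentRow, nextRow};
      }

      public static void main(String[] args) {
        Object[] currentRow = {1, "a", "data"};
        Object[] cachedRow = {1, "a", "data"};

        // Caller-side handling, along the lines of "if null { currentRow = null, cachedRow = null }".
        Object[][] marked = update(currentRow, cachedRow);
        if (marked == null) {
          currentRow = null;
          cachedRow = null;
        } else {
          currentRow = marked[0];
          cachedRow = marked[1];
        }
        System.out.println(currentRow + " / " + cachedRow); // prints "null / null" for the carry-over pair
      }
    }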
[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6344: Spark 3.3: Introduce the changelog iterator
RussellSpitzer commented on code in PR #6344:

URL: https://github.com/apache/iceberg/pull/6344#discussion_r1048789305

## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/ChangelogIterator.java: ##

    } else {
      deletedRow.update(changeTypeIndex, UPDATE_BEFORE);
      insertedRow.update(changeTypeIndex, UPDATE_AFTER);

      return new Row[] {
        RowFactory.create(deletedRow.values()), RowFactory.create(insertedRow.values())

Review Comment:

Why are we making new rows here? Can we not just use our GenericInternalRows here?
[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6344: Spark 3.3: Introduce the changelog iterator
RussellSpitzer commented on code in PR #6344:

URL: https://github.com/apache/iceberg/pull/6344#discussion_r1048790825

## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/ChangelogIterator.java: ##

    private boolean isCarryoverRecord(GenericInternalRow deletedRow, GenericInternalRow insertedRow) {
      // set the change_type to the same value
      deletedRow.update(changeTypeIndex, "");

Review Comment:

I would just define an equals method that ignores changeTypeIndex.
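One possible shape for such an equals helper, sketched with plain Object[] values standing in for Spark rows rather than the actual GenericInternalRow API:

    import java.util.Objects;

    public class ChangeTypeAgnosticEquals {

      // Field-by-field comparison that skips the change_type column, so neither row has to be mutated.
      static boolean equalsIgnoringChangeType(Object[] left, Object[] right, int changeTypeIndex) {
        if (left.length != right.length) {
          return false;
        }
        for (int i = 0; i < left.length; i++) {
          if (i == changeTypeIndex) {
            continue; // ignore the change_type column
          }
          if (!Objects.equals(left[i], right[i])) {
            return false;
          }
        }
        return true;
      }

      public static void main(String[] args) {
        Object[] deleted = {1, "a", "data", "DELETE"};
        Object[] inserted = {1, "a", "data", "INSERT"};
        // true: the rows differ only in change_type, which is what a carry-over pair looks like
        System.out.println(equalsIgnoringChangeType(deleted, inserted, 3));
      }
    }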
[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6344: Spark 3.3: Introduce the changelog iterator
RussellSpitzer commented on code in PR #6344:

URL: https://github.com/apache/iceberg/pull/6344#discussion_r1048792877

## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/ChangelogIterator.java: ##

    private boolean isCarryoverRecord(GenericInternalRow deletedRow, GenericInternalRow insertedRow) {
      // set the change_type to the same value
      deletedRow.update(changeTypeIndex, "");
      insertedRow.update(changeTypeIndex, "");
      return deletedRow.equals(insertedRow);
    }

    private boolean updated(Row
[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6344: Spark 3.3: Introduce the changelog iterator
RussellSpitzer commented on code in PR #6344:

URL: https://github.com/apache/iceberg/pull/6344#discussion_r1048795379

## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/ChangelogIterator.java: ##

    private boolean withinPartition(Row currentRow, Row nextRow) {

Review Comment:

I think something like "sameLogicalRow(Row currentRow, Row nextRow)".
[GitHub] [iceberg] ahshahid commented on issue #6424: The size estimation formula for spark task is incorrect
ahshahid commented on issue #6424:

URL: https://github.com/apache/iceberg/issues/6424#issuecomment-1351885991

Right... that I agree with. Maybe along with the split offset (which is the start of the split) we need the end of the split. But still, please allow me to describe this simplified case, where the split is the same as the file being considered, so that the split offset is 0, and assume split size = file size.

Now, as per the current formula, splitOffset = 0, so:

    double scannedFileFraction = ((double) length()) / (file().fileSizeInBytes());

Here length() is the amount of bytes scanned (only partially read) and file().recordCount() is the number of records read in that partial scan... right?

Now scannedFileFraction is < 1 (assume the total file is not scanned). Say scannedFileFraction = 0.5 and numRowsInScan = recordCount = 50. So ideally the total estimated row count in that file should be 100. But with the current formula we will get total row count = 0.5 * 50 = 25, whereas with the corrected formula it will be 50 / 0.5 = 100, because in the corrected formula the fraction is file().fileSizeInBytes() / length() = 2.
[GitHub] [iceberg] rdblue commented on a diff in pull request #6405: API: Add Aggregate expression evaluation
rdblue commented on code in PR #6405:

URL: https://github.com/apache/iceberg/pull/6405#discussion_r1048806645

## api/src/main/java/org/apache/iceberg/expressions/CountStar.java: ##

    package org.apache.iceberg.expressions;

    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.StructLike;

    public class CountStar extends CountAggregate {
      protected CountStar(BoundTerm term) {
        super(Operation.COUNT_STAR, term);
      }

      @Override
      protected Long countFor(StructLike row) {
        return 1L;
      }

      @Override
      protected Long countFor(DataFile file) {
        long count = file.recordCount();
        if (count < 0) {
          return null;
        }

Review Comment:

Some imported Avro files had incorrect metadata several versions ago. I don't think it is widespread, but it is good to handle it.
[GitHub] [iceberg] RussellSpitzer commented on issue #6424: The size estimation formula for spark task is incorrect
RussellSpitzer commented on issue #6424:

URL: https://github.com/apache/iceberg/issues/6424#issuecomment-1351901052

> here length() is the amount of bytes scanned (only partially read)

https://github.com/apache/iceberg/blob/33217abf7f88c6c22a8c43b320f9de48de998b94/api/src/main/java/org/apache/iceberg/ContentScanTask.java#L48-L53

Your hypothetical has a contradiction: `length = splitSize`. So `if (splitSize == fileSizeInBytes) then length == fileSizeInBytes`, and `if length == fileSizeInBytes/2 then splitSize = fileSizeInBytes/2`. Therefore there must be one other task of length `fileSizeInBytes/2`, otherwise we wouldn't be reading the whole file. So again the fraction here is correct at 50% for the given task; the other task will count the other 50%.
[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6344: Spark 3.3: Introduce the changelog iterator
RussellSpitzer commented on code in PR #6344:

URL: https://github.com/apache/iceberg/pull/6344#discussion_r1048813554

## spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/TestChangelogIterator.java: ##

    private final List<Row> rows =
        Lists.newArrayList(
            new GenericRowWithSchema(new Object[] {1, "a", "data", "DELETE"}, null),
            new GenericRowWithSchema(new Object[] {1, "a", "new_data", "INSERT"}, null),
            // next two rows belong to different partitions
            new GenericRowWithSchema(new Object[] {2, "b", "data", "DELETE"}, null),
            new GenericRowWithSchema(new Object[] {3, "c", "data", "INSERT"}, null),
            new GenericRowWithSchema(new Object[] {4, "d", "data", "DELETE"}, null),
            new GenericRowWithSchema(new Object[] {4, "d", "data", "INSERT"}, null));

    private final int changeTypeIndex = 3;
    private final List<Integer> partitionIdx = Lists.newArrayList(0, 1);

    @Test
    public void testUpdatedRows() {

Review Comment:

I think we need more exhaustive tests here. I would probably permute our 4 types of records here: update, carry-over, insert, delete. We should make sure this works in any permutation of ordering.
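A self-contained sketch of how the suggested orderings could be generated; the record-kind names are taken from the comment above, and the helper itself is illustrative rather than part of the test class:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    public class RecordKindPermutations {

      // Generates every ordering of the given record kinds via simple recursive swaps.
      static void permute(List<String> kinds, int start, List<List<String>> out) {
        if (start == kinds.size() - 1) {
          out.add(new ArrayList<>(kinds));
          return;
        }
        for (int i = start; i < kinds.size(); i++) {
          Collections.swap(kinds, start, i);
          permute(kinds, start + 1, out);
          Collections.swap(kinds, start, i);
        }
      }

      public static void main(String[] args) {
        List<String> kinds = new ArrayList<>(List.of("UPDATE", "CARRY_OVER", "INSERT", "DELETE"));
        List<List<String>> orderings = new ArrayList<>();
        permute(kinds, 0, orderings);
        System.out.println(orderings.size()); // 4! = 24 orderings, each could seed one test case
      }
    }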
[GitHub] [iceberg] flyrain commented on pull request #6350: Query changelog table with a timestamp range
flyrain commented on PR #6350: URL: https://github.com/apache/iceberg/pull/6350#issuecomment-1351909259 Thanks @RussellSpitzer. Hi @szehon-ho @hililiwei, please let me know if you have any comments. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] dmgcodevil commented on a diff in pull request #3337: Fixed issue #3336: Best efforts to release hive table lock
dmgcodevil commented on code in PR #3337: URL: https://github.com/apache/iceberg/pull/3337#discussion_r1048815939 ## hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java: ## @@ -499,11 +499,21 @@ private void unlock(Optional lockId) { } @VisibleForTesting - void doUnlock(long lockId) throws TException, InterruptedException { -metaClients.run(client -> { - client.unlock(lockId); - return null; -}); + void doUnlock(long lockId) throws TException { +boolean released = false; +while (!released) { + try { +released = metaClients.run(client -> { Review Comment: there is no retry. also: ``` catch (RuntimeException e) { throw e; } ``` is redundant -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
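As one possible shape for that, a bounded retry (rather than an unbounded `while (!released)` loop) could look roughly like the sketch below; this is plain Java, not the PR's code, and the helper name, attempt count, and backoff are all assumptions. The real unlock call would be the `metaClients.run(client -> { client.unlock(lockId); return null; })` snippet quoted in the diff above.

```java
import java.util.concurrent.Callable;

// Standalone sketch of a best-effort action with an explicit, bounded retry and
// no redundant catch-and-rethrow of RuntimeException.
public class BestEffortRetry {
  static void runWithRetry(Callable<Void> action, int maxAttempts) throws Exception {
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        action.call();
        return;  // succeeded, nothing left to do
      } catch (Exception e) {
        if (attempt == maxAttempts) {
          throw e;  // best effort exhausted; surface the last failure
        }
        Thread.sleep(100L * attempt);  // simple linear backoff between attempts
      }
    }
  }

  public static void main(String[] args) throws Exception {
    runWithRetry(() -> {
      System.out.println("unlock attempt");  // stand-in for client.unlock(lockId)
      return null;
    }, 3);
  }
}
```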
[GitHub] [iceberg] ahshahid commented on issue #6424: The size estimation formula for spark task is incorrect
ahshahid commented on issue #6424: URL: https://github.com/apache/iceberg/issues/6424#issuecomment-1351924900 Right.. I was also thinking that this is where I have a misunderstanding or a bug... The question is: whether the recordCount represents the scanned-fraction row count or the total row count of the split/file. I will look into the code, but as I understand it, the recordCount has to be the partially scanned count. This is for 2 reasons: 1) The estimated row count function needs to return the total row count in the split/file; if that were already available, then no calculation should even be needed. 2) For a split which covers part of a single file, or spans multiple files, the estimated row count of the total split will need to be calculated, and for that the recordCount should be something which refers to the partially scanned row count. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] RussellSpitzer commented on issue #6424: The size estimation formula for spark task is incorrect
RussellSpitzer commented on issue #6424: URL: https://github.com/apache/iceberg/issues/6424#issuecomment-1351938390 Record count does not represent the scanned fraction. I linked you to the code; it's a representation of a row in a manifestFile, which is the metadata for the entire file. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] ahshahid commented on issue #6424: The size estimation formula for spark task is incorrect
ahshahid commented on issue #6424: URL: https://github.com/apache/iceberg/issues/6424#issuecomment-1351939955 I see. Let me see if I can explain what I mean with a test... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] asheeshgarg commented on issue #6415: Vectorized Read Issue
asheeshgarg commented on issue #6415: URL: https://github.com/apache/iceberg/issues/6415#issuecomment-1351998029 @nastra thanks for the references, that seems to be the case. @rdblue this really seems to be the case with a lot of datasets. Do we have any timeline for when https://github.com/apache/iceberg/pull/3024 will be merged? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] ahshahid commented on issue #6424: The size estimation formula for spark task is incorrect
ahshahid commented on issue #6424: URL: https://github.com/apache/iceberg/issues/6424#issuecomment-1352025232 @RussellSpitzer: I see what you are saying about record count corresponding to the total file size. Let me look into what is going wrong in my join test. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] ahshahid commented on issue #6424: The size estimation formula for spark task is incorrect
ahshahid commented on issue #6424: URL: https://github.com/apache/iceberg/issues/6424#issuecomment-1352040671 @RussellSpitzer: apologies for bugging you, I was hoping for one more clarification on this aspect:

    long splitOffset = (file().splitOffsets() != null) ? file().splitOffsets().get(0) : 0L;
    double scannedFileFraction = ((double) length()) / (file().fileSizeInBytes() - splitOffset);
    return (long) (scannedFileFraction * file().recordCount());

Given that file().recordCount() corresponds to file().fileSizeInBytes(), and length() is the length of the split (if my understanding is correct), the subtraction of splitOffset should not be needed as per my understanding... why is splitOffset being subtracted from fileSize? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] RussellSpitzer commented on issue #6424: The size estimation formula for spark task is incorrect
RussellSpitzer commented on issue #6424: URL: https://github.com/apache/iceberg/issues/6424#issuecomment-1352058762 Parquet files have non-data metadata which is not scanned when we read the split. So if for example our first row-group starts at byte 1000, we don't want to count 1000 bytes of the total file size as part of the file size representing data. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
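Putting the two comments together, a small sketch shows why subtracting the first split offset keeps the fraction honest; this is plain Java rather than Iceberg code, the field names only mirror the snippet quoted two comments above, and the byte counts are invented:

```java
// The bytes before the first row group are Parquet metadata, not data, so they
// should not dilute the scanned fraction used for the row estimate.
public class SplitOffsetSketch {
  static long estimatedRows(long taskLength, long fileSizeInBytes, long firstSplitOffset, long recordCount) {
    double scannedFileFraction = ((double) taskLength) / (fileSizeInBytes - firstSplitOffset);
    return (long) (scannedFileFraction * recordCount);
  }

  public static void main(String[] args) {
    long fileSizeInBytes = 101_000L;  // 1000 bytes of header/metadata plus 100_000 bytes of row groups
    long firstSplitOffset = 1_000L;   // first row group starts at byte 1000
    long recordCount = 10_000L;

    // A task that reads all 100_000 data bytes should estimate the full record count.
    System.out.println(estimatedRows(100_000L, fileSizeInBytes, firstSplitOffset, recordCount)); // 10000
    // Without the subtraction the fraction would be 100000/101000, about 0.99, and rows would be undercounted.
    System.out.println((long) (((double) 100_000L / fileSizeInBytes) * recordCount));            // 9900
  }
}
```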
[GitHub] [iceberg] flyrain commented on pull request #6350: Query changelog table with a timestamp range
flyrain commented on PR #6350: URL: https://github.com/apache/iceberg/pull/6350#issuecomment-1352094105 Thanks @szehon-ho for the review. I believe you are talking about case 2 in https://github.com/apache/iceberg/pull/6350#discussion_r1044906141. I did try to return an empty set, but it seems a bit involved to make the change. Solutions would be: 1. Add a new method to the `IncrementalScan` interface to return an empty set. 2. Reuse `IncrementalScan::fromSnapshotExclusive` by passing the latest/current snapshot id. It is a workaround without an interface change. I tried this, and it needs to touch a bunch of code. Let me know if there is an easier way to do that. Otherwise, I will make the change in another PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
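For reference, option 2 would presumably look roughly like the sketch below; this is only an illustration of the idea (the table/scan wiring is assumed, and whether this reliably yields an empty scan in every case is exactly the open question in the thread):

```java
import org.apache.iceberg.IncrementalChangelogScan;
import org.apache.iceberg.Table;

// Sketch of the "reuse fromSnapshotExclusive" workaround: start the incremental scan
// exclusively from the current snapshot so that no snapshots fall into the range.
public class EmptyChangelogScanSketch {
  static IncrementalChangelogScan emptyRangeScan(Table table) {
    // Assumes the table has a current snapshot; an empty table would need separate handling.
    long currentSnapshotId = table.currentSnapshot().snapshotId();
    return table.newIncrementalChangelogScan().fromSnapshotExclusive(currentSnapshotId);
  }
}
```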
[GitHub] [iceberg] ahshahid commented on issue #6424: The size estimation formula for spark task is incorrect
ahshahid commented on issue #6424: URL: https://github.com/apache/iceberg/issues/6424#issuecomment-1352159451 Thank you @RussellSpitzer.. I will close this.. maybe the issue I'm seeing is the conversion of double to long for a fractional value. Will update once I debug more. Sorry for the false alarm.. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] ahshahid closed issue #6424: The size estimation formula for spark task is incorrect
ahshahid closed issue #6424: The size estimation formula for spark task is incorrect URL: https://github.com/apache/iceberg/issues/6424 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] ddrinka commented on issue #2768: Support fan-out reads in PyIceberg
ddrinka commented on issue #2768: URL: https://github.com/apache/iceberg/issues/2768#issuecomment-1352288435 > We could do this with Spark or Dask or Ray depending on what's installed on the system. Perhaps consider [Modin](https://github.com/modin-project/modin) as well? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] stevenzwu commented on pull request #6426: Flink: add fixed field type for DataGenerators test util
stevenzwu commented on PR #6426: URL: https://github.com/apache/iceberg/pull/6426#issuecomment-1352289861 @pvary regarding your other comments, I am not sure how to proceed yet. > Null values for null values, would optional primitive types at top level be enough? > Edge cases, like year differences based on TZ, or non valid timestamps based on TZ not very clear -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] yegangy0718 commented on a diff in pull request #6382: Implement ShuffleOperator to collect data statistics
yegangy0718 commented on code in PR #6382: URL: https://github.com/apache/iceberg/pull/6382#discussion_r1048051789 ## flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestShuffleOperator.java: ## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.shuffle; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.util.Collections; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.OperatorStateStore; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway; +import org.apache.flink.runtime.operators.testutils.MockEnvironment; +import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; +import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateInitializationContextImpl; +import org.apache.flink.runtime.state.TestTaskStateManager; +import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask; +import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment; +import org.apache.flink.streaming.util.MockOutput; +import org.apache.flink.streaming.util.MockStreamConfig; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TestShuffleOperator { + private ShuffleOperator operator; + + private Environment getTestingEnvironment() { +return new StreamMockEnvironment( +new Configuration(), +new Configuration(), +new ExecutionConfig(), +1L, +new MockInputSplitProvider(), +1, +new TestTaskStateManager()); + } + + @Before + public void before() throws Exception { +MockOperatorEventGateway mockGateway = new MockOperatorEventGateway(); +KeySelector keySelector = +new KeySelector() { + private static final long serialVersionUID = 7662520075515707428L; + + @Override + public String getKey(String value) { +return value; + } +}; + +this.operator = +new ShuffleOperator<>(keySelector, TypeInformation.of(String.class), mockGateway); +Environment env = getTestingEnvironment(); 
+this.operator.setup( +new OneInputStreamTask(env), +new MockStreamConfig(new Configuration(), 1), +new MockOutput<>(Lists.newArrayList())); + } + + @After + public void clean() throws Exception { +operator.close(); + } + + @Test + public void testInitializeState() throws Exception { +StateInitializationContext stateContext = getStateContext(); +operator.initializeState(stateContext); + +assertNotNull( +stateContext +.getOperatorStateStore() + .getListState(operator.generateGlobalDataDistributionWeightDescriptor())); + } + + @Test + public void testProcessElement() throws Exception { +StateInitializationContext stateContext = getStateContext(); +operator.initializeState(stateContext); +operator.processElement(new StreamRecord<>("a")); +operator.processElement(new StreamRecord<>("a")); +operator.processElement(new StreamRecord<>("b")); +assertTrue(operator.localDataStatisticsMap().containsKey("a")); +assertTrue(operator.localDataStatisticsMap().containsKey("b")); +assertEquals(2L, (long) operator.localDataStatisticsMap().get("a")); +assertEquals(1L, (long) operator.localDataStatisticsMap().get("b")); + } + + // helper metho
[GitHub] [iceberg] flyrain merged pull request #6350: Spark 3.3: Time range query of changelog tables
flyrain merged PR #6350: URL: https://github.com/apache/iceberg/pull/6350 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] github-actions[bot] commented on issue #5071: Support UNSET of sortOrder from the SQL
github-actions[bot] commented on issue #5071: URL: https://github.com/apache/iceberg/issues/5071#issuecomment-1352388065 This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] github-actions[bot] commented on issue #4948: Create a Github Action to automatically mark issues as stale and later close if inactive
github-actions[bot] commented on issue #4948: URL: https://github.com/apache/iceberg/issues/4948#issuecomment-1352388123 This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale' -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] github-actions[bot] closed issue #4948: Create a Github Action to automatically mark issues as stale and later close if inactive
github-actions[bot] closed issue #4948: Create a Github Action to automatically mark issues as stale and later close if inactive URL: https://github.com/apache/iceberg/issues/4948 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] dennishuo opened a new pull request, #6428: Add new SnowflakeCatalog implementation to enable directly using Snowflake-managed Iceberg tables
dennishuo opened a new pull request, #6428: URL: https://github.com/apache/iceberg/pull/6428 This read-only implementation of the Catalog interface, initially built on top of the [Snowflake JDBC driver](https://docs.snowflake.com/en/user-guide/jdbc.html) for the connection layer, enables engines like Spark using the Iceberg Java SDK to be able to consume [Snowflake-managed Iceberg Tables](https://www.snowflake.com/blog/iceberg-tables-powering-open-standards-with-snowflake-innovations/) via Iceberg Catalog interfaces. Example, assuming a Snowflake account with a database `iot_data` containing a schema `public` and a managed Iceberg table `sensor_test_results`:

    spark-shell --conf spark.sql.catalog.snowlog=org.apache.iceberg.spark.SparkCatalog \
      --conf spark.sql.catalog.snowlog.catalog-impl=org.apache.iceberg.snowflake.SnowflakeCatalog \
      --conf spark.sql.catalog.snowlog.uri="jdbc:snowflake://$ACCOUNT.snowflakecomputing.com" \

    scala> spark.sessionState.catalogManager.setCurrentCatalog("snowlog");
    scala> spark.sql("show namespaces in iot_data").show(false);
    scala> spark.sql("select * from iot_data.public.sensor_test_results limit 10").show(false);

Note that the involvement of a JDBC driver is only incidental, and functionality is different from the `JdbcCatalog` - here, Snowflake itself manages manifest/metadata files and table/snapshot metadata, and this catalog layer facilitates the coordination of metadata-file locations and discovery of the latest table snapshot versions without resorting to file-listing or "directory-name"-listing (for listTables or listNamespaces) like the HadoopCatalog. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
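For code or tests that configure catalogs programmatically instead of through Spark conf, the equivalent wiring would presumably look something like the sketch below; this is an assumption-laden illustration (the `uri` key mirrors the Spark option above, the account is a placeholder, authentication properties are omitted, and the exact property keys should be checked against the PR):

```java
import java.util.Collections;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.snowflake.SnowflakeCatalog;

// Sketch only: wires up the catalog through the generic initialize(name, properties)
// call instead of Spark conf, then loads a Snowflake-managed Iceberg table read-only.
public class SnowflakeCatalogSketch {
  public static void main(String[] args) {
    SnowflakeCatalog catalog = new SnowflakeCatalog();
    catalog.initialize(
        "snowlog",
        Collections.singletonMap("uri", "jdbc:snowflake://ACCOUNT.snowflakecomputing.com"));

    // Read-only usage: load the table named in the PR description and inspect its schema.
    Table table = catalog.loadTable(TableIdentifier.of("iot_data", "public", "sensor_test_results"));
    System.out.println(table.schema());
  }
}
```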
[GitHub] [iceberg] ahshahid commented on issue #6424: The size estimation formula for spark task is incorrect
ahshahid commented on issue #6424: URL: https://github.com/apache/iceberg/issues/6424#issuecomment-1352519479 @RussellSpitzer actually, part of the issue I was seeing was related to scannedFraction being approximately equal to 1 but with a record count of 1, which was resulting in net rows seen = 0 due to the double being cast to long. And that was happening because the splitOffset value was not being subtracted from the fileSize; as a result the estimate was 0.98... * 1, which resulted in 0 rows instead of 1. By subtracting splitOffset, the ratio is now exactly equal to 1. Thanks for the information.. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
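The truncation described here is just Java's narrowing conversion from double to long, for example:

```java
// A fraction just under 1.0 times a record count of 1 estimates 0 rows because
// casting a double to long truncates toward zero rather than rounding.
public class CastTruncationSketch {
  public static void main(String[] args) {
    long recordCount = 1L;
    System.out.println((long) (0.98 * recordCount));  // 0: the single row disappears
    System.out.println((long) (1.0 * recordCount));   // 1: an exact fraction keeps the row
  }
}
```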
[GitHub] [iceberg] ahshahid commented on issue #6424: The size estimation formula for spark task is incorrect
ahshahid commented on issue #6424: URL: https://github.com/apache/iceberg/issues/6424#issuecomment-1352520299 The other issue I was looking at was comparing the performance of Parquet with Iceberg, and there it seems that Iceberg, because of its better size estimation compared to Parquet, results in joins which tend to be less performant than with Parquet. But this is an issue on the Spark side. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] xwmr-max commented on pull request #6412: Doc: Modify some options refer to Read-options in flink streaming rea…
xwmr-max commented on PR #6412: URL: https://github.com/apache/iceberg/pull/6412#issuecomment-1352605298 @openinx -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] pvary commented on a diff in pull request #6382: Implement ShuffleOperator to collect data statistics
pvary commented on code in PR #6382: URL: https://github.com/apache/iceberg/pull/6382#discussion_r1049262265 ## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleOperator.java: ## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.shuffle; + +import java.io.Serializable; +import java.util.Map; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.typeutils.MapTypeInfo; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; +import org.apache.flink.runtime.operators.coordination.OperatorEventHandler; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; + +/** + * Shuffle operator can help to improve data clustering based on the key. + * + * It collects the data statistics information, sends to coordinator and gets the global data + * distribution weight from coordinator. Then it will ingest the weight into data stream(wrap by a + * class{@link ShuffleRecordWrapper}) and send to partitioner. 
+ */ +@Internal +public class ShuffleOperator +extends AbstractStreamOperator> +implements OneInputStreamOperator>, OperatorEventHandler { + + private static final long serialVersionUID = 1L; + + private final KeySelector keySelector; + // the type of the key to collect data statistics + private final TypeInformation keyType; + private final OperatorEventGateway operatorEventGateway; + // key is generated by applying KeySelector to record + // value is the times key occurs + private transient Map localDataStatisticsMap; + private transient Map globalDataDistributionWeightMap; + private transient ListState> globalDataDistributionWeightState; + + public ShuffleOperator( + KeySelector keySelector, + TypeInformation keyType, + OperatorEventGateway operatorEventGateway) { +this.keySelector = keySelector; +this.keyType = keyType; +this.operatorEventGateway = operatorEventGateway; + } + + @VisibleForTesting + ListStateDescriptor> generateGlobalDataDistributionWeightDescriptor() { +return new ListStateDescriptor<>( +"globalDataDistributionWeight", new MapTypeInfo<>(keyType, TypeInformation.of(Long.class))); + } + + @Override + public void initializeState(StateInitializationContext context) throws Exception { +localDataStatisticsMap = Maps.newHashMap(); +globalDataDistributionWeightState = Review Comment: Let's merge this change, and we can play around the Broadcaststate in a followup PR. WTYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] jiamin13579 commented on pull request #6419: Doc:Example of correcting the document add/drop partition truncate
jiamin13579 commented on PR #6419: URL: https://github.com/apache/iceberg/pull/6419#issuecomment-1352635716 > Not sure its necessary, looks like for now width can be any of the arguments: https://github.com/apache/iceberg/blob/master/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java#L428 I think the document will mislead users, as many partitions will be generated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] Fokko commented on a diff in pull request #6392: Python: Add adlfs support (Azure DataLake FileSystem)
Fokko commented on code in PR #6392: URL: https://github.com/apache/iceberg/pull/6392#discussion_r1049285936 ## python/Makefile: ## @@ -26,14 +26,21 @@ lint: poetry run pre-commit run --all-files test: - poetry run coverage run --source=pyiceberg/ -m pytest tests/ -m "not s3" ${PYTEST_ARGS} + poetry run coverage run --source=pyiceberg/ -m pytest tests/ -m "not s3 and not adlfs" ${PYTEST_ARGS} Review Comment: I've been looking at this in https://github.com/apache/iceberg/pull/6398/ as well. I've added: ```python def pytest_collection_modifyitems(items, config): for item in items: if not any(item.iter_markers()): item.add_marker("unmarked") ``` Which will automatically add the mark `unmarked` to each of the tests that doesn't have a marked, creating their own group. WDYT of that? ## python/tests/conftest.py: ## @@ -76,13 +76,30 @@ def pytest_addoption(parser: pytest.Parser) -> None: +# S3 options parser.addoption( "--s3.endpoint", action="store", default="http://localhost:9000";, help="The S3 endpoint URL for tests marked as s3" ) parser.addoption("--s3.access-key-id", action="store", default="admin", help="The AWS access key ID for tests marked as s3") parser.addoption( "--s3.secret-access-key", action="store", default="password", help="The AWS secret access key ID for tests marked as s3" ) +# ADLFS options +parser.addoption( +"--adlfs.endpoint", +action="store", +default="http://127.0.0.1:1";, +help="The ADLS endpoint URL for tests marked as adlfs", +) +parser.addoption( +"--adlfs.account-name", action="store", default="devstoreaccount1", help="The ADLS account key for tests marked as adlfs" +) +parser.addoption( +"--adlfs.account-key", +action="store", + default="Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==", Review Comment: Maybe we should add this Github comment as a code comment, in case we start doing credential scanning one day. ## python/pyiceberg/io/fsspec.py: ## @@ -143,8 +153,15 @@ def open(self) -> InputStream: Returns: OpenFile: An fsspec compliant file-like object + +Raises: +FileNotFoundError: If the file does not exist """ -return self._fs.open(self.location, "rb") +try: +return self._fs.open(self.location, "rb") +except FileNotFoundError as e: +# To have a consistent error handling experience, make sure exception contains missing file location. Review Comment: This is perfect! We don't want to return implementation-specific errors -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] nastra commented on a diff in pull request #6428: Add new SnowflakeCatalog implementation to enable directly using Snowflake-managed Iceberg tables
nastra commented on code in PR #6428: URL: https://github.com/apache/iceberg/pull/6428#discussion_r1049312756 ## snowflake/src/test/java/org/apache/iceberg/snowflake/SnowflakeCatalogTest.java: ## @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.snowflake; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableMetadataParser; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.snowflake.entities.SnowflakeTableMetadata; +import org.apache.iceberg.types.Types; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class SnowflakeCatalogTest { Review Comment: just curious, would it be possible to extend the existing `CatalogTests` class as it already defines/tests a lot of common behavior -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org