[GitHub] [iceberg] dramaticlly commented on a diff in pull request #6372: [Python] Fix incorrect description when set a property
dramaticlly commented on code in PR #6372: URL: https://github.com/apache/iceberg/pull/6372#discussion_r1041878402 ## python/pyiceberg/cli/console.py: ## @@ -286,7 +286,7 @@ def get_table(ctx: Context, identifier: str, property_name: str): @properties.group() def set(): -"""Removes properties on tables/namespaces""" +"""Sets a property on tables/namespaces""" Review Comment: Thank you @singhpk234, updated per your suggestion. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] 245831311 commented on issue #4550: the snapshot file is lost when write iceberg using flink Failed to open input stream for file File does not exist
245831311 commented on issue #4550: URL: https://github.com/apache/iceberg/issues/4550#issuecomment-1340578855 > I have solved this problem. Thank you. My problem mainly occurs when InMemoryLockManager releases the heartbeat of the lock and reports a NullPointerException; I rewrote InMemoryLockManager to solve this problem. On 2022-12-06, ***@***.***> wrote: After digging into the Iceberg code and the log, I can reproduce it while debugging locally. The scenario may happen while Flink is cancelling. IcebergFileCommitter is about to commit files. In the step that renames metadata.json (org.apache.iceberg.hadoop.HadoopTableOperations#renameToFinal), org.apache.hadoop.ipc.Client.call encounters an InterruptedIOException. I suspect it comes from the Flink task cancellation. On the other hand, HDFS has renamed the metadata.json file successfully. After the rename fails, it is supposed to retry, but the thread encounters an InterruptedException while sleeping (org.apache.iceberg.util.Tasks#runTaskWithRetry). It then throws a RuntimeException, and the version-hint is not updated. The RuntimeException leads to a rollback in org.apache.iceberg.BaseTransaction (#cleanUpOnCommitFailure), which deletes the manifest list (snap-XXX). Could you describe this in more detail? This problem has been troubling me for quite a while, thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
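The quoted analysis describes a race between a rename that succeeds on HDFS and a client that concludes the commit failed. A minimal sketch of that retry pattern (illustrative only, not org.apache.iceberg.util.Tasks; the retry count and sleep are made up) shows how an interrupt during the back-off sleep surfaces as a RuntimeException, after which the caller's cleanup can delete a manifest list the table actually references:

```java
// Illustrative sketch, not Iceberg's Tasks utility: an interrupted back-off sleep makes a
// possibly-successful operation (the metadata.json rename) look like a failed commit.
public final class RetrySketch {
  static void runWithRetry(Runnable op, int maxAttempts) {
    RuntimeException last = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        op.run();  // may succeed server-side even if the client sees an InterruptedIOException
        return;
      } catch (RuntimeException e) {
        last = e;
        try {
          Thread.sleep(1_000L);  // Flink cancellation interrupts the task thread here
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
          // The commit is reported as failed; a rollback (cleanUpOnCommitFailure) may then
          // delete files, such as the snap-XXX manifest list, that are already referenced.
          throw new RuntimeException("Retry interrupted before the outcome was known", ie);
        }
      }
    }
    throw new RuntimeException("Retries exhausted", last);
  }
}
```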
[GitHub] [iceberg] rajarshisarkar opened a new pull request, #6374: Docs: Remove backticks from Spark procedure headings
rajarshisarkar opened a new pull request, #6374: URL: https://github.com/apache/iceberg/pull/6374 This PR removes backticks from the Spark procedure headings. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] JMin824 closed issue #6373: Question about usage of RewriteFile with Zorder Strategy
JMin824 closed issue #6373: Question about usage of RewriteFile with Zorder Strategy URL: https://github.com/apache/iceberg/issues/6373 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] JMin824 commented on issue #6373: Question about usage of RewriteFile with Zorder Strategy
JMin824 commented on issue #6373: URL: https://github.com/apache/iceberg/issues/6373#issuecomment-1340607468 > Rewrite all rewrites all files; this means reading all data of the files, ordering it, then writing out new ordered files. If no predicates are selected, this would completely rewrite the table. Thanks a lot, I understand now after your reply. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
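For reference, a hedged sketch of how a Z-order rewrite is typically invoked through Iceberg's Spark action API (the column names and filter are made up; it assumes the zOrder(...) option on RewriteDataFiles available in recent Iceberg releases). Omitting the filter makes the action consider every file, i.e. a full table rewrite:

```java
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteDataFiles;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.spark.sql.SparkSession;

public final class ZOrderRewriteSketch {
  // Rewrites (and Z-orders) only the files matching the filter; drop filter(...) to rewrite the whole table.
  static RewriteDataFiles.Result rewrite(SparkSession spark, Table table) {
    return SparkActions.get(spark)
        .rewriteDataFiles(table)
        .filter(Expressions.greaterThanOrEqual("event_day", "2022-12-01"))  // hypothetical predicate
        .zOrder("device_id", "event_ts")                                    // hypothetical columns
        .execute();
  }
}
```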
[GitHub] [iceberg] ggershinsky commented on a diff in pull request #3231: GCM encryption stream
ggershinsky commented on code in PR #3231: URL: https://github.com/apache/iceberg/pull/3231#discussion_r1041939078 ## core/src/main/java/org/apache/iceberg/encryption/AesGcmInputFile.java: ## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.encryption; + +import java.io.IOException; +import java.io.UncheckedIOException; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.SeekableInputStream; + +public class AesGcmInputFile implements InputFile { + private final InputFile sourceFile; + private final byte[] dataKey; + private long plaintextLength; + + public AesGcmInputFile(InputFile sourceFile, byte[] dataKey) { +this.sourceFile = sourceFile; +this.dataKey = dataKey; +this.plaintextLength = -1; + } + + @Override + public long getLength() { +if (plaintextLength == -1) { + try { +this.newStream().close(); Review Comment: Per our recent discussion, we'll set the default block size to 1MB, so the plaintext length can be calculated without opening the stream. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
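The arithmetic behind that remark mirrors the constructor of AesGcmInputStream shown in the diffs below: once the plaintext block size is fixed (for example 1 MB), the plaintext length follows from the ciphertext length alone. A standalone sketch of that calculation (the nonce, tag, and prefix lengths are passed in as parameters rather than taken from Ciphers, whose constants are not reproduced in this excerpt):

```java
// Sketch of the length math used by AesGcmInputStream: with a known plaintext block size,
// the plaintext length can be derived from the encrypted file length without opening a stream.
public final class GcmLengthSketch {
  static long plaintextLength(long sourceFileLength, int plainBlockSize,
                              int nonceLength, int tagLength, int prefixLength) {
    long netCipherLength = sourceFileLength - prefixLength;          // strip magic + block-size prefix
    int cipherBlockSize = plainBlockSize + nonceLength + tagLength;  // each block carries a nonce and a tag

    long fullBlocks = netCipherLength / cipherBlockSize;
    long cipherBytesInLastBlock = netCipherLength - fullBlocks * cipherBlockSize;
    long plainBytesInLastBlock =
        cipherBytesInLastBlock == 0 ? 0 : cipherBytesInLastBlock - nonceLength - tagLength;

    return fullBlocks * plainBlockSize + plainBytesInLastBlock;
  }
}
```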
[GitHub] [iceberg] ggershinsky commented on a diff in pull request #3231: GCM encryption stream
ggershinsky commented on code in PR #3231: URL: https://github.com/apache/iceberg/pull/3231#discussion_r1041943081 ## core/src/main/java/org/apache/iceberg/encryption/AesGcmInputStream.java: ## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.encryption; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Arrays; +import org.apache.iceberg.io.SeekableInputStream; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +public class AesGcmInputStream extends SeekableInputStream { + private final SeekableInputStream sourceStream; + private final boolean emptyCipherStream; + private final long netSourceFileSize; + private final Ciphers.AesGcmDecryptor gcmDecryptor; + private final byte[] ciphertextBlockBuffer; + private final int cipherBlockSize; + private final int plainBlockSize; + private final int numberOfBlocks; + private final int lastCipherBlockSize; + private final long plainStreamSize; + private final byte[] fileAadPrefix; + + private long plainStreamPosition; + private int currentBlockIndex; + private int currentOffsetInPlainBlock; + + AesGcmInputStream(SeekableInputStream sourceStream, long sourceLength, +byte[] aesKey, byte[] fileAadPrefix) throws IOException { +this.netSourceFileSize = sourceLength - Ciphers.GCM_STREAM_PREFIX_LENGTH; +Preconditions.checkArgument(netSourceFileSize >= 0, +"Source length " + sourceLength + " is shorter than GCM prefix. File is not encrypted"); + +this.emptyCipherStream = (0 == netSourceFileSize); +this.sourceStream = sourceStream; +byte[] prefixBytes = new byte[Ciphers.GCM_STREAM_PREFIX_LENGTH]; +int fetched = sourceStream.read(prefixBytes); +Preconditions.checkState(fetched == Ciphers.GCM_STREAM_PREFIX_LENGTH, +"Insufficient read " + fetched + +". 
The stream length should be at least " + Ciphers.GCM_STREAM_PREFIX_LENGTH); + +byte[] magic = new byte[Ciphers.GCM_STREAM_MAGIC_ARRAY.length]; +System.arraycopy(prefixBytes, 0, magic, 0, Ciphers.GCM_STREAM_MAGIC_ARRAY.length); +Preconditions.checkState(Arrays.equals(Ciphers.GCM_STREAM_MAGIC_ARRAY, magic), +"Cannot open encrypted file, it does not begin with magic string " + Ciphers.GCM_STREAM_MAGIC_STRING); + +if (!emptyCipherStream) { + this.plainStreamPosition = 0; + this.fileAadPrefix = fileAadPrefix; + gcmDecryptor = new Ciphers.AesGcmDecryptor(aesKey); + plainBlockSize = ByteBuffer.wrap(prefixBytes, Ciphers.GCM_STREAM_MAGIC_ARRAY.length, 4) + .order(ByteOrder.LITTLE_ENDIAN).getInt(); + Preconditions.checkState(plainBlockSize > 0, "Wrong plainBlockSize " + plainBlockSize); + + cipherBlockSize = plainBlockSize + Ciphers.NONCE_LENGTH + Ciphers.GCM_TAG_LENGTH; + this.ciphertextBlockBuffer = new byte[cipherBlockSize]; + this.currentBlockIndex = 0; + this.currentOffsetInPlainBlock = 0; + + int numberOfFullBlocks = Math.toIntExact(netSourceFileSize / cipherBlockSize); + int cipherBytesInLastBlock = Math.toIntExact(netSourceFileSize - numberOfFullBlocks * cipherBlockSize); + boolean fullBlocksOnly = (0 == cipherBytesInLastBlock); + numberOfBlocks = fullBlocksOnly ? numberOfFullBlocks : numberOfFullBlocks + 1; + lastCipherBlockSize = fullBlocksOnly ? cipherBlockSize : cipherBytesInLastBlock; // never 0 + int plainBytesInLastBlock = fullBlocksOnly ? 0 : + (cipherBytesInLastBlock - Ciphers.NONCE_LENGTH - Ciphers.GCM_TAG_LENGTH); Review Comment: This is the last block, so its length will be variable and shorter than the block size constant. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr..
[GitHub] [iceberg] ggershinsky commented on a diff in pull request #3231: GCM encryption stream
ggershinsky commented on code in PR #3231: URL: https://github.com/apache/iceberg/pull/3231#discussion_r1041943937 ## core/src/main/java/org/apache/iceberg/encryption/AesGcmInputStream.java: ## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.encryption; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Arrays; +import org.apache.iceberg.io.SeekableInputStream; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +public class AesGcmInputStream extends SeekableInputStream { + private final SeekableInputStream sourceStream; + private final boolean emptyCipherStream; + private final long netSourceFileSize; + private final Ciphers.AesGcmDecryptor gcmDecryptor; + private final byte[] ciphertextBlockBuffer; + private final int cipherBlockSize; + private final int plainBlockSize; + private final int numberOfBlocks; + private final int lastCipherBlockSize; + private final long plainStreamSize; + private final byte[] fileAadPrefix; + + private long plainStreamPosition; + private int currentBlockIndex; + private int currentOffsetInPlainBlock; + + AesGcmInputStream(SeekableInputStream sourceStream, long sourceLength, +byte[] aesKey, byte[] fileAadPrefix) throws IOException { +this.netSourceFileSize = sourceLength - Ciphers.GCM_STREAM_PREFIX_LENGTH; +Preconditions.checkArgument(netSourceFileSize >= 0, +"Source length " + sourceLength + " is shorter than GCM prefix. File is not encrypted"); + +this.emptyCipherStream = (0 == netSourceFileSize); +this.sourceStream = sourceStream; +byte[] prefixBytes = new byte[Ciphers.GCM_STREAM_PREFIX_LENGTH]; Review Comment: SGTM -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] ggershinsky commented on a diff in pull request #3231: GCM encryption stream
ggershinsky commented on code in PR #3231: URL: https://github.com/apache/iceberg/pull/3231#discussion_r1041945463 ## core/src/main/java/org/apache/iceberg/encryption/AesGcmInputStream.java: ## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.encryption; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Arrays; +import org.apache.iceberg.io.SeekableInputStream; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +public class AesGcmInputStream extends SeekableInputStream { + private final SeekableInputStream sourceStream; + private final boolean emptyCipherStream; + private final long netSourceFileSize; + private final Ciphers.AesGcmDecryptor gcmDecryptor; + private final byte[] ciphertextBlockBuffer; + private final int cipherBlockSize; + private final int plainBlockSize; + private final int numberOfBlocks; + private final int lastCipherBlockSize; + private final long plainStreamSize; + private final byte[] fileAadPrefix; + + private long plainStreamPosition; + private int currentBlockIndex; + private int currentOffsetInPlainBlock; + + AesGcmInputStream(SeekableInputStream sourceStream, long sourceLength, +byte[] aesKey, byte[] fileAadPrefix) throws IOException { +this.netSourceFileSize = sourceLength - Ciphers.GCM_STREAM_PREFIX_LENGTH; +Preconditions.checkArgument(netSourceFileSize >= 0, +"Source length " + sourceLength + " is shorter than GCM prefix. File is not encrypted"); + +this.emptyCipherStream = (0 == netSourceFileSize); +this.sourceStream = sourceStream; +byte[] prefixBytes = new byte[Ciphers.GCM_STREAM_PREFIX_LENGTH]; +int fetched = sourceStream.read(prefixBytes); +Preconditions.checkState(fetched == Ciphers.GCM_STREAM_PREFIX_LENGTH, +"Insufficient read " + fetched + +". 
The stream length should be at least " + Ciphers.GCM_STREAM_PREFIX_LENGTH); + +byte[] magic = new byte[Ciphers.GCM_STREAM_MAGIC_ARRAY.length]; +System.arraycopy(prefixBytes, 0, magic, 0, Ciphers.GCM_STREAM_MAGIC_ARRAY.length); +Preconditions.checkState(Arrays.equals(Ciphers.GCM_STREAM_MAGIC_ARRAY, magic), +"Cannot open encrypted file, it does not begin with magic string " + Ciphers.GCM_STREAM_MAGIC_STRING); + +if (!emptyCipherStream) { + this.plainStreamPosition = 0; + this.fileAadPrefix = fileAadPrefix; + gcmDecryptor = new Ciphers.AesGcmDecryptor(aesKey); + plainBlockSize = ByteBuffer.wrap(prefixBytes, Ciphers.GCM_STREAM_MAGIC_ARRAY.length, 4) + .order(ByteOrder.LITTLE_ENDIAN).getInt(); + Preconditions.checkState(plainBlockSize > 0, "Wrong plainBlockSize " + plainBlockSize); + + cipherBlockSize = plainBlockSize + Ciphers.NONCE_LENGTH + Ciphers.GCM_TAG_LENGTH; + this.ciphertextBlockBuffer = new byte[cipherBlockSize]; + this.currentBlockIndex = 0; + this.currentOffsetInPlainBlock = 0; + + int numberOfFullBlocks = Math.toIntExact(netSourceFileSize / cipherBlockSize); + int cipherBytesInLastBlock = Math.toIntExact(netSourceFileSize - numberOfFullBlocks * cipherBlockSize); + boolean fullBlocksOnly = (0 == cipherBytesInLastBlock); + numberOfBlocks = fullBlocksOnly ? numberOfFullBlocks : numberOfFullBlocks + 1; + lastCipherBlockSize = fullBlocksOnly ? cipherBlockSize : cipherBytesInLastBlock; // never 0 + int plainBytesInLastBlock = fullBlocksOnly ? 0 : + (cipherBytesInLastBlock - Ciphers.NONCE_LENGTH - Ciphers.GCM_TAG_LENGTH); + plainStreamSize = numberOfFullBlocks * plainBlockSize + plainBytesInLastBlock; +} else { + plainStreamSize = 0; + + gcmDecryptor = null; + ciphertextBlockBuffer = null; + cipherBlockSize = -1; + plainBlockSize = -1; + numberOfBlocks = -1; + lastCipherBlockSize = -1; + this.fileAadPrefix = null; +} + } + + public long plaintextStreamSize() { +return plainStreamSize; + } + + @Override + public int available() throws IOException { +long maxAvailable = plainStreamSize - plainStreamPo
[GitHub] [iceberg] ggershinsky commented on a diff in pull request #3231: GCM encryption stream
ggershinsky commented on code in PR #3231: URL: https://github.com/apache/iceberg/pull/3231#discussion_r1041949962 ## core/src/main/java/org/apache/iceberg/encryption/AesGcmInputStream.java: ## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.encryption; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Arrays; +import org.apache.iceberg.io.SeekableInputStream; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +public class AesGcmInputStream extends SeekableInputStream { + private final SeekableInputStream sourceStream; + private final boolean emptyCipherStream; + private final long netSourceFileSize; + private final Ciphers.AesGcmDecryptor gcmDecryptor; + private final byte[] ciphertextBlockBuffer; + private final int cipherBlockSize; + private final int plainBlockSize; + private final int numberOfBlocks; + private final int lastCipherBlockSize; + private final long plainStreamSize; + private final byte[] fileAadPrefix; + + private long plainStreamPosition; + private int currentBlockIndex; + private int currentOffsetInPlainBlock; + + AesGcmInputStream(SeekableInputStream sourceStream, long sourceLength, +byte[] aesKey, byte[] fileAadPrefix) throws IOException { +this.netSourceFileSize = sourceLength - Ciphers.GCM_STREAM_PREFIX_LENGTH; +Preconditions.checkArgument(netSourceFileSize >= 0, +"Source length " + sourceLength + " is shorter than GCM prefix. File is not encrypted"); Review Comment: This code can be reached only if the parent metadata has marked this file as encrypted (and provided us with the decryption key). If the file is not encrypted, this is an error situation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] zstraw commented on issue #4550: the snapshot file is lost when write iceberg using flink Failed to open input stream for file File does not exist
zstraw commented on issue #4550: URL: https://github.com/apache/iceberg/issues/4550#issuecomment-1340655807 > I have solved this problem. Thank you. My problem mainly occurs when InMemoryLockManager releases the heartbeat of the lock and reports a NullPointerException; I rewrote InMemoryLockManager to solve this problem. On 2022-12-06, ***@***.***> wrote: After digging into the Iceberg code and the log, I can reproduce it while debugging locally. The scenario may happen while Flink is cancelling. IcebergFileCommitter is about to commit files. In the step that renames metadata.json (org.apache.iceberg.hadoop.HadoopTableOperations#renameToFinal), org.apache.hadoop.ipc.Client.call encounters an InterruptedIOException. I suspect it comes from the Flink task cancellation. On the other hand, HDFS has renamed the metadata.json file successfully. After the rename fails, it is supposed to retry, but the thread encounters an InterruptedException while sleeping (org.apache.iceberg.util.Tasks#runTaskWithRetry). It then throws a RuntimeException, and the version-hint is not updated. The RuntimeException leads to a rollback in org.apache.iceberg.BaseTransaction (#cleanUpOnCommitFailure), which deletes the manifest list (snap-XXX). But I think this problem is still a bug in Iceberg's commit procedure. Several tasks in our environment have encountered lost snap-XXX.avro files. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] pavibhai commented on pull request #6293: Added FileIO Support for ORC Reader and Writers
pavibhai commented on PR #6293: URL: https://github.com/apache/iceberg/pull/6293#issuecomment-1340672765 > I'm not a big fan of the fake filesystem approach here, mostly because I'm afraid of mocking an object like that when we don't have the full filesystem state. I feel like this patch would have us maintaining a rather large Hadoop mock. > > Is there any chance we can convince the ORC project to allow the creation of a writer from a "java.io.OutputStream" instead of always creating its own file? Thanks for this insight. Overall I agree that this should not be our long-term answer, and we should work with the ORC community to make this integration better. In the meantime, hopefully this provides the capability; given the relative stability of the FileSystem APIs and the limited exposure (just the creation of the input and output streams), I hope this is acceptable. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
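For context, the "fake filesystem" under discussion amounts to handing ORC a Hadoop FileSystem whose streams are really the ones produced by Iceberg's FileIO. A rough sketch of that bridging idea (this is not the PR's actual OutputFileSystem/InputFileSystem classes; statistics and error handling are omitted):

```java
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.PositionOutputStream;

// Rough sketch: ORC asks a Hadoop FileSystem for an output stream, and the stream it
// receives is simply the one created by Iceberg's FileIO-backed OutputFile.
public final class FileIoBridgeSketch {
  static FSDataOutputStream wrap(OutputFile file) {
    PositionOutputStream out = file.createOrOverwrite();  // FileIO stream, no Hadoop FileSystem involved
    return new FSDataOutputStream(out, null);             // no FileSystem.Statistics tracked in this sketch
  }
}
```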
[GitHub] [iceberg] gaborkaszab commented on pull request #6369: Increase Partition Start Id to 10000
gaborkaszab commented on PR #6369: URL: https://github.com/apache/iceberg/pull/6369#issuecomment-1340673537 This seems like a reasonable change to me. Just a question for my better understanding: tables that we have already written will still have their partition field IDs starting from 1000, right? So tables that have more than 1000 columns and were written prior to this change will still have the collision with the partition field IDs, and it will only be fixed if they are rewritten, right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
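To make the collision concrete: partition field IDs are assigned from a fixed starting value (1000 before this change) that is independent of column IDs, so a schema with 1000 or more columns can end up with a data column and a partition field sharing an ID. A small illustration (the two-column schema is hypothetical; imagine it had 1000+ columns so column IDs reach the old partition start):

```java
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public final class PartitionIdSketch {
  public static void main(String[] args) {
    // Hypothetical schema; with 1000+ columns, column IDs would reach the old partition start of 1000.
    Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.required(2, "ts", Types.TimestampType.withZone()));

    PartitionSpec spec = PartitionSpec.builderFor(schema).identity("ts").build();
    // Prints the first partition field ID: 1000 before this change, 10000 after it.
    System.out.println(spec.fields().get(0).fieldId());
  }
}
```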
[GitHub] [iceberg] pavibhai commented on a diff in pull request #6293: Added FileIO Support for ORC Reader and Writers
pavibhai commented on code in PR #6293: URL: https://github.com/apache/iceberg/pull/6293#discussion_r1041988137 ## orc/src/main/java/org/apache/iceberg/orc/ORC.java: ## @@ -789,7 +808,210 @@ static Reader newFileReader(InputFile file, Configuration config) { ReaderOptions readerOptions = OrcFile.readerOptions(config).useUTCTimestamp(true); if (file instanceof HadoopInputFile) { readerOptions.filesystem(((HadoopInputFile) file).getFileSystem()); +} else { + readerOptions.filesystem(new InputFileSystem(file)).maxLength(file.getLength()); } return newFileReader(file.location(), readerOptions); } + + static Writer newFileWriter( + OutputFile file, OrcFile.WriterOptions options, Map metadata) { +if (file instanceof HadoopOutputFile) { + options.fileSystem(((HadoopOutputFile) file).getFileSystem()); +} else { + options.fileSystem(new OutputFileSystem(file)); +} +final Path locPath = new Path(file.location()); +final Writer writer; + +try { + writer = OrcFile.createWriter(locPath, options); +} catch (IOException ioe) { + throw new RuntimeIOException(ioe, "Can't create file %s", locPath); +} + +metadata.forEach((key, value) -> writer.addUserMetadata(key, ByteBuffer.wrap(value))); + +return writer; + } + + private static class WrappedSeekableInputStream extends FSInputStream { +private final SeekableInputStream inputStream; +private boolean closed; +private final StackTraceElement[] createStack; + +private WrappedSeekableInputStream(SeekableInputStream inputStream) { + this.inputStream = inputStream; + this.createStack = Thread.currentThread().getStackTrace(); + this.closed = false; +} + +@Override +public void seek(long pos) throws IOException { + inputStream.seek(pos); +} + +@Override +public long getPos() throws IOException { + return inputStream.getPos(); +} + +@Override +public boolean seekToNewSource(long targetPos) throws IOException { + throw new UnsupportedOperationException("seekToNewSource not supported"); +} + +@Override +public int read() throws IOException { + return inputStream.read(); +} + +@Override +public int read(@NotNull byte[] b, int off, int len) throws IOException { + return inputStream.read(b, off, len); +} + +@Override +public void close() throws IOException { + inputStream.close(); + closed = true; +} + +@SuppressWarnings("checkstyle:NoFinalizer") +@Override +protected void finalize() throws Throwable { + super.finalize(); + if (!closed) { +close(); // releasing resources is more important than printing the warning +String trace = +Joiner.on("\n\t").join(Arrays.copyOfRange(createStack, 1, createStack.length)); +LOG.warn("Unclosed input stream created by:\n\t{}", trace); + } +} + } + + private static class NullFileSystem extends FileSystem { + +@Override +public URI getUri() { + throw new UnsupportedOperationException(); +} + +@Override +public FSDataInputStream open(Path f) throws IOException { + throw new UnsupportedOperationException(); +} + +@Override +public FSDataInputStream open(Path f, int bufferSize) throws IOException { + throw new UnsupportedOperationException(); +} + +@Override +public FSDataOutputStream create( +Path f, +FsPermission permission, +boolean overwrite, +int bufferSize, +short replication, +long blockSize, +Progressable progress) +throws IOException { + throw new UnsupportedOperationException(); +} + +@Override +public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) +throws IOException { + throw new UnsupportedOperationException(); +} + +@Override +public boolean rename(Path src, Path dst) throws IOException { + throw new 
UnsupportedOperationException(); +} + +@Override +public boolean delete(Path f, boolean recursive) throws IOException { + throw new UnsupportedOperationException(); +} + +@Override +public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException { + throw new UnsupportedOperationException(); +} + +@Override +public void setWorkingDirectory(Path new_dir) { + throw new UnsupportedOperationException(); +} + +@Override +public Path getWorkingDirectory() { + throw new UnsupportedOperationException(); +} + +@Override +public boolean mkdirs(Path f, FsPermission permission) throws IOException { + throw new UnsupportedOperationException(); +} + +@Override +public FileStatus getFileStatus(Path f) throws IOException { + throw new UnsupportedOperationException(); +} + } + + static class InputFileSystem ex
[GitHub] [iceberg] ayushtkn commented on pull request #6369: Increase Partition Start Id to 10000
ayushtkn commented on PR #6369: URL: https://github.com/apache/iceberg/pull/6369#issuecomment-1340677494 > written prior to this change will still have the collision with the partition field IDs and will only be fixed if they are, or at least their metadata is, rewritten, right? Yep. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] pavibhai commented on a diff in pull request #6293: Added FileIO Support for ORC Reader and Writers
pavibhai commented on code in PR #6293: URL: https://github.com/apache/iceberg/pull/6293#discussion_r1041995292 ## orc/src/main/java/org/apache/iceberg/orc/ORC.java: ## @@ -789,7 +808,210 @@ static Reader newFileReader(InputFile file, Configuration config) { ReaderOptions readerOptions = OrcFile.readerOptions(config).useUTCTimestamp(true); if (file instanceof HadoopInputFile) { readerOptions.filesystem(((HadoopInputFile) file).getFileSystem()); +} else { + readerOptions.filesystem(new InputFileSystem(file)).maxLength(file.getLength()); } return newFileReader(file.location(), readerOptions); } + + static Writer newFileWriter( + OutputFile file, OrcFile.WriterOptions options, Map metadata) { +if (file instanceof HadoopOutputFile) { + options.fileSystem(((HadoopOutputFile) file).getFileSystem()); +} else { + options.fileSystem(new OutputFileSystem(file)); +} +final Path locPath = new Path(file.location()); +final Writer writer; + +try { + writer = OrcFile.createWriter(locPath, options); +} catch (IOException ioe) { + throw new RuntimeIOException(ioe, "Can't create file %s", locPath); +} + +metadata.forEach((key, value) -> writer.addUserMetadata(key, ByteBuffer.wrap(value))); + +return writer; + } + + private static class WrappedSeekableInputStream extends FSInputStream { +private final SeekableInputStream inputStream; +private boolean closed; +private final StackTraceElement[] createStack; + +private WrappedSeekableInputStream(SeekableInputStream inputStream) { + this.inputStream = inputStream; + this.createStack = Thread.currentThread().getStackTrace(); + this.closed = false; +} + +@Override +public void seek(long pos) throws IOException { + inputStream.seek(pos); +} + +@Override +public long getPos() throws IOException { + return inputStream.getPos(); +} + +@Override +public boolean seekToNewSource(long targetPos) throws IOException { + throw new UnsupportedOperationException("seekToNewSource not supported"); +} + +@Override +public int read() throws IOException { + return inputStream.read(); +} + +@Override +public int read(@NotNull byte[] b, int off, int len) throws IOException { + return inputStream.read(b, off, len); +} + +@Override +public void close() throws IOException { + inputStream.close(); + closed = true; +} + +@SuppressWarnings("checkstyle:NoFinalizer") +@Override +protected void finalize() throws Throwable { + super.finalize(); + if (!closed) { +close(); // releasing resources is more important than printing the warning +String trace = +Joiner.on("\n\t").join(Arrays.copyOfRange(createStack, 1, createStack.length)); +LOG.warn("Unclosed input stream created by:\n\t{}", trace); + } +} + } + + private static class NullFileSystem extends FileSystem { + +@Override +public URI getUri() { + throw new UnsupportedOperationException(); +} + +@Override +public FSDataInputStream open(Path f) throws IOException { + throw new UnsupportedOperationException(); +} + +@Override +public FSDataInputStream open(Path f, int bufferSize) throws IOException { + throw new UnsupportedOperationException(); +} + +@Override +public FSDataOutputStream create( +Path f, +FsPermission permission, +boolean overwrite, +int bufferSize, +short replication, +long blockSize, +Progressable progress) +throws IOException { + throw new UnsupportedOperationException(); +} + +@Override +public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) +throws IOException { + throw new UnsupportedOperationException(); +} + +@Override +public boolean rename(Path src, Path dst) throws IOException { + throw new 
UnsupportedOperationException(); +} + +@Override +public boolean delete(Path f, boolean recursive) throws IOException { + throw new UnsupportedOperationException(); +} + +@Override +public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException { + throw new UnsupportedOperationException(); +} + +@Override +public void setWorkingDirectory(Path new_dir) { + throw new UnsupportedOperationException(); +} + +@Override +public Path getWorkingDirectory() { + throw new UnsupportedOperationException(); +} + +@Override +public boolean mkdirs(Path f, FsPermission permission) throws IOException { + throw new UnsupportedOperationException(); +} + +@Override +public FileStatus getFileStatus(Path f) throws IOException { + throw new UnsupportedOperationException(); +} + } + + static class InputFileSystem ex
[GitHub] [iceberg] pavibhai commented on a diff in pull request #6293: Added FileIO Support for ORC Reader and Writers
pavibhai commented on code in PR #6293: URL: https://github.com/apache/iceberg/pull/6293#discussion_r1041996933 ## orc/src/main/java/org/apache/iceberg/orc/ORC.java: ## @@ -789,7 +808,210 @@ static Reader newFileReader(InputFile file, Configuration config) { ReaderOptions readerOptions = OrcFile.readerOptions(config).useUTCTimestamp(true); if (file instanceof HadoopInputFile) { readerOptions.filesystem(((HadoopInputFile) file).getFileSystem()); +} else { + readerOptions.filesystem(new InputFileSystem(file)).maxLength(file.getLength()); } return newFileReader(file.location(), readerOptions); } + + static Writer newFileWriter( + OutputFile file, OrcFile.WriterOptions options, Map metadata) { +if (file instanceof HadoopOutputFile) { + options.fileSystem(((HadoopOutputFile) file).getFileSystem()); +} else { + options.fileSystem(new OutputFileSystem(file)); +} +final Path locPath = new Path(file.location()); +final Writer writer; + +try { + writer = OrcFile.createWriter(locPath, options); +} catch (IOException ioe) { + throw new RuntimeIOException(ioe, "Can't create file %s", locPath); +} + +metadata.forEach((key, value) -> writer.addUserMetadata(key, ByteBuffer.wrap(value))); + +return writer; + } + + private static class WrappedSeekableInputStream extends FSInputStream { +private final SeekableInputStream inputStream; +private boolean closed; +private final StackTraceElement[] createStack; + +private WrappedSeekableInputStream(SeekableInputStream inputStream) { + this.inputStream = inputStream; + this.createStack = Thread.currentThread().getStackTrace(); + this.closed = false; +} + +@Override +public void seek(long pos) throws IOException { + inputStream.seek(pos); +} + +@Override +public long getPos() throws IOException { + return inputStream.getPos(); +} + +@Override +public boolean seekToNewSource(long targetPos) throws IOException { + throw new UnsupportedOperationException("seekToNewSource not supported"); +} + +@Override +public int read() throws IOException { + return inputStream.read(); +} + +@Override +public int read(@NotNull byte[] b, int off, int len) throws IOException { + return inputStream.read(b, off, len); +} + +@Override +public void close() throws IOException { + inputStream.close(); + closed = true; +} + +@SuppressWarnings("checkstyle:NoFinalizer") +@Override +protected void finalize() throws Throwable { + super.finalize(); + if (!closed) { +close(); // releasing resources is more important than printing the warning +String trace = +Joiner.on("\n\t").join(Arrays.copyOfRange(createStack, 1, createStack.length)); +LOG.warn("Unclosed input stream created by:\n\t{}", trace); + } +} + } + + private static class NullFileSystem extends FileSystem { + +@Override +public URI getUri() { + throw new UnsupportedOperationException(); +} + +@Override +public FSDataInputStream open(Path f) throws IOException { + throw new UnsupportedOperationException(); +} + +@Override +public FSDataInputStream open(Path f, int bufferSize) throws IOException { + throw new UnsupportedOperationException(); +} + +@Override +public FSDataOutputStream create( +Path f, +FsPermission permission, +boolean overwrite, +int bufferSize, +short replication, +long blockSize, +Progressable progress) +throws IOException { + throw new UnsupportedOperationException(); +} + +@Override +public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) +throws IOException { + throw new UnsupportedOperationException(); +} + +@Override +public boolean rename(Path src, Path dst) throws IOException { + throw new 
UnsupportedOperationException(); +} + +@Override +public boolean delete(Path f, boolean recursive) throws IOException { + throw new UnsupportedOperationException(); +} + +@Override +public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException { + throw new UnsupportedOperationException(); +} + +@Override +public void setWorkingDirectory(Path new_dir) { + throw new UnsupportedOperationException(); +} + +@Override +public Path getWorkingDirectory() { + throw new UnsupportedOperationException(); +} + +@Override +public boolean mkdirs(Path f, FsPermission permission) throws IOException { + throw new UnsupportedOperationException(); +} + +@Override +public FileStatus getFileStatus(Path f) throws IOException { + throw new UnsupportedOperationException(); +} + } + + static class InputFileSystem ex
[GitHub] [iceberg] pavibhai commented on a diff in pull request #6293: Added FileIO Support for ORC Reader and Writers
pavibhai commented on code in PR #6293: URL: https://github.com/apache/iceberg/pull/6293#discussion_r1042005551 ## orc/src/main/java/org/apache/iceberg/orc/ORC.java: ## @@ -789,7 +808,210 @@ static Reader newFileReader(InputFile file, Configuration config) { ReaderOptions readerOptions = OrcFile.readerOptions(config).useUTCTimestamp(true); if (file instanceof HadoopInputFile) { readerOptions.filesystem(((HadoopInputFile) file).getFileSystem()); +} else { + readerOptions.filesystem(new InputFileSystem(file)).maxLength(file.getLength()); } return newFileReader(file.location(), readerOptions); } + + static Writer newFileWriter( + OutputFile file, OrcFile.WriterOptions options, Map metadata) { +if (file instanceof HadoopOutputFile) { + options.fileSystem(((HadoopOutputFile) file).getFileSystem()); +} else { + options.fileSystem(new OutputFileSystem(file)); +} +final Path locPath = new Path(file.location()); +final Writer writer; + +try { + writer = OrcFile.createWriter(locPath, options); +} catch (IOException ioe) { + throw new RuntimeIOException(ioe, "Can't create file %s", locPath); +} + +metadata.forEach((key, value) -> writer.addUserMetadata(key, ByteBuffer.wrap(value))); + +return writer; + } + + private static class WrappedSeekableInputStream extends FSInputStream { +private final SeekableInputStream inputStream; +private boolean closed; +private final StackTraceElement[] createStack; + +private WrappedSeekableInputStream(SeekableInputStream inputStream) { + this.inputStream = inputStream; + this.createStack = Thread.currentThread().getStackTrace(); + this.closed = false; +} + +@Override +public void seek(long pos) throws IOException { + inputStream.seek(pos); +} + +@Override +public long getPos() throws IOException { + return inputStream.getPos(); +} + +@Override +public boolean seekToNewSource(long targetPos) throws IOException { + throw new UnsupportedOperationException("seekToNewSource not supported"); +} + +@Override +public int read() throws IOException { + return inputStream.read(); +} + +@Override +public int read(@NotNull byte[] b, int off, int len) throws IOException { + return inputStream.read(b, off, len); +} + +@Override +public void close() throws IOException { + inputStream.close(); + closed = true; +} + +@SuppressWarnings("checkstyle:NoFinalizer") +@Override +protected void finalize() throws Throwable { + super.finalize(); + if (!closed) { +close(); // releasing resources is more important than printing the warning +String trace = +Joiner.on("\n\t").join(Arrays.copyOfRange(createStack, 1, createStack.length)); +LOG.warn("Unclosed input stream created by:\n\t{}", trace); + } +} + } + + private static class NullFileSystem extends FileSystem { Review Comment: Sure. Moved to FileIOFSUtil class -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] pavibhai commented on a diff in pull request #6293: Added FileIO Support for ORC Reader and Writers
pavibhai commented on code in PR #6293: URL: https://github.com/apache/iceberg/pull/6293#discussion_r1042007374 ## orc/src/main/java/org/apache/iceberg/orc/ORC.java: ## @@ -789,7 +808,210 @@ static Reader newFileReader(InputFile file, Configuration config) { ReaderOptions readerOptions = OrcFile.readerOptions(config).useUTCTimestamp(true); if (file instanceof HadoopInputFile) { readerOptions.filesystem(((HadoopInputFile) file).getFileSystem()); +} else { + readerOptions.filesystem(new InputFileSystem(file)).maxLength(file.getLength()); } return newFileReader(file.location(), readerOptions); } + + static Writer newFileWriter( + OutputFile file, OrcFile.WriterOptions options, Map metadata) { +if (file instanceof HadoopOutputFile) { + options.fileSystem(((HadoopOutputFile) file).getFileSystem()); +} else { + options.fileSystem(new OutputFileSystem(file)); +} +final Path locPath = new Path(file.location()); +final Writer writer; + +try { + writer = OrcFile.createWriter(locPath, options); +} catch (IOException ioe) { + throw new RuntimeIOException(ioe, "Can't create file %s", locPath); +} + +metadata.forEach((key, value) -> writer.addUserMetadata(key, ByteBuffer.wrap(value))); + +return writer; + } + + private static class WrappedSeekableInputStream extends FSInputStream { +private final SeekableInputStream inputStream; +private boolean closed; +private final StackTraceElement[] createStack; + +private WrappedSeekableInputStream(SeekableInputStream inputStream) { + this.inputStream = inputStream; + this.createStack = Thread.currentThread().getStackTrace(); + this.closed = false; +} + +@Override +public void seek(long pos) throws IOException { + inputStream.seek(pos); +} + +@Override +public long getPos() throws IOException { + return inputStream.getPos(); +} + +@Override +public boolean seekToNewSource(long targetPos) throws IOException { + throw new UnsupportedOperationException("seekToNewSource not supported"); +} + +@Override +public int read() throws IOException { + return inputStream.read(); +} + +@Override +public int read(@NotNull byte[] b, int off, int len) throws IOException { + return inputStream.read(b, off, len); +} + +@Override +public void close() throws IOException { + inputStream.close(); + closed = true; +} + +@SuppressWarnings("checkstyle:NoFinalizer") +@Override +protected void finalize() throws Throwable { + super.finalize(); + if (!closed) { Review Comment: Makes sense. Removing this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] pavibhai commented on a diff in pull request #6293: Added FileIO Support for ORC Reader and Writers
pavibhai commented on code in PR #6293: URL: https://github.com/apache/iceberg/pull/6293#discussion_r1042007715 ## orc/src/main/java/org/apache/iceberg/orc/ORC.java: ## @@ -789,7 +808,210 @@ static Reader newFileReader(InputFile file, Configuration config) { ReaderOptions readerOptions = OrcFile.readerOptions(config).useUTCTimestamp(true); if (file instanceof HadoopInputFile) { readerOptions.filesystem(((HadoopInputFile) file).getFileSystem()); +} else { + readerOptions.filesystem(new InputFileSystem(file)).maxLength(file.getLength()); Review Comment: Added comment to clarify this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] gaborkaszab commented on pull request #6369: Increase Partition Start Id to 10000
gaborkaszab commented on PR #6369: URL: https://github.com/apache/iceberg/pull/6369#issuecomment-1340696570 Thanks for the answer, @ayushtkn! I wonder if it would make sense to make the already-written tables work as expected even with more than 1000 columns. E.g., when reading their metadata, we could convert their field IDs (starting from 1000) to the new way (starting from 1). They would remain the same in the written metadata files, but internally we could keep track of the field IDs starting from 1. Would that make sense? I can create a separate issue for this for further discussion, and if it does make sense I could give the implementation a try. @RussellSpitzer ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] hililiwei commented on a diff in pull request #6222: Flink: Support inspecting table
hililiwei commented on code in PR #6222: URL: https://github.com/apache/iceberg/pull/6222#discussion_r1042013519 ## core/src/main/java/org/apache/iceberg/BaseFilesTable.java: ## @@ -223,34 +225,28 @@ ManifestFile manifest() { static class ContentFileStructWithMetrics implements StructLike { private final StructLike fileAsStruct; private final MetricsUtil.ReadableMetricsStruct readableMetrics; -private final int expectedSize; +private final int position; Review Comment: In Flink, fields are obtained strictly in the order of the projection. We can't guarantee that `ReadableMetrics` is always at the end, so I changed it. cc @szehon-ho -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] hililiwei commented on a diff in pull request #6222: Flink: Support inspecting table
hililiwei commented on code in PR #6222: URL: https://github.com/apache/iceberg/pull/6222#discussion_r1042019390 ## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java: ## @@ -104,11 +105,38 @@ public static Schema convert(Schema baseSchema, TableSchema flinkSchema) { Types.StructType struct = convert(flinkSchema).asStruct(); // reassign ids to match the base schema Schema schema = TypeUtil.reassignIds(new Schema(struct.fields()), baseSchema); +// reassign doc to match the base schema +schema = reassignDoc(schema, baseSchema); Review Comment: The reason I added this logic is: https://github.com/apache/iceberg/pull/5376/files#diff-db9787684ca50d6d4fdfe8ee927613d2e8e5c29eaa894535d39147b4f631cb17R152-R157 If the doc of the field is different, it will not pass the validation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] pavibhai commented on a diff in pull request #6293: Added FileIO Support for ORC Reader and Writers
pavibhai commented on code in PR #6293: URL: https://github.com/apache/iceberg/pull/6293#discussion_r1042021455 ## orc/src/main/java/org/apache/iceberg/orc/ORC.java: ## @@ -789,7 +808,210 @@ static Reader newFileReader(InputFile file, Configuration config) { ReaderOptions readerOptions = OrcFile.readerOptions(config).useUTCTimestamp(true); if (file instanceof HadoopInputFile) { readerOptions.filesystem(((HadoopInputFile) file).getFileSystem()); +} else { Review Comment: I moved the OrcFile.createReader method into the new method and deprecated the previous one. That is a package local method, if acceptable I can remove it as it has no other uses. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] hililiwei commented on a diff in pull request #6222: Flink: Support inspecting table
hililiwei commented on code in PR #6222: URL: https://github.com/apache/iceberg/pull/6222#discussion_r1042027502 ## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java: ## @@ -140,15 +142,25 @@ public Catalog catalog() { return icebergCatalog; } - private Namespace toNamespace(String database) { + /** Append a new level to the base namespace */ + private static Namespace appendLevel(Namespace baseNamespace, String newLevel) { String[] namespace = new String[baseNamespace.levels().length + 1]; System.arraycopy(baseNamespace.levels(), 0, namespace, 0, baseNamespace.levels().length); -namespace[baseNamespace.levels().length] = database; +namespace[baseNamespace.levels().length] = newLevel; return Namespace.of(namespace); } TableIdentifier toIdentifier(ObjectPath path) { -return TableIdentifier.of(toNamespace(path.getDatabaseName()), path.getObjectName()); +String objectName = path.getObjectName(); +List<String> tableName = Splitter.on('$').splitToList(objectName); +if (tableName.size() == 2 && MetadataTableType.from(tableName.get(1)) != null) { Review Comment: `tableName.size() == 2 && MetadataTableType.from(tableName.get(1)) != null` This condition seems to be sufficient to pick up the metadata table, and we don't need to add an additional precondition check. We just need to make sure that a non-metadata table doesn't match the if branch and continues with the old logic. Or have I misunderstood what you're trying to say? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
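A small illustration of what that condition does (the table names are invented; it relies on MetadataTableType.from returning null for an unknown suffix, exactly as the condition above assumes):

```java
import java.util.List;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.relocated.com.google.common.base.Splitter;

public final class MetadataTableNameSketch {
  public static void main(String[] args) {
    for (String objectName : new String[] {"orders", "orders$snapshots", "orders$nope"}) {
      List<String> parts = Splitter.on('$').splitToList(objectName);
      boolean metadataTable = parts.size() == 2 && MetadataTableType.from(parts.get(1)) != null;
      // "orders" and "orders$nope" fall through to the old logic; "orders$snapshots" is a metadata table.
      System.out.println(objectName + " -> metadata table? " + metadataTable);
    }
  }
}
```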
[GitHub] [iceberg] pavibhai commented on a diff in pull request #6293: Added FileIO Support for ORC Reader and Writers
pavibhai commented on code in PR #6293: URL: https://github.com/apache/iceberg/pull/6293#discussion_r1042038382 ## orc/src/main/java/org/apache/iceberg/orc/ORC.java: ## @@ -789,7 +808,210 @@ static Reader newFileReader(InputFile file, Configuration config) { ReaderOptions readerOptions = OrcFile.readerOptions(config).useUTCTimestamp(true); if (file instanceof HadoopInputFile) { readerOptions.filesystem(((HadoopInputFile) file).getFileSystem()); +} else { + readerOptions.filesystem(new InputFileSystem(file)).maxLength(file.getLength()); } return newFileReader(file.location(), readerOptions); } + + static Writer newFileWriter( + OutputFile file, OrcFile.WriterOptions options, Map metadata) { +if (file instanceof HadoopOutputFile) { + options.fileSystem(((HadoopOutputFile) file).getFileSystem()); +} else { + options.fileSystem(new OutputFileSystem(file)); +} +final Path locPath = new Path(file.location()); +final Writer writer; + +try { + writer = OrcFile.createWriter(locPath, options); +} catch (IOException ioe) { + throw new RuntimeIOException(ioe, "Can't create file %s", locPath); +} + +metadata.forEach((key, value) -> writer.addUserMetadata(key, ByteBuffer.wrap(value))); + +return writer; + } + + private static class WrappedSeekableInputStream extends FSInputStream { +private final SeekableInputStream inputStream; +private boolean closed; +private final StackTraceElement[] createStack; + +private WrappedSeekableInputStream(SeekableInputStream inputStream) { Review Comment: Did the following: * Moved class to HadoopStreams * Made HadoopStreams public to be accessible here * Implemented DelegatingInputStream -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] pavibhai commented on a diff in pull request #6293: Added FileIO Support for ORC Reader and Writers
pavibhai commented on code in PR #6293: URL: https://github.com/apache/iceberg/pull/6293#discussion_r1042041584 ## orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java: ## @@ -88,8 +86,7 @@ options.fileSystem(((HadoopOutputFile) file).getFileSystem()); } options.setSchema(orcSchema); -this.writer = newOrcWriter(file, options, metadata); - +this.writer = ORC.newFileWriter(file, options, metadata); Review Comment: Yes that is still needed. We continue to retain the current behavior for HadoopIO. So the FileSystem from HadoopOutputFile or HadoopInputFile is used for the FS operations. When we use anything other than HadoopIO the FileIOFS kicks in. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] pavibhai commented on a diff in pull request #6293: Added FileIO Support for ORC Reader and Writers
pavibhai commented on code in PR #6293: URL: https://github.com/apache/iceberg/pull/6293#discussion_r1042042743 ## orc/src/test/java/org/apache/iceberg/orc/TestOrcDataWriter.java: ## @@ -126,4 +135,116 @@ public void testDataWriter() throws IOException { Assert.assertEquals("Written records should match", records, writtenRecords); } + + @Test + public void testUsingFileIO() throws IOException { +// Show that FileSystem access is not possible for the file we are supplying as the scheme +// dummy is not handled +ProxyOutputFile outFile = new ProxyOutputFile(Files.localOutput(temp.newFile())); +Assertions.assertThatThrownBy( +() -> HadoopOutputFile.fromPath(new Path(outFile.location()), new Configuration())) +.isInstanceOf(RuntimeIOException.class) +.hasMessageStartingWith("Failed to get file system for path: dummy"); + +// We are creating the proxy +SortOrder sortOrder = SortOrder.builderFor(SCHEMA).withOrderId(10).asc("id").build(); Review Comment: Sure, removed it to simplify the test -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] pavibhai commented on a diff in pull request #6293: Added FileIO Support for ORC Reader and Writers
pavibhai commented on code in PR #6293: URL: https://github.com/apache/iceberg/pull/6293#discussion_r1042049334 ## orc/src/test/java/org/apache/iceberg/orc/TestOrcDataWriter.java: ## @@ -126,4 +135,116 @@ public void testDataWriter() throws IOException { Assert.assertEquals("Written records should match", records, writtenRecords); } + + @Test + public void testUsingFileIO() throws IOException { +// Show that FileSystem access is not possible for the file we are supplying as the scheme +// dummy is not handled +ProxyOutputFile outFile = new ProxyOutputFile(Files.localOutput(temp.newFile())); +Assertions.assertThatThrownBy( +() -> HadoopOutputFile.fromPath(new Path(outFile.location()), new Configuration())) +.isInstanceOf(RuntimeIOException.class) +.hasMessageStartingWith("Failed to get file system for path: dummy"); + +// We are creating the proxy +SortOrder sortOrder = SortOrder.builderFor(SCHEMA).withOrderId(10).asc("id").build(); + +DataWriter dataWriter = +ORC.writeData(outFile) +.schema(SCHEMA) +.createWriterFunc(GenericOrcWriter::buildWriter) +.overwrite() +.withSpec(PartitionSpec.unpartitioned()) Review Comment: Sorry, I didn't follow this. What do we mean by default? Before the patch, if you gave a Local[Output|Input]File, it was converted to a FileSystem operation, resulting in a LocalFileSystem handling it. To ensure that is not taking place, we are mimicking a scheme `dummy` that is not handled, so if any FS operations happen through the normal path, they will fail with `Failed to get file system for path: dummy`, but if they are handled via FileIOFS they should succeed. I will add this comment to the test to make it clearer. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
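To make the "dummy scheme" trick concrete, here is a hedged sketch of the kind of proxy being described (the real `ProxyOutputFile` in the test may differ): it wraps a real `OutputFile` but reports a location whose scheme no Hadoop FileSystem handles, so accidental FileSystem access fails fast while FileIO-based access still works.

```java
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.PositionOutputStream;

// Sketch only: delegate all I/O, but advertise an unhandled "dummy" scheme in the location.
class ProxyOutputFile implements OutputFile {
  private final OutputFile delegate;

  ProxyOutputFile(OutputFile delegate) {
    this.delegate = delegate;
  }

  @Override
  public PositionOutputStream create() {
    return delegate.create();
  }

  @Override
  public PositionOutputStream createOrOverwrite() {
    return delegate.createOrOverwrite();
  }

  @Override
  public String location() {
    return "dummy://" + delegate.location(); // no FileSystem is registered for "dummy"
  }

  @Override
  public InputFile toInputFile() {
    return delegate.toInputFile();
  }
}
```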
[GitHub] [iceberg] pavibhai commented on pull request #6293: Added FileIO Support for ORC Reader and Writers
pavibhai commented on PR #6293: URL: https://github.com/apache/iceberg/pull/6293#issuecomment-1340790968 > Looks mostly good overall. Thanks for getting this working @pavibhai! Thanks @rdblue for your comments. I have addressed them; for a few, I added additional clarification. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] nastra commented on issue #6366: Spark Sql update data failure
nastra commented on issue #6366: URL: https://github.com/apache/iceberg/issues/6366#issuecomment-1340870927 @gnikgnaw did you have a chance to look at my last comment? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] gnikgnaw commented on issue #6366: Spark Sql update data failure
gnikgnaw commented on issue #6366: URL: https://github.com/apache/iceberg/issues/6366#issuecomment-1340886112 > @gnikgnaw did you have a chance to look at my last comment? Hi @nastra, thank you very much for your help. After modifying the Spark version, my problem is solved. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] gnikgnaw closed issue #6366: Spark Sql update data failure
gnikgnaw closed issue #6366: Spark Sql update data failure URL: https://github.com/apache/iceberg/issues/6366 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] ajantha-bhat opened a new issue, #6375: Consider delete manifests for rewrite manifests
ajantha-bhat opened a new issue, #6375: URL: https://github.com/apache/iceberg/issues/6375 ### Feature Request / Improvement As per the code, it looks like we are only considering data manifests for the rewrite. Should we also support rewriting delete manifests into bigger delete manifest files? https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java#L214 I would like to know if there is any edge case around this. If not, I can start working on this. cc: @RussellSpitzer, @szehon-ho ### Query engine None -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
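For reference, a hedged sketch of how data manifests are compacted today through the Spark procedure (catalog and table names are placeholders); delete manifests are not yet covered, which is what this issue proposes:

```java
// Illustrative only: rewrites the table's data manifests; `spark` is an existing SparkSession.
spark.sql("CALL my_catalog.system.rewrite_manifests('db.sample')");
```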
[GitHub] [iceberg] rajarshisarkar opened a new pull request, #6376: Docs: Add register table Spark procedure documentation
rajarshisarkar opened a new pull request, #6376: URL: https://github.com/apache/iceberg/pull/6376 Add documentation for https://github.com/apache/iceberg/pull/4810 --- cc: @RussellSpitzer -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] ajantha-bhat commented on a diff in pull request #6376: Docs: Add register table Spark procedure documentation
ajantha-bhat commented on code in PR #6376: URL: https://github.com/apache/iceberg/pull/6376#discussion_r1042165921 ## docs/spark-procedures.md: ## @@ -493,6 +493,35 @@ CALL spark_catalog.system.add_files( ) ``` +### `register_table` + +Creates a catalog entry for a metadata.json file which already exists but does not have a corresponding catalog identifier. Review Comment: Should we add a warning about registering tables which exist in another catalog to the current catalog? Because in that case the table exists in both catalogs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] rajarshisarkar commented on a diff in pull request #6376: Docs: Add register table Spark procedure documentation
rajarshisarkar commented on code in PR #6376: URL: https://github.com/apache/iceberg/pull/6376#discussion_r1042170977 ## docs/spark-procedures.md: ## @@ -493,6 +493,35 @@ CALL spark_catalog.system.add_files( ) ``` +### `register_table` + +Creates a catalog entry for a metadata.json file which already exists but does not have a corresponding catalog identifier. Review Comment: Yeah, I have added a warning. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] ajantha-bhat commented on a diff in pull request #6376: Docs: Add register table Spark procedure documentation
ajantha-bhat commented on code in PR #6376: URL: https://github.com/apache/iceberg/pull/6376#discussion_r1042177406 ## docs/spark-procedures.md: ## @@ -493,6 +493,37 @@ CALL spark_catalog.system.add_files( ) ``` +### `register_table` + +Creates a catalog entry for a metadata.json file which already exists but does not have a corresponding catalog identifier. + + Usage + +| Argument Name | Required? | Type | Description | +|---|---|--|-| +| `table` | ✔️ | string | Table which is to be registered | +| `metadata_file`| ✔️ | string | Metadata file which is to be registered as a new catalog identifier | + +Warning: If we register tables which exist in another catalog to the current catalog, then the tables would exist in both the catalogs. Review Comment: ```suggestion Warning: If we register tables which exist in another catalog to the current catalog, then the tables would exist in both the catalogs. And using same table from multiple catalogs is not recommended as it fails to keep the table metadata up to date. ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
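For illustration, a hedged example of invoking the procedure documented above (catalog, table, and metadata path are placeholders; `spark` is an existing SparkSession):

```java
// Illustrative only: registers an existing metadata.json under a new identifier in my_catalog.
spark.sql(
    "CALL my_catalog.system.register_table("
        + "table => 'db.new_table', "
        + "metadata_file => 's3://bucket/warehouse/db/tbl/metadata/v1.metadata.json')");
```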
[GitHub] [iceberg] pvary commented on issue #6370: What is the purpose of Hive Lock ?
pvary commented on issue #6370: URL: https://github.com/apache/iceberg/issues/6370#issuecomment-1341067753 @dmgcodevil: The purpose of the Hive Lock is to make sure that there are no concurrent changes to the table. Specifically, that there is no concurrent Iceberg commit. In theory the replace is a good idea, but in an ideal world that would be a new feature in HMS, which could only be supported in the upcoming release (4.0.0). Most of the Iceberg community uses older 3.x Hive (HMS) versions, or even older 2.x in some cases. Adding a new feature there is usually not allowed. Just from the Iceberg perspective, this change would be very much needed and welcome, as it would mean a faster/more reliable commit flow for HiveCatalogs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] RussellSpitzer merged pull request #6360: Docs: Update Zorder spark support versions.
RussellSpitzer merged PR #6360: URL: https://github.com/apache/iceberg/pull/6360 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] RussellSpitzer commented on pull request #6360: Docs: Update Zorder spark support versions.
RussellSpitzer commented on PR #6360: URL: https://github.com/apache/iceberg/pull/6360#issuecomment-134581 Thanks @ajantha-bhat , looking much clearer now! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6376: Docs: Add register table Spark procedure documentation
RussellSpitzer commented on code in PR #6376: URL: https://github.com/apache/iceberg/pull/6376#discussion_r1042333590 ## docs/spark-procedures.md: ## @@ -493,6 +493,37 @@ CALL spark_catalog.system.add_files( ) ``` +### `register_table` + +Creates a catalog entry for a metadata.json file which already exists but does not have a corresponding catalog identifier. + + Usage + +| Argument Name | Required? | Type | Description | +|---|---|--|-| +| `table` | ✔️ | string | Table which is to be registered | +| `metadata_file`| ✔️ | string | Metadata file which is to be registered as a new catalog identifier | + +Warning: If we register tables which exist in another catalog to the current catalog, then the tables would exist in both the catalogs. And using same table from multiple catalogs is not recommended as it fails to keep the table metadata up to date. Review Comment: I think this has to be much stronger. Having a table registered in two catalogs is essentially a Split Brain issue. It won't just have problems with metadata being kept up to date, it will also potentially lose data. I would say something along the lines of "Warning: Having the same metadata.json registered in more than one catalog can lead to missing updates, loss of data, and table corruption." -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6376: Docs: Add register table Spark procedure documentation
RussellSpitzer commented on code in PR #6376: URL: https://github.com/apache/iceberg/pull/6376#discussion_r1042333590 ## docs/spark-procedures.md: ## @@ -493,6 +493,37 @@ CALL spark_catalog.system.add_files( ) ``` +### `register_table` + +Creates a catalog entry for a metadata.json file which already exists but does not have a corresponding catalog identifier. + + Usage + +| Argument Name | Required? | Type | Description | +|---|---|--|-| +| `table` | ✔️ | string | Table which is to be registered | +| `metadata_file`| ✔️ | string | Metadata file which is to be registered as a new catalog identifier | + +Warning: If we register tables which exist in another catalog to the current catalog, then the tables would exist in both the catalogs. And using same table from multiple catalogs is not recommended as it fails to keep the table metadata up to date. Review Comment: I think this has to be much stronger. Having a table registered in two catalogs is essentially a Split Brain issue. It won't just have problems with metadata being kept up to date, it will also potentially lose data. I would say something along the lines of "Warning: Having the same metadata.json registered in more than one catalog can lead to missing updates, loss of data, and table corruption. Only use this procedure when the table is no longer registered in an existing catalog, or you are moving a table between catalogs." -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] RussellSpitzer merged pull request #6374: Docs: Remove backticks from Spark procedure headings
RussellSpitzer merged PR #6374: URL: https://github.com/apache/iceberg/pull/6374 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] RussellSpitzer commented on pull request #6374: Docs: Remove backticks from Spark procedure headings
RussellSpitzer commented on PR #6374: URL: https://github.com/apache/iceberg/pull/6374#issuecomment-1341121146 Looks good to me, Thanks for the cleanup! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6371: Spark 3.3: Support storage-partitioned joins
RussellSpitzer commented on code in PR #6371: URL: https://github.com/apache/iceberg/pull/6371#discussion_r1042346944 ## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java: ## @@ -255,74 +256,90 @@ public static org.apache.iceberg.Table toIcebergTable(Table table) { return sparkTable.table(); } + public static Transform[] toTransforms(Schema schema, List fields) { +SpecTransformToSparkTransform visitor = new SpecTransformToSparkTransform(schema); + +List transforms = Lists.newArrayList(); + +for (PartitionField field : fields) { + Transform transform = PartitionSpecVisitor.visit(schema, field, visitor); + if (transform != null) { +transforms.add(transform); + } +} + +return transforms.toArray(new Transform[0]); + } + /** * Converts a PartitionSpec to Spark transforms. * * @param spec a PartitionSpec * @return an array of Transforms */ public static Transform[] toTransforms(PartitionSpec spec) { -Map quotedNameById = SparkSchemaUtil.indexQuotedNameById(spec.schema()); -List transforms = -PartitionSpecVisitor.visit( -spec, -new PartitionSpecVisitor() { - @Override - public Transform identity(String sourceName, int sourceId) { -return Expressions.identity(quotedName(sourceId)); - } +SpecTransformToSparkTransform visitor = new SpecTransformToSparkTransform(spec.schema()); +List transforms = PartitionSpecVisitor.visit(spec, visitor); +return transforms.stream().filter(Objects::nonNull).toArray(Transform[]::new); + } - @Override - public Transform bucket(String sourceName, int sourceId, int numBuckets) { -return Expressions.bucket(numBuckets, quotedName(sourceId)); - } + private static class SpecTransformToSparkTransform implements PartitionSpecVisitor { +private final Map quotedNameById; - @Override - public Transform truncate(String sourceName, int sourceId, int width) { -return Expressions.apply( -"truncate", -Expressions.column(quotedName(sourceId)), -Expressions.literal(width)); - } +SpecTransformToSparkTransform(Schema schema) { + this.quotedNameById = SparkSchemaUtil.indexQuotedNameById(schema); +} - @Override - public Transform year(String sourceName, int sourceId) { -return Expressions.years(quotedName(sourceId)); - } +@Override +public Transform identity(String sourceName, int sourceId) { + return Expressions.identity(quotedName(sourceId)); +} - @Override - public Transform month(String sourceName, int sourceId) { -return Expressions.months(quotedName(sourceId)); - } +@Override +public Transform bucket(String sourceName, int sourceId, int numBuckets) { + return Expressions.bucket(numBuckets, quotedName(sourceId)); +} - @Override - public Transform day(String sourceName, int sourceId) { -return Expressions.days(quotedName(sourceId)); - } +@Override +public Transform truncate(String sourceName, int sourceId, int width) { + NamedReference column = Expressions.column(quotedName(sourceId)); + return Expressions.apply("truncate", Expressions.literal(width), column); Review Comment: So this is ok as a change because the previous version wouldn't have actually worked correct? Just making sure we aren't breaking any api here -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
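For readers, a small sketch of the transform construction under discussion, with the width literal first and the source column second (the column name is a placeholder; `Expressions` and `Transform` are from `org.apache.spark.sql.connector.expressions`):

```java
// Sketch only: Spark V2 representation of Iceberg's truncate partition transform.
Transform truncateTransform =
    Expressions.apply("truncate", Expressions.literal(4), Expressions.column("data_col"));
```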
[GitHub] [iceberg] ajantha-bhat commented on a diff in pull request #6376: Docs: Add register table Spark procedure documentation
ajantha-bhat commented on code in PR #6376: URL: https://github.com/apache/iceberg/pull/6376#discussion_r1042354666 ## docs/spark-procedures.md: ## @@ -493,6 +493,37 @@ CALL spark_catalog.system.add_files( ) ``` +### `register_table` + +Creates a catalog entry for a metadata.json file which already exists but does not have a corresponding catalog identifier. + + Usage + +| Argument Name | Required? | Type | Description | +|---|---|--|-| +| `table` | ✔️ | string | Table which is to be registered | +| `metadata_file`| ✔️ | string | Metadata file which is to be registered as a new catalog identifier | + +Warning: If we register tables which exist in another catalog to the current catalog, then the tables would exist in both the catalogs. And using same table from multiple catalogs is not recommended as it fails to keep the table metadata up to date. Review Comment: sgtm -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6371: Spark 3.3: Support storage-partitioned joins
RussellSpitzer commented on code in PR #6371: URL: https://github.com/apache/iceberg/pull/6371#discussion_r1042357442 ## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java: ## @@ -42,4 +42,9 @@ private SparkSQLProperties() {} // Controls whether to check the order of fields during writes public static final String CHECK_ORDERING = "spark.sql.iceberg.check-ordering"; public static final boolean CHECK_ORDERING_DEFAULT = true; + + // Controls whether to preserve the existing grouping of data while planning splits + public static final String PRESERVE_DATA_GROUPING = + "spark.sql.iceberg.split.preserve-data-grouping"; + public static final boolean PRESERVE_DATA_GROUPING_DEFAULT = false; Review Comment: My question here would be: should we really have the default be false? I have no problem with the name, but it feels like this is probably always the right decision for a scan with possible joins. We should probably look into this more in the future, but my guess is that if a query requires partition columns, we should group by those columns. That's my long way of saying I'm fine with the default being false for now, but I think it should probably be true. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
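As a quick illustration, a hedged sketch of enabling this for a session (the Iceberg key comes from the diff above; the Spark v2 bucketing key is an assumption based on `SQLConf.V2_BUCKETING_ENABLED`):

```java
// Illustrative only; `spark` is an existing SparkSession.
spark.conf().set("spark.sql.iceberg.split.preserve-data-grouping", "true");
spark.conf().set("spark.sql.sources.v2.bucketing.enabled", "true");
```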
[GitHub] [iceberg] RussellSpitzer commented on pull request #6369: Increase Partition Start Id to 10000
RussellSpitzer commented on PR #6369: URL: https://github.com/apache/iceberg/pull/6369#issuecomment-1341155158 @gaborkaszab I would probably just recommend dropping and recreating the table (via metadata) or having a separate utility for modifying existing tables. I really don't think many folks have 1000 columns, since we would have seen this before, so I don't think the upgrade procedure really needs to exist inside the Iceberg repo. I think this change is pretty small and harmless though. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] Fokko commented on a diff in pull request #6372: [Python] Fix incorrect description when set a property
Fokko commented on code in PR #6372: URL: https://github.com/apache/iceberg/pull/6372#discussion_r1042388262 ## python/pyiceberg/cli/console.py: ## @@ -103,7 +103,7 @@ def list(ctx: Context, parent: Optional[str]): # pylint: disable=redefined-buil @click.pass_context @catch_exception() def describe(ctx: Context, entity: Literal["name", "namespace", "table"], identifier: str): -"""Describes a namespace xor table""" Review Comment: This is actually a bad joke. If you have hierarchical namespaces, then it will describe the namespace for `namespace.namespace.table`; in the case of a table, `namespace.table`, it would describe the table. But it wouldn't do both (because that's not possible). But I agree that this is better :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] Fokko merged pull request #6372: [Python] Fix incorrect description when set a property
Fokko merged PR #6372: URL: https://github.com/apache/iceberg/pull/6372 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6371: Spark 3.3: Support storage-partitioned joins
RussellSpitzer commented on code in PR #6371: URL: https://github.com/apache/iceberg/pull/6371#discussion_r1042394528 ## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java: ## @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.iceberg.BaseScanTaskGroup; +import org.apache.iceberg.PartitionField; +import org.apache.iceberg.PartitionScanTask; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Scan; +import org.apache.iceberg.ScanTask; +import org.apache.iceberg.ScanTaskGroup; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.spark.Spark3Util; +import org.apache.iceberg.spark.SparkReadConf; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.types.Types.StructType; +import org.apache.iceberg.util.StructLikeSet; +import org.apache.iceberg.util.TableScanUtil; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.connector.expressions.Transform; +import org.apache.spark.sql.connector.read.SupportsReportPartitioning; +import org.apache.spark.sql.connector.read.partitioning.KeyGroupedPartitioning; +import org.apache.spark.sql.connector.read.partitioning.Partitioning; +import org.apache.spark.sql.connector.read.partitioning.UnknownPartitioning; + +abstract class SparkPartitioningAwareScan extends SparkScan +implements SupportsReportPartitioning { + + private final Scan> scan; + private final boolean preserveDataGrouping; + + private Set specs = null; // lazy cache of scanned specs + private List tasks = null; // lazy cache of uncombined tasks + private List> taskGroups = null; // lazy cache of task groups + private StructType groupingKeyType = null; // lazy cache of the grouping key type + private StructLikeSet groupingKeys = null; // lazy cache of grouping keys + + SparkPartitioningAwareScan( + SparkSession spark, + Table table, + Scan> scan, + SparkReadConf readConf, + Schema expectedSchema, + List filters) { + +super(spark, table, readConf, expectedSchema, filters); + +this.scan = scan; +this.preserveDataGrouping = 
readConf.preserveDataGrouping(); + +if (scan == null) { + this.specs = Collections.emptySet(); + this.tasks = Collections.emptyList(); + this.taskGroups = Collections.emptyList(); +} + } + + protected abstract Class taskJavaClass(); + + protected Scan> scan() { +return scan; + } + + @Override + public Partitioning outputPartitioning() { +Preconditions.checkState(taskGroups() != null, "Task groups must be planned"); + +if (groupingKeyType().fields().isEmpty()) { + return new UnknownPartitioning(taskGroups().size()); +} else { + return new KeyGroupedPartitioning(groupingKeyTransforms(), taskGroups().size()); Review Comment: Is this present in the SparkUI anywhere? Feel like we should have some way of knowing whether or not the partitioning was successfully applied -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
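One hedged way to check this today, outside the Spark UI, is to inspect the physical plan of a join between two tables with compatible partitioning (table and column names are placeholders): when the key-grouped partitioning is used, no Exchange should appear above the two scans.

```java
// Illustrative only; `spark` is an existing SparkSession.
spark.sql(
        "EXPLAIN FORMATTED "
            + "SELECT t1.id, t2.data FROM cat.db.t1 t1 JOIN cat.db.t2 t2 ON t1.id = t2.id")
    .show(false);
```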
[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6371: Spark 3.3: Support storage-partitioned joins
RussellSpitzer commented on code in PR #6371: URL: https://github.com/apache/iceberg/pull/6371#discussion_r1042402581 ## spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java: ## @@ -0,0 +1,585 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.sql; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.lang3.StringUtils; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.SparkSQLProperties; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.SparkTestBaseWithCatalog; +import org.apache.iceberg.spark.SparkWriteOptions; +import org.apache.iceberg.spark.data.RandomData; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.internal.SQLConf; +import org.apache.spark.sql.types.StructType; +import org.junit.After; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestStoragePartitionedJoins extends SparkTestBaseWithCatalog { + + private static final String OTHER_TABLE_NAME = "other_table"; + + // open file cost and split size are set as 16 MB to produce a split per file + private static final String TABLE_PROPERTIES = + String.format( + "'%s' = 16777216, '%s' = 16777216", + TableProperties.SPLIT_SIZE, TableProperties.SPLIT_OPEN_FILE_COST); + + private static final Map SPJ_ON_SQL_CONF = + ImmutableMap.of( Review Comment: How many of these properties are required because of current Spark limitations? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6371: Spark 3.3: Support storage-partitioned joins
RussellSpitzer commented on code in PR #6371: URL: https://github.com/apache/iceberg/pull/6371#discussion_r1042413822 ## spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java: ## @@ -0,0 +1,585 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.sql; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.lang3.StringUtils; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.SparkSQLProperties; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.SparkTestBaseWithCatalog; +import org.apache.iceberg.spark.SparkWriteOptions; +import org.apache.iceberg.spark.data.RandomData; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.internal.SQLConf; +import org.apache.spark.sql.types.StructType; +import org.junit.After; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestStoragePartitionedJoins extends SparkTestBaseWithCatalog { + + private static final String OTHER_TABLE_NAME = "other_table"; + + // open file cost and split size are set as 16 MB to produce a split per file + private static final String TABLE_PROPERTIES = + String.format( + "'%s' = 16777216, '%s' = 16777216", + TableProperties.SPLIT_SIZE, TableProperties.SPLIT_OPEN_FILE_COST); + + private static final Map SPJ_ON_SQL_CONF = + ImmutableMap.of( + SQLConf.V2_BUCKETING_ENABLED().key(), + "true", + SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION().key(), + "false", + SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), + "false", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD().key(), + "-1", + SparkSQLProperties.PRESERVE_DATA_GROUPING, + "true"); + + private static final Map SPJ_OFF_SQL_CONF = + ImmutableMap.of( + SQLConf.V2_BUCKETING_ENABLED().key(), + "false", + SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION().key(), + "false", + SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), + "false", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD().key(), + "-1", + SparkSQLProperties.PRESERVE_DATA_GROUPING, + "true"); + + @BeforeClass + public static void setupSparkConf() { +spark.conf().set("spark.sql.shuffle.partitions", "4"); + } + + @After + public void removeTables() { +sql("DROP TABLE IF 
EXISTS %s", tableName); +sql("DROP TABLE IF EXISTS %s", tableName(OTHER_TABLE_NAME)); + } + + // TODO: add tests for truncate transforms once SPARK-40295 is released + // TODO: add tests for cases when one side contains a subset of keys once Spark supports this + Review Comment: Should we have some tests with multiple buckets? or multiple partition transforms? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6371: Spark 3.3: Support storage-partitioned joins
RussellSpitzer commented on code in PR #6371: URL: https://github.com/apache/iceberg/pull/6371#discussion_r1042415619 ## spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java: ## @@ -0,0 +1,585 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.sql; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.lang3.StringUtils; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.SparkSQLProperties; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.SparkTestBaseWithCatalog; +import org.apache.iceberg.spark.SparkWriteOptions; +import org.apache.iceberg.spark.data.RandomData; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.internal.SQLConf; +import org.apache.spark.sql.types.StructType; +import org.junit.After; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestStoragePartitionedJoins extends SparkTestBaseWithCatalog { + + private static final String OTHER_TABLE_NAME = "other_table"; + + // open file cost and split size are set as 16 MB to produce a split per file + private static final String TABLE_PROPERTIES = + String.format( + "'%s' = 16777216, '%s' = 16777216", + TableProperties.SPLIT_SIZE, TableProperties.SPLIT_OPEN_FILE_COST); + + private static final Map SPJ_ON_SQL_CONF = + ImmutableMap.of( + SQLConf.V2_BUCKETING_ENABLED().key(), + "true", + SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION().key(), + "false", + SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), + "false", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD().key(), + "-1", + SparkSQLProperties.PRESERVE_DATA_GROUPING, + "true"); + + private static final Map SPJ_OFF_SQL_CONF = + ImmutableMap.of( + SQLConf.V2_BUCKETING_ENABLED().key(), + "false", + SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION().key(), + "false", + SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), + "false", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD().key(), + "-1", + SparkSQLProperties.PRESERVE_DATA_GROUPING, + "true"); + + @BeforeClass + public static void setupSparkConf() { +spark.conf().set("spark.sql.shuffle.partitions", "4"); + } + + @After + public void removeTables() { +sql("DROP TABLE IF 
EXISTS %s", tableName); +sql("DROP TABLE IF EXISTS %s", tableName(OTHER_TABLE_NAME)); + } + + // TODO: add tests for truncate transforms once SPARK-40295 is released + // TODO: add tests for cases when one side contains a subset of keys once Spark supports this + Review Comment: I see the example below -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6371: Spark 3.3: Support storage-partitioned joins
RussellSpitzer commented on code in PR #6371: URL: https://github.com/apache/iceberg/pull/6371#discussion_r1042433535 ## spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java: ## @@ -0,0 +1,585 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.sql; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.lang3.StringUtils; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.SparkSQLProperties; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.SparkTestBaseWithCatalog; +import org.apache.iceberg.spark.SparkWriteOptions; +import org.apache.iceberg.spark.data.RandomData; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.internal.SQLConf; +import org.apache.spark.sql.types.StructType; +import org.junit.After; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestStoragePartitionedJoins extends SparkTestBaseWithCatalog { + + private static final String OTHER_TABLE_NAME = "other_table"; + + // open file cost and split size are set as 16 MB to produce a split per file + private static final String TABLE_PROPERTIES = + String.format( + "'%s' = 16777216, '%s' = 16777216", + TableProperties.SPLIT_SIZE, TableProperties.SPLIT_OPEN_FILE_COST); + + private static final Map SPJ_ON_SQL_CONF = + ImmutableMap.of( + SQLConf.V2_BUCKETING_ENABLED().key(), + "true", + SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION().key(), + "false", + SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), + "false", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD().key(), + "-1", + SparkSQLProperties.PRESERVE_DATA_GROUPING, + "true"); + + private static final Map SPJ_OFF_SQL_CONF = + ImmutableMap.of( + SQLConf.V2_BUCKETING_ENABLED().key(), + "false", + SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION().key(), + "false", + SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), + "false", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD().key(), + "-1", + SparkSQLProperties.PRESERVE_DATA_GROUPING, + "true"); + + @BeforeClass + public static void setupSparkConf() { +spark.conf().set("spark.sql.shuffle.partitions", "4"); + } + + @After + public void removeTables() { +sql("DROP TABLE IF 
EXISTS %s", tableName); +sql("DROP TABLE IF EXISTS %s", tableName(OTHER_TABLE_NAME)); + } + + // TODO: add tests for truncate transforms once SPARK-40295 is released + // TODO: add tests for cases when one side contains a subset of keys once Spark supports this + Review Comment: Does this work with a non Iceberg Source as part of the join? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6371: Spark 3.3: Support storage-partitioned joins
RussellSpitzer commented on code in PR #6371: URL: https://github.com/apache/iceberg/pull/6371#discussion_r1042444111 ## core/src/main/java/org/apache/iceberg/Partitioning.java: ## @@ -215,11 +225,12 @@ public Void alwaysNull(int fieldId, String sourceName, int sourceId) { * that have the same field ID but use a void transform under the hood. Such fields cannot be part * of the grouping key as void transforms always return null. * + * @param schema a schema Review Comment: I think this needs a little more description as well as information about what "null" means in this context. The Schema here is the projected schema to pull transforms from? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] islamismailov commented on pull request #6268: Allow dropping a column used by an old but not currrent partition spec
islamismailov commented on PR #6268: URL: https://github.com/apache/iceberg/pull/6268#issuecomment-1341261730 (I did address Ryan's feedback) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] Fokko merged pull request #6268: Allow dropping a column used by an old but not currrent partition spec
Fokko merged PR #6268: URL: https://github.com/apache/iceberg/pull/6268 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6371: Spark 3.3: Support storage-partitioned joins
RussellSpitzer commented on code in PR #6371: URL: https://github.com/apache/iceberg/pull/6371#discussion_r1042453631 ## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java: ## @@ -42,4 +42,9 @@ private SparkSQLProperties() {} // Controls whether to check the order of fields during writes public static final String CHECK_ORDERING = "spark.sql.iceberg.check-ordering"; public static final boolean CHECK_ORDERING_DEFAULT = true; + + // Controls whether to preserve the existing grouping of data while planning splits + public static final String PRESERVE_DATA_GROUPING = Review Comment: The name is fine to me. Ideally this isn't something that actually gets configured by the end user. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] islamismailov commented on a diff in pull request #6353: Make sure S3 stream opened by ReadConf ctor is closed
islamismailov commented on code in PR #6353: URL: https://github.com/apache/iceberg/pull/6353#discussion_r1042460459 ## parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java: ## @@ -79,9 +83,11 @@ private ReadConf init() { nameMapping, reuseContainers, caseSensitive, - null); - this.conf = readConf.copy(); - return readConf; + null)) { +this.conf = readConf.copy(); + } catch (IOException e) { +LOG.warn("Failed to close ReadConf", e); Review Comment: I think it's kind of an anti-pattern. `close` normally throws IOException, and it's up to the "user" of the class to decide how to handle that. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
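To make the suggested alternative concrete, a hedged sketch (field and helper names such as `newReadConf()` are hypothetical) in which a failure from `close()` propagates as an unchecked exception instead of being logged and dropped:

```java
// Sketch only: try-with-resources closes readConf automatically; an IOException thrown by
// close() (or during construction) surfaces to the caller instead of being swallowed.
private ReadConf<T> init() {
  try (ReadConf<T> readConf = newReadConf()) { // hypothetical helper that builds the ReadConf
    this.conf = readConf.copy();
    return conf;
  } catch (IOException e) {
    throw new java.io.UncheckedIOException("Failed to create or close ReadConf", e);
  }
}
```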
[GitHub] [iceberg] pvary commented on issue #2301: Lock remains in HMS if HiveTableOperations gets killed (direct process shutdown - no signals) after lock is acquired
pvary commented on issue #2301: URL: https://github.com/apache/iceberg/issues/2301#issuecomment-1341362767 > We faced similar issues: > > 1. We are using Flink and some processes attempt to modify a table concurrently, e.g.: ingestion process and data compaction process. If one of them fails then a lock can remain in `ACQUIRED` state. We clear all the locks from another process in the `onInit` function. This is problematic. You should not `clear` locks. You should only do this when you are sure that the other process is not writing/committing. > 2. This one is weird. We see a lot of errors in logs: `Base metadata location '%s' is not same as the current table metadata location '%s' for %s.%s` [see](https://github.com/apache/iceberg/blob/6b8f7e0e31a81029b478e7757aba749f5ed27f42/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java#L295), which means the table has been updated regardless of an acquired lock. This is exactly the case that can happen when the locks are removed during a commit and another process is trying to commit concurrently. The correct solution for the issue is to make sure that the lock is removed by the failing process. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] autumnust commented on a diff in pull request #6327: ORC: Fix error when projecting nested identity partition column
autumnust commented on code in PR #6327: URL: https://github.com/apache/iceberg/pull/6327#discussion_r1042526245 ## orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java: ## @@ -442,4 +445,23 @@ static TypeDescription applyNameMapping(TypeDescription orcSchema, NameMapping n public static Map idToOrcName(Schema schema) { return TypeUtil.visit(schema, new IdToOrcName()); } + + /** + * Returns a {@link Schema} which has constant fields and metadata fields removed from the + * provided schema. This utility can be used to create a "read schema" which can be passed to the + * ORC file reader and hence avoiding deserialization and memory costs associated with column + * values already available through Iceberg metadata. + * + * NOTE: This method, unlike {@link TypeUtil#selectNot(Schema, Set)}, preserves empty structs + * (caused due to a struct having all constant fields) so that Iceberg ORC readers can later add + * constant fields in these structs Review Comment: nit: We don't have to mention the cause of the empty structs here, since there might be other scenarios, such as an intentionally empty struct as part of the schema? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] ajantha-bhat commented on a diff in pull request #6296: Spark-3.3: Use table sort order with sort strategy when user has not specified
ajantha-bhat commented on code in PR #6296: URL: https://github.com/apache/iceberg/pull/6296#discussion_r1042526654 ## spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java: ## @@ -356,6 +356,16 @@ public void testRewriteDataFilesWithInvalidInputs() { + "sort_order => 'c1 ASC NULLS FIRST')", catalogName, tableIdent)); +// Test for sort strategy without any (default/user defined) sort_order +AssertHelpers.assertThrows( +"Should reject calls with error message", +IllegalArgumentException.class, +"Can't use SORT when there is no sort order", Review Comment: I think the existing full error message is enough. Because I only quoted a piece of that full message, you might have thought we needed to modify it. Let me know if you still want to change it; if not, please merge this PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] pvary commented on issue #6370: What is the purpose of Hive Lock ?
pvary commented on issue #6370: URL: https://github.com/apache/iceberg/issues/6370#issuecomment-1341372808 @InvisibleProgrammer: What do the Hive guys think about this? Would they be interested in adding this feature to HMS? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] tomtongue commented on pull request #6352: AWS: Fix inconsistent behavior of naming S3 location between read and write operations by allowing only s3 bucket name
tomtongue commented on PR #6352: URL: https://github.com/apache/iceberg/pull/6352#issuecomment-1341372839 Thanks for reviewing this PR, Amogh! (Sorry for delaying my response) I'll check your comments and get back tomorrow. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] TuroczyX commented on issue #6370: What is the purpose of Hive Lock ?
TuroczyX commented on issue #6370: URL: https://github.com/apache/iceberg/issues/6370#issuecomment-1341382512 It is definitely something that we need to consider. We will talk about it at our next meeting. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] TuroczyX commented on issue #6368: Delete/Update fails for tables with more than 1000 columns
TuroczyX commented on issue #6368: URL: https://github.com/apache/iceberg/issues/6368#issuecomment-1341384585 @ayushtkn Is this settable from Hive? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] TuroczyX commented on issue #6347: [Docs]: improve ChangeLog
TuroczyX commented on issue #6347: URL: https://github.com/apache/iceberg/issues/6347#issuecomment-1341391351 @code-magician323 Thanks for your feedback. @InvisibleProgrammer Could you please take care of it next? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] TuroczyX commented on issue #6249: Update Iceberg Hive documentation
TuroczyX commented on issue #6249: URL: https://github.com/apache/iceberg/issues/6249#issuecomment-1341395246 Nice :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] TuroczyX commented on issue #6347: [Docs]: improve ChangeLog
TuroczyX commented on issue #6347: URL: https://github.com/apache/iceberg/issues/6347#issuecomment-1341395880 @code-magician323 Something like this? https://github.com/apache/iceberg/issues/6249 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] sunchao commented on a diff in pull request #6371: Spark 3.3: Support storage-partitioned joins
sunchao commented on code in PR #6371: URL: https://github.com/apache/iceberg/pull/6371#discussion_r1042549493 ## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java: ## @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.iceberg.BaseScanTaskGroup; +import org.apache.iceberg.PartitionField; +import org.apache.iceberg.PartitionScanTask; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Scan; +import org.apache.iceberg.ScanTask; +import org.apache.iceberg.ScanTaskGroup; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.spark.Spark3Util; +import org.apache.iceberg.spark.SparkReadConf; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.types.Types.StructType; +import org.apache.iceberg.util.StructLikeSet; +import org.apache.iceberg.util.TableScanUtil; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.connector.expressions.Transform; +import org.apache.spark.sql.connector.read.SupportsReportPartitioning; +import org.apache.spark.sql.connector.read.partitioning.KeyGroupedPartitioning; +import org.apache.spark.sql.connector.read.partitioning.Partitioning; +import org.apache.spark.sql.connector.read.partitioning.UnknownPartitioning; + +abstract class SparkPartitioningAwareScan extends SparkScan +implements SupportsReportPartitioning { + + private final Scan> scan; + private final boolean preserveDataGrouping; + + private Set specs = null; // lazy cache of scanned specs + private List tasks = null; // lazy cache of uncombined tasks + private List> taskGroups = null; // lazy cache of task groups + private StructType groupingKeyType = null; // lazy cache of the grouping key type + private StructLikeSet groupingKeys = null; // lazy cache of grouping keys + + SparkPartitioningAwareScan( + SparkSession spark, + Table table, + Scan> scan, + SparkReadConf readConf, + Schema expectedSchema, + List filters) { + +super(spark, table, readConf, expectedSchema, filters); + +this.scan = scan; +this.preserveDataGrouping = readConf.preserveDataGrouping(); 
+ +if (scan == null) { + this.specs = Collections.emptySet(); + this.tasks = Collections.emptyList(); + this.taskGroups = Collections.emptyList(); +} + } + + protected abstract Class taskJavaClass(); + + protected Scan> scan() { +return scan; + } + + @Override + public Partitioning outputPartitioning() { +Preconditions.checkState(taskGroups() != null, "Task groups must be planned"); + +if (groupingKeyType().fields().isEmpty()) { + return new UnknownPartitioning(taskGroups().size()); +} else { + return new KeyGroupedPartitioning(groupingKeyTransforms(), taskGroups().size()); Review Comment: I don't think we have this info now, but it's a good idea to add it. Let me take a note to implement that on Spark side. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] InvisibleProgrammer commented on issue #6347: [Docs]: improve ChangeLog
InvisibleProgrammer commented on issue #6347: URL: https://github.com/apache/iceberg/issues/6347#issuecomment-1341398027 Yes, I can. @Fokko, what do you think about grouping the changelog? And maybe it would be worth creating some kind of template for the changelogs to create a consistent experience in the future. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] sunchao commented on a diff in pull request #6371: Spark 3.3: Support storage-partitioned joins
sunchao commented on code in PR #6371: URL: https://github.com/apache/iceberg/pull/6371#discussion_r1042551459 ## spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java: ## @@ -0,0 +1,585 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.sql; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.lang3.StringUtils; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.SparkSQLProperties; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.SparkTestBaseWithCatalog; +import org.apache.iceberg.spark.SparkWriteOptions; +import org.apache.iceberg.spark.data.RandomData; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.internal.SQLConf; +import org.apache.spark.sql.types.StructType; +import org.junit.After; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestStoragePartitionedJoins extends SparkTestBaseWithCatalog { + + private static final String OTHER_TABLE_NAME = "other_table"; + + // open file cost and split size are set as 16 MB to produce a split per file + private static final String TABLE_PROPERTIES = + String.format( + "'%s' = 16777216, '%s' = 16777216", + TableProperties.SPLIT_SIZE, TableProperties.SPLIT_OPEN_FILE_COST); + + private static final Map SPJ_ON_SQL_CONF = + ImmutableMap.of( Review Comment: I think only `SQLConf.V2_BUCKETING_ENABLED` and `SparkSQLProperties.PRESERVE_DATA_GROUPING` are required, and the others are just for the convenience of testing. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
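For readers following along, a minimal sketch (not part of the PR) of the two settings the comment above identifies as required. The Iceberg key is quoted from the diff later in this thread; the Spark key is assumed to be the one behind `SQLConf.V2_BUCKETING_ENABLED` in Spark 3.3, and the session builder is illustrative only.

```
import org.apache.spark.sql.SparkSession;

public class SpjConfSketch {
  public static void main(String[] args) {
    // Spark's v2 bucketing support plus Iceberg's split planning that preserves
    // the existing grouping of data; the other options in the test map above are
    // only there to make storage-partitioned joins easier to observe in tests.
    SparkSession spark =
        SparkSession.builder()
            .master("local[*]")
            .config("spark.sql.sources.v2.bucketing.enabled", "true")
            .config("spark.sql.iceberg.split.preserve-data-grouping", "true")
            .getOrCreate();

    spark.stop();
  }
}
```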
[GitHub] [iceberg] sunchao commented on a diff in pull request #6371: Spark 3.3: Support storage-partitioned joins
sunchao commented on code in PR #6371: URL: https://github.com/apache/iceberg/pull/6371#discussion_r1042555728 ## spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java: ## @@ -0,0 +1,585 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.sql; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.lang3.StringUtils; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.SparkSQLProperties; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.SparkTestBaseWithCatalog; +import org.apache.iceberg.spark.SparkWriteOptions; +import org.apache.iceberg.spark.data.RandomData; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.internal.SQLConf; +import org.apache.spark.sql.types.StructType; +import org.junit.After; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestStoragePartitionedJoins extends SparkTestBaseWithCatalog { + + private static final String OTHER_TABLE_NAME = "other_table"; + + // open file cost and split size are set as 16 MB to produce a split per file + private static final String TABLE_PROPERTIES = + String.format( + "'%s' = 16777216, '%s' = 16777216", + TableProperties.SPLIT_SIZE, TableProperties.SPLIT_OPEN_FILE_COST); + + private static final Map SPJ_ON_SQL_CONF = + ImmutableMap.of( + SQLConf.V2_BUCKETING_ENABLED().key(), + "true", + SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION().key(), + "false", + SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), + "false", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD().key(), + "-1", + SparkSQLProperties.PRESERVE_DATA_GROUPING, + "true"); + + private static final Map SPJ_OFF_SQL_CONF = + ImmutableMap.of( + SQLConf.V2_BUCKETING_ENABLED().key(), + "false", + SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION().key(), + "false", + SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), + "false", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD().key(), + "-1", + SparkSQLProperties.PRESERVE_DATA_GROUPING, + "true"); + + @BeforeClass + public static void setupSparkConf() { +spark.conf().set("spark.sql.shuffle.partitions", "4"); + } + + @After + public void removeTables() { +sql("DROP TABLE IF EXISTS %s", 
tableName); +sql("DROP TABLE IF EXISTS %s", tableName(OTHER_TABLE_NAME)); + } + + // TODO: add tests for truncate transforms once SPARK-40295 is released + // TODO: add tests for cases when one side contains a subset of keys once Spark supports this + Review Comment: In theory it's possible. Spark will check if both sides of the SPJ have compatible partition transforms, via checking whether the V2 function identifiers are the same. So if the non-Iceberg source reports the same functions to Spark and use them in partition transforms, it could work (although I'm not sure whether this is a common use case). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] szehon-ho commented on a diff in pull request #6365: Core: Add position deletes metadata table
szehon-ho commented on code in PR #6365: URL: https://github.com/apache/iceberg/pull/6365#discussion_r1042557855 ## core/src/main/java/org/apache/iceberg/BaseMetadataTable.java: ## @@ -64,9 +64,12 @@ protected BaseMetadataTable(TableOperations ops, Table table, String name) { */ static PartitionSpec transformSpec(Schema metadataTableSchema, PartitionSpec spec) { PartitionSpec.Builder identitySpecBuilder = -PartitionSpec.builderFor(metadataTableSchema).checkConflicts(false); +PartitionSpec.builderFor(metadataTableSchema) Review Comment: Before this change, predicate pushdown would give the PositionDeletes scan tasks the wrong partition field id and spec id, so they would not work in the DeleteFile read. This only happens in corner cases like dropped partition fields (where the auto-generated field-ids are no longer correct). Added a test for this in TestMetadataTableScansWithPartitionEvolution. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] stevenzwu opened a new pull request, #6377: Flink: add util class to generate test data with extensive coverage d…
stevenzwu opened a new pull request, #6377: URL: https://github.com/apache/iceberg/pull/6377 …ifferent field types: from primitives to complex nested types -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] szehon-ho commented on a diff in pull request #6365: Core: Add position deletes metadata table
szehon-ho commented on code in PR #6365: URL: https://github.com/apache/iceberg/pull/6365#discussion_r1042560130 ## core/src/main/java/org/apache/iceberg/SerializableTable.java: ## @@ -361,6 +363,27 @@ private String errorMsg(String operation) { return String.format("Operation %s is not supported after the table is serialized", operation); } + public static class SerializablePositionDeletesTable extends SerializableTable { + +private final String baseTableName; + +protected SerializablePositionDeletesTable(PositionDeletesTable deletesTable) { + super(deletesTable); + this.baseTableName = deletesTable.table().name(); +} + +@Override +protected Table newTable(TableOperations ops, String tableName) { + Table baseTable = new BaseTable(ops, baseTableName); + return new PositionDeletesTable(ops, baseTable, tableName); +} + +@Override +public BatchScan newBatchScan() { Review Comment: Otherwise, we will go to the default which returns BatchScanAdapter -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] stevenzwu commented on pull request #6377: Flink: add util class to generate test data with extensive coverage d…
stevenzwu commented on PR #6377: URL: https://github.com/apache/iceberg/pull/6377#issuecomment-1341416995 We had this util class internally for testing the Avro GenericRecord to Flink RowData converter. It can be useful for writing unit tests for the `StructRowData` class from PR #6222. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] stevenzwu commented on pull request #6377: Flink: add util class to generate test data with extensive coverage d…
stevenzwu commented on PR #6377: URL: https://github.com/apache/iceberg/pull/6377#issuecomment-1341418873 Example usage:

```
public class TestRowDataToAvroGenericRecordConverter {

  protected void testConverter(DataGenerator dataGenerator) throws Exception {
    RowDataToAvroGenericRecordConverter converter =
        RowDataToAvroGenericRecordConverter.fromAvroSchema(dataGenerator.avroSchema());
    GenericRecord expected = dataGenerator.generateAvroGenericRecord();
    GenericRecord actual = converter.apply(dataGenerator.generateFlinkRowData());
    Assert.assertEquals(expected, actual);
  }

  @Test
  public void testPrimitiveTypes() throws Exception {
    testConverter(new DataGenerators.Primitives());
  }

  @Test
  public void testStructOfPrimitive() throws Exception {
    testConverter(new DataGenerators.StructOfPrimitive());
  }

  @Test
  public void testStructOfArray() throws Exception {
    testConverter(new DataGenerators.StructOfArray());
  }

  @Test
  public void testStructOfMap() throws Exception {
    testConverter(new DataGenerators.StructOfMap());
  }

  @Test
  public void testStructOfStruct() throws Exception {
    testConverter(new DataGenerators.StructOfStruct());
  }

  @Test
  public void testArrayOfPrimitive() throws Exception {
    testConverter(new DataGenerators.ArrayOfPrimitive());
  }

  @Test
  public void testArrayOfArray() throws Exception {
    testConverter(new DataGenerators.ArrayOfArray());
  }

  @Test
  public void testArrayOfMap() throws Exception {
    testConverter(new DataGenerators.ArrayOfMap());
  }

  @Test
  public void testArrayOfStruct() throws Exception {
    testConverter(new DataGenerators.ArrayOfStruct());
  }

  @Test
  public void testMapOfPrimitives() throws Exception {
    testConverter(new DataGenerators.MapOfPrimitives());
  }

  @Test
  public void testMapOfArray() throws Exception {
    testConverter(new DataGenerators.MapOfArray());
  }

  @Test
  public void testMapOfMap() throws Exception {
    testConverter(new DataGenerators.MapOfMap());
  }

  @Test
  public void testMapOfStruct() throws Exception {
    testConverter(new DataGenerators.MapOfStruct());
  }
}
```

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] szehon-ho commented on a diff in pull request #6222: Flink: Support inspecting table
szehon-ho commented on code in PR #6222: URL: https://github.com/apache/iceberg/pull/6222#discussion_r1042568978 ## core/src/main/java/org/apache/iceberg/BaseFilesTable.java: ## @@ -223,34 +225,28 @@ ManifestFile manifest() { static class ContentFileStructWithMetrics implements StructLike { private final StructLike fileAsStruct; private final MetricsUtil.ReadableMetricsStruct readableMetrics; -private final int expectedSize; +private final int position; Review Comment: Makes sense to me -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] RussellSpitzer opened a new pull request, #6378: Spark: Extend Timeout During Partial Progress Rewrites
RussellSpitzer opened a new pull request, #6378: URL: https://github.com/apache/iceberg/pull/6378 In order to avoid timing out when writing large manifest files, we increase the timeout allowed for the commit phase of partial progress based on the number of commits left to perform. Temporary Fix for performance issues in #6367 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] RussellSpitzer commented on issue #6367: Partial Progress Compaction can Timeout on Very Large Manfiest Commits
RussellSpitzer commented on issue #6367: URL: https://github.com/apache/iceberg/issues/6367#issuecomment-1341500255 Filed a quick PR to just extend the timeout -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6378: Spark: Extend Timeout During Partial Progress Rewrites
RussellSpitzer commented on code in PR #6378: URL: https://github.com/apache/iceberg/pull/6378#discussion_r1042612662 ## core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java: ## @@ -225,25 +225,40 @@ public void close() { LOG.info("Closing commit service for {}", table); committerService.shutdown(); + boolean timeout = false; + int waitTime; try { // All rewrites have completed and all new files have been created, we are now waiting for // the commit -// pool to finish doing it's commits to Iceberg State. In the case of partial progress this +// pool to finish doing its commits to Iceberg State. In the case of partial progress this // should // have been occurring simultaneously with rewrites, if not there should be only a single // commit operation. -// In either case this should take much less than 10 minutes to actually complete. -if (!committerService.awaitTermination(10, TimeUnit.MINUTES)) { +// We will wait 10 minutes plus 5 more minutes for each commit left to perform due to the +// time required for writing manifests +waitTime = 10 + (completedRewrites.size() / rewritesPerCommit) * 5; Review Comment: I'm also open to just change this to 60 minutes just to keep it simpler -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
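To make the quoted formula concrete (the numbers here are illustrative, not from the PR): with `rewritesPerCommit = 10` and 100 completed rewrites still waiting to be committed, the service would wait `10 + (100 / 10) * 5 = 60` minutes before logging the timeout warning, instead of the previous flat 10 minutes.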
[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6378: Spark: Extend Timeout During Partial Progress Rewrites
RussellSpitzer commented on code in PR #6378: URL: https://github.com/apache/iceberg/pull/6378#discussion_r1042613071 ## core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java: ## @@ -225,25 +225,40 @@ public void close() { LOG.info("Closing commit service for {}", table); committerService.shutdown(); + boolean timeout = false; + int waitTime; try { // All rewrites have completed and all new files have been created, we are now waiting for // the commit -// pool to finish doing it's commits to Iceberg State. In the case of partial progress this +// pool to finish doing its commits to Iceberg State. In the case of partial progress this // should // have been occurring simultaneously with rewrites, if not there should be only a single // commit operation. -// In either case this should take much less than 10 minutes to actually complete. -if (!committerService.awaitTermination(10, TimeUnit.MINUTES)) { +// We will wait 10 minutes plus 5 more minutes for each commit left to perform due to the +// time required for writing manifests +waitTime = 10 + (completedRewrites.size() / rewritesPerCommit) * 5; +if (!committerService.awaitTermination(waitTime, TimeUnit.MINUTES)) { LOG.warn( - "Commit operation did not complete within 10 minutes of the files being written. This may mean " - + "that changes were not successfully committed to the the Iceberg table."); + "Commit operation did not complete within {} (10 + 5 * commitsRemaining) minutes of the all files " + + "being rewritten. This may mean that changes were not successfully committed to the the " + + "Iceberg catalog.", + waitTime); + timeout = true; } } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException( "Cannot complete commit for rewrite, commit service interrupted", e); } + Preconditions.checkArgument( Review Comment: This is important since we had a lot of users getting error from the precondition underneath here which is incorrect -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] Fokko commented on a diff in pull request #6348: Python: Update license-checker
Fokko commented on code in PR #6348: URL: https://github.com/apache/iceberg/pull/6348#discussion_r1042617131 ## python/dev/.rat-excludes: ## @@ -0,0 +1,2 @@ +.rat-excludes Review Comment: I'd rather keep the two projects isolated so we have the possibility to split pyiceberg out in a separate repository. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] gaborkaszab commented on issue #6368: Delete/Update fails for tables with more than 1000 columns
gaborkaszab commented on issue #6368: URL: https://github.com/apache/iceberg/issues/6368#issuecomment-1341513400 @TuroczyX The agreement here is that there is no need to make this configurable and hardcoding to 10k is enough. See PR: https://github.com/apache/iceberg/pull/6369 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #6371: Spark 3.3: Support storage-partitioned joins
aokolnychyi commented on code in PR #6371: URL: https://github.com/apache/iceberg/pull/6371#discussion_r1042622671 ## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java: ## @@ -42,4 +42,9 @@ private SparkSQLProperties() {} // Controls whether to check the order of fields during writes public static final String CHECK_ORDERING = "spark.sql.iceberg.check-ordering"; public static final boolean CHECK_ORDERING_DEFAULT = true; + + // Controls whether to preserve the existing grouping of data while planning splits + public static final String PRESERVE_DATA_GROUPING = Review Comment: I am afraid this one will be pretty public. Users will have to explicitly enable this as we don't know if Spark can benefit from the reported distribution and skip shuffles. Hence, we disable it by default to avoid any performance regressions caused by less dense packing of splits. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #6371: Spark 3.3: Support storage-partitioned joins
aokolnychyi commented on code in PR #6371: URL: https://github.com/apache/iceberg/pull/6371#discussion_r1042627234 ## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java: ## @@ -255,74 +256,90 @@ public static org.apache.iceberg.Table toIcebergTable(Table table) { return sparkTable.table(); } + public static Transform[] toTransforms(Schema schema, List fields) { +SpecTransformToSparkTransform visitor = new SpecTransformToSparkTransform(schema); + +List transforms = Lists.newArrayList(); + +for (PartitionField field : fields) { + Transform transform = PartitionSpecVisitor.visit(schema, field, visitor); + if (transform != null) { +transforms.add(transform); + } +} + +return transforms.toArray(new Transform[0]); + } + /** * Converts a PartitionSpec to Spark transforms. * * @param spec a PartitionSpec * @return an array of Transforms */ public static Transform[] toTransforms(PartitionSpec spec) { -Map quotedNameById = SparkSchemaUtil.indexQuotedNameById(spec.schema()); -List transforms = -PartitionSpecVisitor.visit( -spec, -new PartitionSpecVisitor() { - @Override - public Transform identity(String sourceName, int sourceId) { -return Expressions.identity(quotedName(sourceId)); - } +SpecTransformToSparkTransform visitor = new SpecTransformToSparkTransform(spec.schema()); +List transforms = PartitionSpecVisitor.visit(spec, visitor); +return transforms.stream().filter(Objects::nonNull).toArray(Transform[]::new); + } - @Override - public Transform bucket(String sourceName, int sourceId, int numBuckets) { -return Expressions.bucket(numBuckets, quotedName(sourceId)); - } + private static class SpecTransformToSparkTransform implements PartitionSpecVisitor { +private final Map quotedNameById; - @Override - public Transform truncate(String sourceName, int sourceId, int width) { -return Expressions.apply( -"truncate", -Expressions.column(quotedName(sourceId)), -Expressions.literal(width)); - } +SpecTransformToSparkTransform(Schema schema) { + this.quotedNameById = SparkSchemaUtil.indexQuotedNameById(schema); +} - @Override - public Transform year(String sourceName, int sourceId) { -return Expressions.years(quotedName(sourceId)); - } +@Override +public Transform identity(String sourceName, int sourceId) { + return Expressions.identity(quotedName(sourceId)); +} - @Override - public Transform month(String sourceName, int sourceId) { -return Expressions.months(quotedName(sourceId)); - } +@Override +public Transform bucket(String sourceName, int sourceId, int numBuckets) { + return Expressions.bucket(numBuckets, quotedName(sourceId)); +} - @Override - public Transform day(String sourceName, int sourceId) { -return Expressions.days(quotedName(sourceId)); - } +@Override +public Transform truncate(String sourceName, int sourceId, int width) { + NamedReference column = Expressions.column(quotedName(sourceId)); + return Expressions.apply("truncate", Expressions.literal(width), column); Review Comment: I think the only place that would change is string output of partitioning in `SparkTable`. Otherwise, we handle both combinations in the `TruncateTransform` extractor. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
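A small, hedged illustration of the two argument orders being discussed (the class wrapper and column name are made up; only the `Expressions` calls mirror the diff above): both describe the same `truncate` transform, and the difference is mainly visible when the transform is rendered as a string.

```
import org.apache.spark.sql.connector.expressions.Expressions;
import org.apache.spark.sql.connector.expressions.Transform;

public class TruncateTransformOrderSketch {
  public static void main(String[] args) {
    // New order from the diff: width literal first, then the column reference.
    Transform widthFirst =
        Expressions.apply("truncate", Expressions.literal(4), Expressions.column("data"));

    // Previous order: column reference first, then the width literal.
    Transform columnFirst =
        Expressions.apply("truncate", Expressions.column("data"), Expressions.literal(4));

    System.out.println(widthFirst.describe());
    System.out.println(columnFirst.describe());
  }
}
```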
[GitHub] [iceberg] shardulm94 commented on a diff in pull request #6327: ORC: Fix error when projecting nested identity partition column
shardulm94 commented on code in PR #6327: URL: https://github.com/apache/iceberg/pull/6327#discussion_r1042628482 ## orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java: ## @@ -442,4 +445,23 @@ static TypeDescription applyNameMapping(TypeDescription orcSchema, NameMapping n public static Map idToOrcName(Schema schema) { return TypeUtil.visit(schema, new IdToOrcName()); } + + /** + * Returns a {@link Schema} which has constant fields and metadata fields removed from the + * provided schema. This utility can be used to create a "read schema" which can be passed to the + * ORC file reader and hence avoiding deserialization and memory costs associated with column + * values already available through Iceberg metadata. + * + * NOTE: This method, unlike {@link TypeUtil#selectNot(Schema, Set)}, preserves empty structs + * (caused due to a struct having all constant fields) so that Iceberg ORC readers can later add + * constant fields in these structs Review Comment: Kept only the first part of the comment now -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #6371: Spark 3.3: Support storage-partitioned joins
aokolnychyi commented on code in PR #6371: URL: https://github.com/apache/iceberg/pull/6371#discussion_r1042631790 ## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java: ## @@ -42,4 +42,9 @@ private SparkSQLProperties() {} // Controls whether to check the order of fields during writes public static final String CHECK_ORDERING = "spark.sql.iceberg.check-ordering"; public static final boolean CHECK_ORDERING_DEFAULT = true; + + // Controls whether to preserve the existing grouping of data while planning splits + public static final String PRESERVE_DATA_GROUPING = + "spark.sql.iceberg.split.preserve-data-grouping"; + public static final boolean PRESERVE_DATA_GROUPING_DEFAULT = false; Review Comment: My worry is performance regressions that this may cause. There may be substantially more splits if this config is on. In order to benefit from SPJ, joins must have equality conditions on partition columns. Spark will propagate join conditions in the future. Our long-term plan may be to check if v2 bucketing enabled and whether we have join conditions on partition columns to return true by default. Even that will be suboptimal cause we don't know if SPJ would actually apply or we simply produced less dense splits. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
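As a hedged illustration of the "equality conditions on partition columns" requirement mentioned above (catalog, database, and table names are hypothetical, not from the PR): two tables laid out with the same bucketing and joined on the bucketed column is the shape of query that can skip the shuffle once the flags are enabled.

```
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SpjJoinSketch {
  // Assumes an Iceberg catalog named "cat" is configured on the session and that
  // both tables were created with PARTITIONED BY (bucket(8, id)).
  static Dataset<Row> joinOnPartitionColumn(SparkSession spark) {
    return spark.sql(
        "SELECT a.id, a.data, b.data AS other_data "
            + "FROM cat.db.t1 a "
            + "JOIN cat.db.t2 b ON a.id = b.id"); // equality condition on the bucketed column
  }
}
```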
[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #6371: Spark 3.3: Support storage-partitioned joins
aokolnychyi commented on code in PR #6371: URL: https://github.com/apache/iceberg/pull/6371#discussion_r1042632953 ## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java: ## @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.iceberg.BaseScanTaskGroup; +import org.apache.iceberg.PartitionField; +import org.apache.iceberg.PartitionScanTask; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Scan; +import org.apache.iceberg.ScanTask; +import org.apache.iceberg.ScanTaskGroup; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.spark.Spark3Util; +import org.apache.iceberg.spark.SparkReadConf; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.types.Types.StructType; +import org.apache.iceberg.util.StructLikeSet; +import org.apache.iceberg.util.TableScanUtil; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.connector.expressions.Transform; +import org.apache.spark.sql.connector.read.SupportsReportPartitioning; +import org.apache.spark.sql.connector.read.partitioning.KeyGroupedPartitioning; +import org.apache.spark.sql.connector.read.partitioning.Partitioning; +import org.apache.spark.sql.connector.read.partitioning.UnknownPartitioning; + +abstract class SparkPartitioningAwareScan extends SparkScan +implements SupportsReportPartitioning { + + private final Scan> scan; + private final boolean preserveDataGrouping; + + private Set specs = null; // lazy cache of scanned specs + private List tasks = null; // lazy cache of uncombined tasks + private List> taskGroups = null; // lazy cache of task groups + private StructType groupingKeyType = null; // lazy cache of the grouping key type + private StructLikeSet groupingKeys = null; // lazy cache of grouping keys + + SparkPartitioningAwareScan( + SparkSession spark, + Table table, + Scan> scan, + SparkReadConf readConf, + Schema expectedSchema, + List filters) { + +super(spark, table, readConf, expectedSchema, filters); + +this.scan = scan; +this.preserveDataGrouping = 
readConf.preserveDataGrouping(); + +if (scan == null) { + this.specs = Collections.emptySet(); + this.tasks = Collections.emptyList(); + this.taskGroups = Collections.emptyList(); +} + } + + protected abstract Class taskJavaClass(); + + protected Scan> scan() { +return scan; + } + + @Override + public Partitioning outputPartitioning() { +Preconditions.checkState(taskGroups() != null, "Task groups must be planned"); + +if (groupingKeyType().fields().isEmpty()) { + return new UnknownPartitioning(taskGroups().size()); +} else { + return new KeyGroupedPartitioning(groupingKeyTransforms(), taskGroups().size()); Review Comment: I'll add logs for now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
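The reply above promises to add logs when the scan cannot report a grouping key. A minimal sketch of what that logging could look like, assuming an SLF4J logger field named LOG on the scan class (the logger is not part of the quoted diff); groupingKeyType(), groupingKeyTransforms() and taskGroups() are the methods shown in the diff:

    if (groupingKeyType().fields().isEmpty()) {
      // Nothing to group on, so report plain UnknownPartitioning and note why.
      LOG.info(
          "Reporting UnknownPartitioning with {} task group(s) because the grouping key type is empty",
          taskGroups().size());
      return new UnknownPartitioning(taskGroups().size());
    } else {
      // Report one key-grouped partition per task group so Spark can avoid a shuffle.
      return new KeyGroupedPartitioning(groupingKeyTransforms(), taskGroups().size());
    }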
[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #6371: Spark 3.3: Support storage-partitioned joins
aokolnychyi commented on code in PR #6371: URL: https://github.com/apache/iceberg/pull/6371#discussion_r1042635649 ## spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java: ## @@ -0,0 +1,585 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.sql; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.lang3.StringUtils; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.SparkSQLProperties; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.SparkTestBaseWithCatalog; +import org.apache.iceberg.spark.SparkWriteOptions; +import org.apache.iceberg.spark.data.RandomData; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.internal.SQLConf; +import org.apache.spark.sql.types.StructType; +import org.junit.After; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestStoragePartitionedJoins extends SparkTestBaseWithCatalog { + + private static final String OTHER_TABLE_NAME = "other_table"; + + // open file cost and split size are set as 16 MB to produce a split per file + private static final String TABLE_PROPERTIES = + String.format( + "'%s' = 16777216, '%s' = 16777216", + TableProperties.SPLIT_SIZE, TableProperties.SPLIT_OPEN_FILE_COST); + + private static final Map SPJ_ON_SQL_CONF = + ImmutableMap.of( Review Comment: Chao is correct. I disable AQE (can cause one partition) and broadcasts (have higher priority than SPJ) to make sure SPJ kicks in. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
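To reproduce the same setup outside the test class, the settings in SPJ_ON_SQL_CONF can be applied directly to a SparkSession. A minimal sketch, reusing the constants quoted above instead of hard-coded config strings; spark is assumed to be an existing SparkSession:

    // Allow Spark to use the KeyGroupedPartitioning reported by the scans (enables SPJ).
    spark.conf().set(SQLConf.V2_BUCKETING_ENABLED().key(), "true");
    // AQE can coalesce the data into a single partition, which hides the effect of SPJ.
    spark.conf().set(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "false");
    // Broadcast joins take priority over SPJ, so disable them via the threshold.
    spark.conf().set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD().key(), "-1");
    // Ask the Iceberg scan to keep splits grouped by partition so grouping can be reported.
    spark.conf().set(SparkSQLProperties.PRESERVE_DATA_GROUPING, "true");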
[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #6371: Spark 3.3: Support storage-partitioned joins
aokolnychyi commented on code in PR #6371: URL: https://github.com/apache/iceberg/pull/6371#discussion_r1042643130 ## spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java: ## @@ -0,0 +1,585 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.sql; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.lang3.StringUtils; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.SparkSQLProperties; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.SparkTestBaseWithCatalog; +import org.apache.iceberg.spark.SparkWriteOptions; +import org.apache.iceberg.spark.data.RandomData; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.internal.SQLConf; +import org.apache.spark.sql.types.StructType; +import org.junit.After; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestStoragePartitionedJoins extends SparkTestBaseWithCatalog { + + private static final String OTHER_TABLE_NAME = "other_table"; + + // open file cost and split size are set as 16 MB to produce a split per file + private static final String TABLE_PROPERTIES = + String.format( + "'%s' = 16777216, '%s' = 16777216", + TableProperties.SPLIT_SIZE, TableProperties.SPLIT_OPEN_FILE_COST); + + private static final Map SPJ_ON_SQL_CONF = + ImmutableMap.of( + SQLConf.V2_BUCKETING_ENABLED().key(), + "true", + SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION().key(), + "false", + SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), + "false", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD().key(), + "-1", + SparkSQLProperties.PRESERVE_DATA_GROUPING, + "true"); + + private static final Map SPJ_OFF_SQL_CONF = + ImmutableMap.of( + SQLConf.V2_BUCKETING_ENABLED().key(), + "false", + SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION().key(), + "false", + SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), + "false", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD().key(), + "-1", + SparkSQLProperties.PRESERVE_DATA_GROUPING, + "true"); + + @BeforeClass + public static void setupSparkConf() { +spark.conf().set("spark.sql.shuffle.partitions", "4"); + } + + @After + public void removeTables() { +sql("DROP TABLE IF EXISTS 
%s", tableName); +sql("DROP TABLE IF EXISTS %s", tableName(OTHER_TABLE_NAME)); + } + + // TODO: add tests for truncate transforms once SPARK-40295 is released + // TODO: add tests for cases when one side contains a subset of keys once Spark supports this + Review Comment: We will definitely need to support distributing the other side using the Iceberg function catalog. A common use case for this is MERGE. The incoming relation may be a view, which will never be distributed in a compatible way with Iceberg tables. I hope Spark would be smart enough to shuffle just the smaller relation using the reported partitioning from the target table. Unfortunately, it does not work even if I distribute the incoming data manually using the Iceberg function catalog. We must have `KeyGroupPartitioning` on both sides, which can only be reported by data sources. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@i
[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #6371: Spark 3.3: Support storage-partitioned joins
aokolnychyi commented on code in PR #6371: URL: https://github.com/apache/iceberg/pull/6371#discussion_r1042644217 ## core/src/main/java/org/apache/iceberg/Partitioning.java: ## @@ -215,11 +225,12 @@ public Void alwaysNull(int fieldId, String sourceName, int sourceId) { * that have the same field ID but use a void transform under the hood. Such fields cannot be part * of the grouping key as void transforms always return null. * + * @param schema a schema Review Comment: Will add. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
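The diff adds only the placeholder line "@param schema a schema", and the reply promises to expand it. One possible, more descriptive wording, offered as a suggestion rather than the PR's final text:

     * @param schema a schema used to resolve the source columns that the partition specs reference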
[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #6371: Spark 3.3: Support storage-partitioned joins
aokolnychyi commented on code in PR #6371: URL: https://github.com/apache/iceberg/pull/6371#discussion_r1042644545 ## spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java: ## @@ -0,0 +1,585 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.sql; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.lang3.StringUtils; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.SparkSQLProperties; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.SparkTestBaseWithCatalog; +import org.apache.iceberg.spark.SparkWriteOptions; +import org.apache.iceberg.spark.data.RandomData; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.internal.SQLConf; +import org.apache.spark.sql.types.StructType; +import org.junit.After; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestStoragePartitionedJoins extends SparkTestBaseWithCatalog { + + private static final String OTHER_TABLE_NAME = "other_table"; + + // open file cost and split size are set as 16 MB to produce a split per file + private static final String TABLE_PROPERTIES = + String.format( + "'%s' = 16777216, '%s' = 16777216", + TableProperties.SPLIT_SIZE, TableProperties.SPLIT_OPEN_FILE_COST); + + private static final Map SPJ_ON_SQL_CONF = + ImmutableMap.of( + SQLConf.V2_BUCKETING_ENABLED().key(), + "true", + SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION().key(), + "false", + SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), + "false", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD().key(), + "-1", + SparkSQLProperties.PRESERVE_DATA_GROUPING, + "true"); + + private static final Map SPJ_OFF_SQL_CONF = + ImmutableMap.of( + SQLConf.V2_BUCKETING_ENABLED().key(), + "false", + SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION().key(), + "false", + SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), + "false", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD().key(), + "-1", + SparkSQLProperties.PRESERVE_DATA_GROUPING, + "true"); + + @BeforeClass + public static void setupSparkConf() { +spark.conf().set("spark.sql.shuffle.partitions", "4"); + } + + @After + public void removeTables() { +sql("DROP TABLE IF EXISTS 
%s", tableName); +sql("DROP TABLE IF EXISTS %s", tableName(OTHER_TABLE_NAME)); + } + + // TODO: add tests for truncate transforms once SPARK-40295 is released + // TODO: add tests for cases when one side contains a subset of keys once Spark supports this + Review Comment: At the moment, though, we can only support Iceberg to Iceberg joins. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #6371: Spark 3.3: Support storage-partitioned joins
aokolnychyi commented on code in PR #6371: URL: https://github.com/apache/iceberg/pull/6371#discussion_r1042644906 ## spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java: ## @@ -0,0 +1,585 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.sql; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.lang3.StringUtils; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.SparkSQLProperties; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.SparkTestBaseWithCatalog; +import org.apache.iceberg.spark.SparkWriteOptions; +import org.apache.iceberg.spark.data.RandomData; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.internal.SQLConf; +import org.apache.spark.sql.types.StructType; +import org.junit.After; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestStoragePartitionedJoins extends SparkTestBaseWithCatalog { + + private static final String OTHER_TABLE_NAME = "other_table"; + + // open file cost and split size are set as 16 MB to produce a split per file + private static final String TABLE_PROPERTIES = + String.format( + "'%s' = 16777216, '%s' = 16777216", + TableProperties.SPLIT_SIZE, TableProperties.SPLIT_OPEN_FILE_COST); + + private static final Map SPJ_ON_SQL_CONF = + ImmutableMap.of( Review Comment: I'll add this to the notes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
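The two maps quoted above differ only in V2_BUCKETING_ENABLED, so a test can run the same join with SPJ on and off and compare the results. A minimal sketch of that pattern, assuming the withSQLConf(Map, Action) and List<Object[]> sql(String, Object...) helpers available to these Spark test classes, with a hypothetical join on an id column:

    // With V2 bucketing on, the join should be planned without a shuffle.
    withSQLConf(SPJ_ON_SQL_CONF, () -> {
      List<Object[]> actual =
          sql("SELECT * FROM %s t1 JOIN %s t2 ON t1.id = t2.id", tableName, tableName(OTHER_TABLE_NAME));
      // assert on the rows and, optionally, on the absence of shuffles in the physical plan
    });

    // With V2 bucketing off, the same join runs as a regular shuffle join and provides
    // the expected rows for comparison.
    withSQLConf(SPJ_OFF_SQL_CONF, () -> {
      List<Object[]> expected =
          sql("SELECT * FROM %s t1 JOIN %s t2 ON t1.id = t2.id", tableName, tableName(OTHER_TABLE_NAME));
    });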