[GitHub] [iceberg] kingeasternsun commented on a diff in pull request #6624: 🎨 Add "parallelism" parameter to "add_files" syscall and MigrateTable, SnapshotTable.
kingeasternsun commented on code in PR #6624: URL: https://github.com/apache/iceberg/pull/6624#discussion_r1083420820 ## spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/SnapshotTableProcedure.java: ## @@ -93,10 +94,20 @@ public InternalRow[] call(InternalRow args) { }); } +int parallelism; +if (!args.isNullAt(4)) { + parallelism = args.getInt(4); +} else { + parallelism = 1; +} Review Comment: > Nit: I think it's a bit cleaner to use ternary here > > `int parallelism = args.getInt(4) != null ? args.getInt(4) : 1` Thanks, I have fixed it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
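A minimal sketch of the pattern discussed in this thread, not the exact PR code: Spark's `InternalRow.getInt(ordinal)` returns a primitive `int`, so the null check has to go through `isNullAt(ordinal)` rather than a `!= null` comparison, and a plain ternary keeps it compact. The helper class below is hypothetical; the ordinals come from the procedure parameter arrays in the diffs (4 for SnapshotTableProcedure, 3 for MigrateTableProcedure).

```java
import org.apache.spark.sql.catalyst.InternalRow;

// Hypothetical helper illustrating the default-to-1 ternary discussed above.
final class ProcedureArgs {
  private ProcedureArgs() {}

  // Returns the optional "parallelism" argument at the given ordinal, defaulting to 1
  // when the caller did not pass it (the slot is null in the InternalRow).
  static int parallelism(InternalRow args, int ordinal) {
    return args.isNullAt(ordinal) ? 1 : args.getInt(ordinal);
  }
}
```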
[GitHub] [iceberg] kingeasternsun commented on a diff in pull request #6624: 🎨 Add "parallelism" parameter to "add_files" syscall and MigrateTable, SnapshotTable.
kingeasternsun commented on code in PR #6624: URL: https://github.com/apache/iceberg/pull/6624#discussion_r1083420860 ## spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/MigrateTableProcedure.java: ## @@ -99,7 +100,13 @@ public InternalRow[] call(InternalRow args) { if (dropBackup) { result = migrateTableSparkAction.dropBackup().execute(); } else { - result = migrateTableSparkAction.execute(); + int parallelism; + if (!args.isNullAt(3)) { +parallelism = args.getInt(3); + } else { +parallelism = 1; + } Review Comment: > Same nit as below i think ternary assignment works well here Thanks, I have fixed it, @amogh-jahagirdar. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] Fokko merged pull request #6641: Build: Bump rich from 13.1.0 to 13.2.0 in /python
Fokko merged PR #6641: URL: https://github.com/apache/iceberg/pull/6641 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] Fokko commented on pull request #6640: Build: Bump adlfs from 2022.11.2 to 2023.1.0 in /python
Fokko commented on PR #6640: URL: https://github.com/apache/iceberg/pull/6640#issuecomment-1399445639 Superseded by https://github.com/apache/iceberg/pull/6643 which bumps all the fsspec packages in sync -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] Fokko closed pull request #6640: Build: Bump adlfs from 2022.11.2 to 2023.1.0 in /python
Fokko closed pull request #6640: Build: Bump adlfs from 2022.11.2 to 2023.1.0 in /python URL: https://github.com/apache/iceberg/pull/6640 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] dependabot[bot] commented on pull request #6640: Build: Bump adlfs from 2022.11.2 to 2023.1.0 in /python
dependabot[bot] commented on PR #6640: URL: https://github.com/apache/iceberg/pull/6640#issuecomment-1399445664 OK, I won't notify you again about this release, but will get in touch when a new version is available. If you'd rather skip all updates until the next major or minor version, let me know by commenting `@dependabot ignore this major version` or `@dependabot ignore this minor version`. You can also ignore all major, minor, or patch releases for a dependency by adding an [`ignore` condition](https://docs.github.com/en/code-security/supply-chain-security/configuration-options-for-dependency-updates#ignore) with the desired `update_types` to your config file. If you change your mind, just re-open this PR and I'll resolve any conflicts on it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] Fokko merged pull request #6639: Build: Bump pandas from 1.5.2 to 1.5.3 in /python
Fokko merged PR #6639: URL: https://github.com/apache/iceberg/pull/6639 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] kingeasternsun commented on a diff in pull request #6624: 🎨 Add "parallelism" parameter to "add_files" syscall and MigrateTable, SnapshotTable.
kingeasternsun commented on code in PR #6624: URL: https://github.com/apache/iceberg/pull/6624#discussion_r1083428466 ## api/src/main/java/org/apache/iceberg/actions/MigrateTable.java: ## @@ -50,6 +50,15 @@ default MigrateTable dropBackup() { throw new UnsupportedOperationException("Dropping a backup is not supported"); } + /** + * @param numReaders the number of concurrent file read operations to use per partition + * @return this for method chaining + */ + default MigrateTable withParallelReads(int numReaders) { +throw new UnsupportedOperationException( Review Comment: > I think we can default to 1? LGTM, but I'm not familiar with Java. Could you give more detail about how to set this default to 1? Many thanks. @jackye1995 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
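As a reference for the question above, here is one way "default to 1" could read in a Java default interface method. This is a minimal sketch under an assumed interpretation, with an illustrative interface name rather than the actual MigrateTable API: implementations that never override the method still behave as if the parallelism were 1, and only reject values they cannot honor.

```java
// Illustrative sketch only; the real MigrateTable interface may choose a different contract.
interface ParallelReadsDefaultExample {

  default ParallelReadsDefaultExample withParallelReads(int numReaders) {
    if (numReaders == 1) {
      // Single-threaded reads are the implicit behavior, so there is nothing to configure.
      return this;
    }
    throw new UnsupportedOperationException(
        "Parallel reads are not supported by this implementation");
  }
}
```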
[GitHub] [iceberg] kingeasternsun commented on a diff in pull request #6624: 🎨 Add "parallelism" parameter to "add_files" syscall and MigrateTable, SnapshotTable.
kingeasternsun commented on code in PR #6624: URL: https://github.com/apache/iceberg/pull/6624#discussion_r1083440870 ## spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/MigrateTableProcedure.java: ## @@ -39,7 +39,8 @@ class MigrateTableProcedure extends BaseProcedure { new ProcedureParameter[] { ProcedureParameter.required("table", DataTypes.StringType), ProcedureParameter.optional("properties", STRING_MAP), -ProcedureParameter.optional("drop_backup", DataTypes.BooleanType) +ProcedureParameter.optional("drop_backup", DataTypes.BooleanType), +ProcedureParameter.optional("max_concurrent_read_datafiles", DataTypes.IntegerType) Review Comment: @amogh-jahagirdar Another reason is that `importSparkPartitions` https://github.com/kingeasternsun/iceberg/blob/feature/add-parallelism-add-files/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java#L620-L630 has a local variable named `parallelism` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] kingeasternsun commented on a diff in pull request #6624: 🎨 Add "parallelism" parameter to "add_files" syscall and MigrateTable, SnapshotTable.
kingeasternsun commented on code in PR #6624: URL: https://github.com/apache/iceberg/pull/6624#discussion_r1083441291 ## api/src/main/java/org/apache/iceberg/actions/MigrateTable.java: ## @@ -50,6 +50,15 @@ default MigrateTable dropBackup() { throw new UnsupportedOperationException("Dropping a backup is not supported"); } + /** + * @param numReaders the number of concurrent file read operations to use per partition + * @return this for method chaining + */ + default MigrateTable withParallelReads(int numReaders) { Review Comment: Thanks for your advice, it looks good to me, but the function `importSparkPartitions` (https://github.com/kingeasternsun/iceberg/blob/feature/add-parallelism-add-files/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java#L620-L630) already has a local variable named `parallelism`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] youngxinler commented on a diff in pull request #6571: Docs: java api doc add write data example
youngxinler commented on code in PR #6571: URL: https://github.com/apache/iceberg/pull/6571#discussion_r1083463683 ## docs/java-api.md: ## @@ -147,6 +147,53 @@ t.newAppend().appendFile(data).commit(); t.commitTransaction(); ``` +### WriteData + +The java api can write data into iceberg table. + +First write data to the data file, then submit the data file, the data you write will take effect in the table. + +For example, add 1000 pieces of data to the table. + +```java +GenericAppenderFactory appenderFactory = new GenericAppenderFactory(table.schema(), table.spec()); + +int partitionId = 1, taskId = 1; +OutputFileFactory outputFileFactory = OutputFileFactory.builderFor(table, partitionId, taskId).format(FileFormat.PARQUET).build(); +final PartitionKey partitionKey = new PartitionKey(table.spec(), table.spec().schema()); Review Comment: Sorry for the late reply. That's a good idea; I also don't think it's good to let users touch InternalRecordWrapper and PartitionedFanoutWriter directly. Following TestTaskWriter's writing logic, I added a new TaskWriter implementation class named GenericTaskWriter. It supports writing to both non-partitioned and partitioned tables. What do you think? I have committed it to this branch. @jackye1995 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] youngxinler commented on a diff in pull request #6571: Docs: java api doc add write data example
youngxinler commented on code in PR #6571: URL: https://github.com/apache/iceberg/pull/6571#discussion_r1083463816 ## docs/java-api.md: ## @@ -147,6 +147,53 @@ t.newAppend().appendFile(data).commit(); t.commitTransaction(); ``` +### WriteData + +The java api can write data into iceberg table. + +First write data to the data file, then submit the data file, the data you write will take effect in the table. + +For example, add 1000 pieces of data to the table. + +```java +GenericAppenderFactory appenderFactory = new GenericAppenderFactory(table.schema(), table.spec()); + +int partitionId = 1, taskId = 1; +OutputFileFactory outputFileFactory = OutputFileFactory.builderFor(table, partitionId, taskId).format(FileFormat.PARQUET).build(); +final PartitionKey partitionKey = new PartitionKey(table.spec(), table.spec().schema()); +final InternalRecordWrapper recordWrapper = new InternalRecordWrapper(table.schema().asStruct()); + +// partitionedFanoutWriter will auto partitioned record and create the partitioned writer +PartitionedFanoutWriter partitionedFanoutWriter = new PartitionedFanoutWriter(table.spec(), FileFormat.PARQUET, appenderFactory, outputFileFactory, table.io(), TARGET_FILE_SIZE_IN_BYTES) { Review Comment: > I think we might need example for both partitioned and not partitioned done with GenericTaskWriter. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] youngxinler commented on a diff in pull request #6571: Docs: java api doc add write data example
youngxinler commented on code in PR #6571: URL: https://github.com/apache/iceberg/pull/6571#discussion_r1083463759 ## docs/java-api.md: ## @@ -147,6 +147,53 @@ t.newAppend().appendFile(data).commit(); t.commitTransaction(); ``` +### WriteData + +The java api can write data into iceberg table. + +First write data to the data file, then submit the data file, the data you write will take effect in the table. + +For example, add 1000 pieces of data to the table. + +```java +GenericAppenderFactory appenderFactory = new GenericAppenderFactory(table.schema(), table.spec()); + +int partitionId = 1, taskId = 1; +OutputFileFactory outputFileFactory = OutputFileFactory.builderFor(table, partitionId, taskId).format(FileFormat.PARQUET).build(); +final PartitionKey partitionKey = new PartitionKey(table.spec(), table.spec().schema()); +final InternalRecordWrapper recordWrapper = new InternalRecordWrapper(table.schema().asStruct()); + +// partitionedFanoutWriter will auto partitioned record and create the partitioned writer +PartitionedFanoutWriter partitionedFanoutWriter = new PartitionedFanoutWriter(table.spec(), FileFormat.PARQUET, appenderFactory, outputFileFactory, table.io(), TARGET_FILE_SIZE_IN_BYTES) { Review Comment: done with GenericTaskWriter. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] youngxinler commented on a diff in pull request #6571: Docs: java api doc add write data example
youngxinler commented on code in PR #6571: URL: https://github.com/apache/iceberg/pull/6571#discussion_r1083463953 ## docs/java-api.md: ## @@ -147,6 +147,53 @@ t.newAppend().appendFile(data).commit(); t.commitTransaction(); ``` +### WriteData + +The java api can write data into iceberg table. + +First write data to the data file, then submit the data file, the data you write will take effect in the table. + +For example, add 1000 pieces of data to the table. + +```java +GenericAppenderFactory appenderFactory = new GenericAppenderFactory(table.schema(), table.spec()); + +int partitionId = 1, taskId = 1; +OutputFileFactory outputFileFactory = OutputFileFactory.builderFor(table, partitionId, taskId).format(FileFormat.PARQUET).build(); +final PartitionKey partitionKey = new PartitionKey(table.spec(), table.spec().schema()); +final InternalRecordWrapper recordWrapper = new InternalRecordWrapper(table.schema().asStruct()); + +// partitionedFanoutWriter will auto partitioned record and create the partitioned writer +PartitionedFanoutWriter partitionedFanoutWriter = new PartitionedFanoutWriter(table.spec(), FileFormat.PARQUET, appenderFactory, outputFileFactory, table.io(), TARGET_FILE_SIZE_IN_BYTES) { +@Override +protected PartitionKey partition(Record record) { +partitionKey.partition(recordWrapper.wrap(record)); +return partitionKey; +} +}; + +GenericRecord genericRecord = GenericRecord.create(table.schema()); + +// assume write 1000 records +for (int i = 0; i < 1000; i++) { +GenericRecord record = genericRecord.copy(); +record.setField("level", i % 6 == 0 ? "error" : "info"); Review Comment: > we might want to specify the schema of the records you insert before the code block so people can understand it more easily The table structure has been described in the code comments -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] youngxinler commented on a diff in pull request #6571: Data: java api add GenericTaskWriter and add write demo to Doc.
youngxinler commented on code in PR #6571: URL: https://github.com/apache/iceberg/pull/6571#discussion_r1083465798 ## data/src/main/java/org/apache/iceberg/data/GenericTaskWriter.java: ## @@ -0,0 +1,79 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, + * * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * * KIND, either express or implied. See the License for the + * * specific language governing permissions and limitations + * * under the License. + * + */ + +package org.apache.iceberg.data; + +import java.io.IOException; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionKey; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.io.BaseTaskWriter; +import org.apache.iceberg.io.FileAppenderFactory; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.io.PartitionedFanoutWriter; +import org.apache.iceberg.io.UnpartitionedWriter; + +public class GenericTaskWriter extends BaseTaskWriter { Review Comment: GenericTaskWriter is here, it use UnpartitionedWriter or PartitionedFanoutWriter to write data. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] youngxinler commented on a diff in pull request #6571: Data: java api add GenericTaskWriter and add write demo to Doc.
youngxinler commented on code in PR #6571: URL: https://github.com/apache/iceberg/pull/6571#discussion_r1083466424 ## data/src/main/java/org/apache/iceberg/data/GenericTaskWriter.java: ## @@ -0,0 +1,79 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, + * * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * * KIND, either express or implied. See the License for the + * * specific language governing permissions and limitations + * * under the License. + * + */ + +package org.apache.iceberg.data; + +import java.io.IOException; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionKey; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.io.BaseTaskWriter; +import org.apache.iceberg.io.FileAppenderFactory; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.io.PartitionedFanoutWriter; +import org.apache.iceberg.io.UnpartitionedWriter; + +public class GenericTaskWriter extends BaseTaskWriter { +private final BaseTaskWriter taskWriter; + +public GenericTaskWriter(PartitionSpec spec, + FileFormat format, + FileAppenderFactory appenderFactory, + OutputFileFactory fileFactory, + FileIO io, + long targetFileSize) { +super(spec, format, appenderFactory, fileFactory, io, targetFileSize); +if (spec.isPartitioned()) { +final InternalRecordWrapper recordWrapper = new InternalRecordWrapper(spec.schema().asStruct()); Review Comment: for InternalRecordWrapper, i just use spec.schema().asStruct() as struct, because the wrapper just for partition field data. If using table.schema().asStruct(), this will add an additional parameter to the GenericTaskWriter. I tested it and it's OK. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
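To make the proposal concrete, here is how the GenericTaskWriter from the diff above might be used end to end. This is a hypothetical sketch: the class is only proposed in this PR, its generic signature is inferred from the flattened diff, and the `level` column comes from the doc example earlier in the thread; the surrounding calls (GenericAppenderFactory, OutputFileFactory, TaskWriter, newAppend) are existing Iceberg data APIs.

```java
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.GenericTaskWriter; // proposed in PR #6571
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.TaskWriter;

class GenericTaskWriterExample {
  static void writeRecords(Table table) throws Exception {
    GenericAppenderFactory appenderFactory =
        new GenericAppenderFactory(table.schema(), table.spec());
    OutputFileFactory fileFactory =
        OutputFileFactory.builderFor(table, 1, 1).format(FileFormat.PARQUET).build();

    // The proposed writer hides the choice between UnpartitionedWriter and
    // PartitionedFanoutWriter, so the same code covers both table layouts.
    TaskWriter<Record> writer =
        new GenericTaskWriter<>(
            table.spec(), FileFormat.PARQUET, appenderFactory, fileFactory, table.io(),
            128 * 1024 * 1024); // target file size in bytes

    GenericRecord template = GenericRecord.create(table.schema());
    for (int i = 0; i < 1000; i++) {
      GenericRecord record = template.copy();
      record.setField("level", i % 6 == 0 ? "error" : "info"); // column from the doc example
      writer.write(record);
    }

    // Commit the completed data files so the new rows become visible in the table.
    AppendFiles append = table.newAppend();
    for (DataFile dataFile : writer.dataFiles()) {
      append.appendFile(dataFile);
    }
    append.commit();
  }
}
```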
[GitHub] [iceberg] youngxinler commented on pull request #6554: Parquet: Improve Test Coverage of RowGroupFilter Code with Nans #6518
youngxinler commented on PR #6554: URL: https://github.com/apache/iceberg/pull/6554#issuecomment-1399507127 @RussellSpitzer Could I trouble you for a review when you have time? It is about improving the test coverage of the RowGroupFilter code with NaNs. I have made the changes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] Fokko commented on a diff in pull request #6590: Python: Add sql command to the CLI
Fokko commented on code in PR #6590: URL: https://github.com/apache/iceberg/pull/6590#discussion_r1083538914 ## python/pyiceberg/io/pyarrow.py: ## @@ -470,6 +472,59 @@ def expression_to_pyarrow(expr: BooleanExpression) -> pc.Expression: return boolean_expression_visit(expr, _ConvertToArrowExpression()) +def open_task( +task: FileScanTask, +fs: FileSystem, +table: Table, +row_filter: BooleanExpression, +projected_schema: Schema, +case_sensitive: bool, +) -> pa.Table: +_, path = PyArrowFileIO.parse_location(task.file.file_path) + +bound_row_filter = bind(table.schema(), row_filter, case_sensitive=case_sensitive) Review Comment: I think we can bind this once to the table before fanning out. We can just pass in the `bound_row_filter` since this one will be the same for each of the tables (will also be easier on the GIL). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] rubenvdg opened a new pull request, #6644: Python: Add support for static table
rubenvdg opened a new pull request, #6644: URL: https://github.com/apache/iceberg/pull/6644 This PR proposes adding support for static tables (i.e., reading a table directly from a metadata file without using a catalog, see also https://github.com/apache/iceberg/issues/6430). Happy to hear if this makes sense. Regarding unit tests: We could run all the existing `Table` tests for `StaticTable` too, but that might be a bit artificial. You'd get something ugly like this: ```python @pytest.fixture def table(..) -> Table: ... @pytest.fixture def static_table(...) -> StaticTable: ... @pytest.mark.parametrize("the_table", ["table", "static_table"]) def test_schema(the_table: Table, request: FixtureRequest) -> None: assert request.getfixturevalue(the_table).schema() == Schema(...) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] Fokko opened a new pull request, #6645: Python: Parallelize IO
Fokko opened a new pull request, #6645: URL: https://github.com/apache/iceberg/pull/6645 An alternative for the multithreading part of: https://github.com/apache/iceberg/pull/6590 This uses the ThreadPool approach instead of ThreadPoolExecutor. The ThreadPoolExecutor is more flexible and works well with heterogeneous tasks. This allows the user to handle exceptions per task and to cancel individual tasks. But the ThreadPoolExecutor also has some limitations, such as not being able to forcefully terminate all the tasks. For reading tasks I think the ThreadPool might be more appropriate, but for writing the ThreadPoolExecutor might be more applicable. A very nice writeup of the differences is available in this blog: https://superfastpython.com/threadpool-vs-threadpoolexecutor/ Before: ``` ➜ python git:(fd-threadpool) time python3 /tmp/test.py python3 /tmp/test.py 3.45s user 2.84s system 2% cpu 3:34.19 total ``` After: ``` ➜ python git:(fd-threadpool) ✗ time python3 /tmp/test.py python3 /tmp/test.py 3.13s user 2.83s system 19% cpu 31.369 total ➜ python git:(fd-threadpool) ✗ time python3 /tmp/test.py python3 /tmp/test.py 2.94s user 3.08s system 18% cpu 32.538 total ➜ python git:(fd-threadpool) ✗ time python3 /tmp/test.py python3 /tmp/test.py 2.84s user 3.14s system 20% cpu 29.033 total ``` Sending the requests long-haul from the EU to the USA might impact the results a bit, but it makes IO more dominant. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] Fokko commented on a diff in pull request #6644: Python: Add support for static table
Fokko commented on code in PR #6644: URL: https://github.com/apache/iceberg/pull/6644#discussion_r1083543881 ## python/pyiceberg/table/__init__.py: ## @@ -167,6 +169,32 @@ def __eq__(self, other: Any) -> bool: ) +class StaticTable(Table): +"""Load a table directly from a metadata file (i.e., without using a catalog).""" + +def refresh(self) -> Table: +"""StaticTable metadata cannot be refreshed.""" +raise StaticTableImmutableError("StaticTable metadata cannot be refreshed.") + +@classmethod +def from_metadata(cls, metadata_location: str, properties: Properties = EMPTY_DICT) -> StaticTable: + +metadata = cls._load_metadata(metadata_location, properties) + +return cls( +identifier=("static-table", metadata_location), +metadata_location=metadata_location, +metadata=metadata, +io=load_file_io({**properties, **metadata.properties}), +) + +@staticmethod +def _load_metadata(metadata_location: str, properties: Properties) -> TableMetadata: Review Comment: How do you feel about merging this logic into `from_metadata`? It looks like we don't reuse this anywhere. ## python/pyiceberg/exceptions.py: ## @@ -90,3 +90,7 @@ class SignError(Exception): pass + + +class StaticTableImmutableError(Exception): Review Comment: How do you feel about re-using the internal `NotImplementedError`? ## python/pyiceberg/catalog/rest.py: ## @@ -175,11 +175,7 @@ class RestCatalog(Catalog): session: Session properties: Properties -def __init__( -self, -name: str, -**properties: str, -): +def __init__(self, name: str, **properties: str): Review Comment: Why did this change? I always assumed that black was deterministic. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #6581: Spark 3.3: Add RemoveDanglingDeletes action
amogh-jahagirdar commented on code in PR #6581: URL: https://github.com/apache/iceberg/pull/6581#discussion_r1083561493 ## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RemoveDanglingDeletesSparkAction.java: ## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.actions; + +import static org.apache.iceberg.MetadataTableType.ENTRIES; +import static org.apache.spark.sql.functions.min; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.DeleteFiles; +import org.apache.iceberg.FileMetadata; +import org.apache.iceberg.PartitionData; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Partitioning; +import org.apache.iceberg.Table; +import org.apache.iceberg.actions.RemoveDanglingDeleteFiles; +import org.apache.iceberg.actions.RemoveDanglingDeleteFilesActionResult; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.JobGroupInfo; +import org.apache.iceberg.types.Types; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Column; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema; + +/** + * An action that removes dangling delete files from the current snapshot. A delete file is dangling + * if its deletes no longer applies to any data file. 
+ * + * The following dangling delete files are removed: + * + * + * Position delete files with a sequence number less than that of any data file in the same + * partition + * Equality delete files with a sequence number less than or equal to that of any data file in + * the same partition + * + */ +public class RemoveDanglingDeletesSparkAction +extends BaseSnapshotUpdateSparkAction +implements RemoveDanglingDeleteFiles { + + private final Table table; + + protected RemoveDanglingDeletesSparkAction(SparkSession spark, Table table) { +super(spark); +this.table = table; + } + + @Override + protected RemoveDanglingDeletesSparkAction self() { +return this; + } + + @Override + public Result execute() { +if (table.specs().size() == 1 && table.spec().isUnpartitioned()) { + // ManifestFilterManager already performs this table-wide on each commit + return RemoveDanglingDeleteFilesActionResult.empty(); +} + +String desc = String.format("Remove dangling delete files for %s", table.name()); +JobGroupInfo info = newJobGroupInfo("REWRITE-MANIFESTS", desc); +return withJobGroupInfo(info, this::doExecute); + } + + private Result doExecute() { +Dataset entries = +loadMetadataTable(table, ENTRIES) +.filter("status < 2") // live entries +.selectExpr( +"data_file.partition as partition", +"data_file.spec_id as spec_id", +"data_file.file_path as file_path", +"data_file.content as content", +"data_file.file_size_in_bytes as file_size_in_bytes", Review Comment: Sorry if it's a naive question, do we need to project file_size_in_bytes for this action? ## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RemoveDanglingDeletesSparkAction.java: ## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *
[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #6581: Spark 3.3: Add RemoveDanglingDeletes action
amogh-jahagirdar commented on code in PR #6581: URL: https://github.com/apache/iceberg/pull/6581#discussion_r1083566960 ## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RemoveDanglingDeletesSparkAction.java: ## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.actions; + +import static org.apache.iceberg.MetadataTableType.ENTRIES; +import static org.apache.spark.sql.functions.min; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.DeleteFiles; +import org.apache.iceberg.FileMetadata; +import org.apache.iceberg.PartitionData; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Partitioning; +import org.apache.iceberg.Table; +import org.apache.iceberg.actions.RemoveDanglingDeleteFiles; +import org.apache.iceberg.actions.RemoveDanglingDeleteFilesActionResult; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.JobGroupInfo; +import org.apache.iceberg.types.Types; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Column; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema; + +/** + * An action that removes dangling delete files from the current snapshot. A delete file is dangling + * if its deletes no longer applies to any data file. 
+ * + * The following dangling delete files are removed: + * + * + * Position delete files with a sequence number less than that of any data file in the same + * partition + * Equality delete files with a sequence number less than or equal to that of any data file in + * the same partition + * + */ +public class RemoveDanglingDeletesSparkAction +extends BaseSnapshotUpdateSparkAction +implements RemoveDanglingDeleteFiles { + + private final Table table; + + protected RemoveDanglingDeletesSparkAction(SparkSession spark, Table table) { +super(spark); +this.table = table; + } + + @Override + protected RemoveDanglingDeletesSparkAction self() { +return this; + } + + @Override + public Result execute() { +if (table.specs().size() == 1 && table.spec().isUnpartitioned()) { + // ManifestFilterManager already performs this table-wide on each commit + return RemoveDanglingDeleteFilesActionResult.empty(); +} + +String desc = String.format("Remove dangling delete files for %s", table.name()); +JobGroupInfo info = newJobGroupInfo("REWRITE-MANIFESTS", desc); +return withJobGroupInfo(info, this::doExecute); + } + + private Result doExecute() { +Dataset entries = +loadMetadataTable(table, ENTRIES) +.filter("status < 2") // live entries +.selectExpr( +"data_file.partition as partition", +"data_file.spec_id as spec_id", +"data_file.file_path as file_path", +"data_file.content as content", +"data_file.file_size_in_bytes as file_size_in_bytes", Review Comment: Nvm, we need to serialize the result to a delete file so it's needed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] amogh-jahagirdar commented on pull request #6581: Spark 3.3: Add RemoveDanglingDeletes action
amogh-jahagirdar commented on PR #6581: URL: https://github.com/apache/iceberg/pull/6581#issuecomment-1399644783 @szehon-ho Thanks for the detailed analysis. On the surface it does make sense to combine with existing compaction mechanisms like RewriteDataFiles if the metadata only RemoveDanglingDeleteFiles is cheap and especially if many users are already running RewriteDataFiles periodically anyways. Is there a case we're missing where users would just want to run remove dangling delete files separately? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] rdblue commented on a diff in pull request #6617: Spark: Spark SQL Extensions for create branch
rdblue commented on code in PR #6617: URL: https://github.com/apache/iceberg/pull/6617#discussion_r1083570497 ## spark/v3.3/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4: ## @@ -168,34 +175,76 @@ fieldList ; nonReserved -: ADD | ALTER | AS | ASC | BY | CALL | DESC | DROP | FIELD | FIRST | LAST | NULLS | ORDERED | PARTITION | TABLE | WRITE -| DISTRIBUTED | LOCALLY | UNORDERED | REPLACE | WITH | IDENTIFIER_KW | FIELDS | SET +: ADD | ALTER | AS | ASC | BRANCH | BY | CALL | CREATE | DAYS | DESC | DROP | FIELD | FIRST | HOURS | LAST | NULLS | OF | ORDERED | PARTITION | TABLE | WRITE +| DISTRIBUTED | LOCALLY | MINUTES | MONTHS | UNORDERED | REPLACE | RETAIN | VERSION | WITH | IDENTIFIER_KW | FIELDS | SET | SNAPSHOT | SNAPSHOTS | TRUE | FALSE | MAP ; +snapshotId +: number +; + +numSnapshots +: number +; + +snapshotRetain +: number +; + +snapshotRefRetain Review Comment: Why are there so many aliases for number? Are these rules useful? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] rdblue commented on a diff in pull request #6617: Spark: Spark SQL Extensions for create branch
rdblue commented on code in PR #6617: URL: https://github.com/apache/iceberg/pull/6617#discussion_r1083571839 ## spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateBranch.java: ## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.extensions; + +import java.util.IllegalFormatConversionException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.SnapshotRef; +import org.apache.iceberg.Table; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.spark.SparkCatalogConfig; +import org.apache.iceberg.spark.source.SimpleRecord; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.catalyst.parser.extensions.IcebergParseException; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runners.Parameterized; + +public class TestCreateBranch extends SparkExtensionsTestBase { + + @Parameterized.Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}") Review Comment: Why is this testing with multiple catalogs? This is a table-level operation that shouldn't be affected by the catalog, so it should test just one. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] rdblue commented on a diff in pull request #6617: Spark: Spark SQL Extensions for create branch
rdblue commented on code in PR #6617: URL: https://github.com/apache/iceberg/pull/6617#discussion_r1083572053 ## spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala: ## @@ -61,6 +62,9 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi case AddPartitionField(IcebergCatalogAndIdentifier(catalog, ident), transform, name) => AddPartitionFieldExec(catalog, ident, transform, name) :: Nil +case CreateBranch(IcebergCatalogAndIdentifier(catalog, ident), _, _, _, _, _) => + CreateBranchExec(catalog, ident, plan.asInstanceOf[CreateBranch]) :: Nil Review Comment: Why does this pass the logical plan rather than passing the necessary information? Is it just to avoid a longer line? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] github-actions[bot] commented on issue #4581: Spark: Cannot read or write UUID columns
github-actions[bot] commented on issue #4581: URL: https://github.com/apache/iceberg/issues/4581#issuecomment-1399650804 This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] rdblue commented on a diff in pull request #6617: Spark: Spark SQL Extensions for create branch
rdblue commented on code in PR #6617: URL: https://github.com/apache/iceberg/pull/6617#discussion_r1083572437 ## spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateBranch.java: ## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.extensions; + +import java.util.IllegalFormatConversionException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.SnapshotRef; +import org.apache.iceberg.Table; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.spark.SparkCatalogConfig; +import org.apache.iceberg.spark.source.SimpleRecord; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.catalyst.parser.extensions.IcebergParseException; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runners.Parameterized; + +public class TestCreateBranch extends SparkExtensionsTestBase { + + @Parameterized.Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}") + public static Object[][] parameters() { +return new Object[][] { + { +SparkCatalogConfig.SPARK.catalogName(), +SparkCatalogConfig.SPARK.implementation(), +SparkCatalogConfig.SPARK.properties() + } +}; + } + + public TestCreateBranch(String catalogName, String implementation, Map config) { +super(catalogName, implementation, config); + } + + @After + public void removeTable() { +sql("DROP TABLE IF EXISTS %s", tableName); + } + + @Test + public void testCreateBranch() throws NoSuchTableException { +Table table = createDefaultTableAndInsert2Row(); +long snapshotId = table.currentSnapshot().snapshotId(); +String branchName = "b1"; +Integer minSnapshotsToKeep = 2; +long maxSnapshotAge = 2L; +long maxRefAge = 10L; +sql( +"ALTER TABLE %s CREATE BRANCH %s AS OF VERSION %d RETAIN %d DAYS WITH SNAPSHOT RETENTION %d SNAPSHOTS %d days", +tableName, branchName, snapshotId, maxRefAge, minSnapshotsToKeep, maxSnapshotAge); +table.refresh(); +SnapshotRef ref = table.refs().get(branchName); +Assert.assertNotNull(ref); +Assert.assertEquals(minSnapshotsToKeep, ref.minSnapshotsToKeep()); +Assert.assertEquals(TimeUnit.DAYS.toMillis(maxSnapshotAge), ref.maxSnapshotAgeMs().longValue()); +Assert.assertEquals(TimeUnit.DAYS.toMillis(maxRefAge), ref.maxRefAgeMs().longValue()); + +AssertHelpers.assertThrows( +"Cannot create an existing branch", +IllegalArgumentException.class, +"already exists", +() -> sql("ALTER TABLE %s CREATE BRANCH %s", tableName, branchName)); + } + + @Test + public void 
testCreateBranchUseDefaultConfig() throws NoSuchTableException { +Table table = createDefaultTableAndInsert2Row(); +String branchName = "b1"; +sql("ALTER TABLE %s CREATE BRANCH %s", tableName, branchName); +table.refresh(); +SnapshotRef ref = table.refs().get(branchName); +Assert.assertNotNull(ref); Review Comment: This should assert the state of the branch. I think it would use the current snapshot of main, right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
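An illustration of the assertion being requested above: with no AS OF VERSION clause, the new branch is expected to reference the table's current (main) snapshot. The `table` and `ref` variables are the ones already defined in the quoted test; this is a sketch, not the final test code.

```java
// Sketch of the missing branch-state assertion for the default-config case.
Assert.assertNotNull(ref);
Assert.assertEquals(table.currentSnapshot().snapshotId(), ref.snapshotId());
```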
[GitHub] [iceberg] rdblue commented on a diff in pull request #6617: Spark: Spark SQL Extensions for create branch
rdblue commented on code in PR #6617: URL: https://github.com/apache/iceberg/pull/6617#discussion_r1083572534 ## spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateBranch.java: ## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.extensions; + +import java.util.IllegalFormatConversionException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.SnapshotRef; +import org.apache.iceberg.Table; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.spark.SparkCatalogConfig; +import org.apache.iceberg.spark.source.SimpleRecord; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.catalyst.parser.extensions.IcebergParseException; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runners.Parameterized; + +public class TestCreateBranch extends SparkExtensionsTestBase { + + @Parameterized.Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}") + public static Object[][] parameters() { +return new Object[][] { + { +SparkCatalogConfig.SPARK.catalogName(), +SparkCatalogConfig.SPARK.implementation(), +SparkCatalogConfig.SPARK.properties() + } +}; + } + + public TestCreateBranch(String catalogName, String implementation, Map config) { +super(catalogName, implementation, config); + } + + @After + public void removeTable() { +sql("DROP TABLE IF EXISTS %s", tableName); + } + + @Test + public void testCreateBranch() throws NoSuchTableException { +Table table = createDefaultTableAndInsert2Row(); +long snapshotId = table.currentSnapshot().snapshotId(); Review Comment: This should use a snapshot other than the default to test the clause. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] rdblue commented on a diff in pull request #6617: Spark: Spark SQL Extensions for create branch
rdblue commented on code in PR #6617: URL: https://github.com/apache/iceberg/pull/6617#discussion_r1083572578 ## spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateBranch.java: ## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.extensions; + +import java.util.IllegalFormatConversionException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.SnapshotRef; +import org.apache.iceberg.Table; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.spark.SparkCatalogConfig; +import org.apache.iceberg.spark.source.SimpleRecord; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.catalyst.parser.extensions.IcebergParseException; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runners.Parameterized; + +public class TestCreateBranch extends SparkExtensionsTestBase { + + @Parameterized.Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}") + public static Object[][] parameters() { +return new Object[][] { + { +SparkCatalogConfig.SPARK.catalogName(), +SparkCatalogConfig.SPARK.implementation(), +SparkCatalogConfig.SPARK.properties() + } +}; + } + + public TestCreateBranch(String catalogName, String implementation, Map config) { +super(catalogName, implementation, config); + } + + @After + public void removeTable() { +sql("DROP TABLE IF EXISTS %s", tableName); + } + + @Test + public void testCreateBranch() throws NoSuchTableException { +Table table = createDefaultTableAndInsert2Row(); +long snapshotId = table.currentSnapshot().snapshotId(); +String branchName = "b1"; +Integer minSnapshotsToKeep = 2; +long maxSnapshotAge = 2L; +long maxRefAge = 10L; +sql( +"ALTER TABLE %s CREATE BRANCH %s AS OF VERSION %d RETAIN %d DAYS WITH SNAPSHOT RETENTION %d SNAPSHOTS %d days", +tableName, branchName, snapshotId, maxRefAge, minSnapshotsToKeep, maxSnapshotAge); +table.refresh(); +SnapshotRef ref = table.refs().get(branchName); +Assert.assertNotNull(ref); Review Comment: This needs an assertion about the snapshot referenced by the branch. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
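The missing assertion could be as small as the line below, since `SnapshotRef` exposes the referenced snapshot through `snapshotId()`; `snapshotId` here is the value the test already passed to `AS OF VERSION`.

```java
// assert the branch actually references the requested snapshot
Assert.assertEquals(snapshotId, ref.snapshotId());
```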
[GitHub] [iceberg] rdblue commented on a diff in pull request #6617: Spark: Spark SQL Extensions for create branch
rdblue commented on code in PR #6617: URL: https://github.com/apache/iceberg/pull/6617#discussion_r1083572778 ## spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateBranch.java: ## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.extensions; + +import java.util.IllegalFormatConversionException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.SnapshotRef; +import org.apache.iceberg.Table; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.spark.SparkCatalogConfig; +import org.apache.iceberg.spark.source.SimpleRecord; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.catalyst.parser.extensions.IcebergParseException; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runners.Parameterized; + +public class TestCreateBranch extends SparkExtensionsTestBase { + + @Parameterized.Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}") + public static Object[][] parameters() { +return new Object[][] { + { +SparkCatalogConfig.SPARK.catalogName(), +SparkCatalogConfig.SPARK.implementation(), +SparkCatalogConfig.SPARK.properties() + } +}; + } + + public TestCreateBranch(String catalogName, String implementation, Map config) { +super(catalogName, implementation, config); + } + + @After + public void removeTable() { +sql("DROP TABLE IF EXISTS %s", tableName); + } + + @Test + public void testCreateBranch() throws NoSuchTableException { +Table table = createDefaultTableAndInsert2Row(); +long snapshotId = table.currentSnapshot().snapshotId(); +String branchName = "b1"; +Integer minSnapshotsToKeep = 2; +long maxSnapshotAge = 2L; +long maxRefAge = 10L; +sql( +"ALTER TABLE %s CREATE BRANCH %s AS OF VERSION %d RETAIN %d DAYS WITH SNAPSHOT RETENTION %d SNAPSHOTS %d days", +tableName, branchName, snapshotId, maxRefAge, minSnapshotsToKeep, maxSnapshotAge); +table.refresh(); +SnapshotRef ref = table.refs().get(branchName); +Assert.assertNotNull(ref); +Assert.assertEquals(minSnapshotsToKeep, ref.minSnapshotsToKeep()); +Assert.assertEquals(TimeUnit.DAYS.toMillis(maxSnapshotAge), ref.maxSnapshotAgeMs().longValue()); +Assert.assertEquals(TimeUnit.DAYS.toMillis(maxRefAge), ref.maxRefAgeMs().longValue()); + +AssertHelpers.assertThrows( +"Cannot create an existing branch", +IllegalArgumentException.class, +"already exists", +() -> sql("ALTER TABLE %s CREATE BRANCH %s", tableName, branchName)); + } + + @Test + public void 
testCreateBranchUseDefaultConfig() throws NoSuchTableException { +Table table = createDefaultTableAndInsert2Row(); +String branchName = "b1"; +sql("ALTER TABLE %s CREATE BRANCH %s", tableName, branchName); +table.refresh(); +SnapshotRef ref = table.refs().get(branchName); +Assert.assertNotNull(ref); +Assert.assertNull(ref.minSnapshotsToKeep()); +Assert.assertNull(ref.maxSnapshotAgeMs()); +Assert.assertNull(ref.maxRefAgeMs()); + } + + @Test + public void testCreateBranchUseCustomMinSnapshotsToKeep() throws NoSuchTableException { +Integer minSnapshotsToKeep = 2; +Table table = createDefaultTableAndInsert2Row(); +String branchName = "b1"; +sql( +"ALTER TABLE %s CREATE BRANCH %s WITH SNAPSHOT RETENTION %d SNAPSHOTS", +tableName, branchName, minSnapshotsToKeep); +table.refresh(); +SnapshotRef ref = table.refs().get(branchName); +Assert.assertNotNull(ref); +Assert.assertEquals(minSnapshotsToKeep, ref.minSnapshotsToKeep()); +Assert.assertNull(ref.maxSnapshotAgeMs()); +Assert.assertNull(ref.maxRefAgeMs()); + } + + @Test + public void testCreateBranchUseCustomMaxSnapshotAge() throws NoSuchTableException { +long maxSnapshotAge = 2L; +Table table = create
[GitHub] [iceberg] rdblue commented on a diff in pull request #6617: Spark: Spark SQL Extensions for create branch
rdblue commented on code in PR #6617: URL: https://github.com/apache/iceberg/pull/6617#discussion_r1083573324 ## spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateBranch.java: ## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.extensions; + +import java.util.IllegalFormatConversionException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.SnapshotRef; +import org.apache.iceberg.Table; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.spark.SparkCatalogConfig; +import org.apache.iceberg.spark.source.SimpleRecord; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.catalyst.parser.extensions.IcebergParseException; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runners.Parameterized; + +public class TestCreateBranch extends SparkExtensionsTestBase { + + @Parameterized.Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}") + public static Object[][] parameters() { +return new Object[][] { + { +SparkCatalogConfig.SPARK.catalogName(), +SparkCatalogConfig.SPARK.implementation(), +SparkCatalogConfig.SPARK.properties() + } +}; + } + + public TestCreateBranch(String catalogName, String implementation, Map config) { +super(catalogName, implementation, config); + } + + @After + public void removeTable() { +sql("DROP TABLE IF EXISTS %s", tableName); + } + + @Test + public void testCreateBranch() throws NoSuchTableException { +Table table = createDefaultTableAndInsert2Row(); +long snapshotId = table.currentSnapshot().snapshotId(); +String branchName = "b1"; +Integer minSnapshotsToKeep = 2; +long maxSnapshotAge = 2L; +long maxRefAge = 10L; +sql( +"ALTER TABLE %s CREATE BRANCH %s AS OF VERSION %d RETAIN %d DAYS WITH SNAPSHOT RETENTION %d SNAPSHOTS %d days", +tableName, branchName, snapshotId, maxRefAge, minSnapshotsToKeep, maxSnapshotAge); +table.refresh(); +SnapshotRef ref = table.refs().get(branchName); +Assert.assertNotNull(ref); +Assert.assertEquals(minSnapshotsToKeep, ref.minSnapshotsToKeep()); +Assert.assertEquals(TimeUnit.DAYS.toMillis(maxSnapshotAge), ref.maxSnapshotAgeMs().longValue()); +Assert.assertEquals(TimeUnit.DAYS.toMillis(maxRefAge), ref.maxRefAgeMs().longValue()); + +AssertHelpers.assertThrows( +"Cannot create an existing branch", +IllegalArgumentException.class, +"already exists", +() -> sql("ALTER TABLE %s CREATE BRANCH %s", tableName, branchName)); + } + + @Test + public void 
testCreateBranchUseDefaultConfig() throws NoSuchTableException { +Table table = createDefaultTableAndInsert2Row(); +String branchName = "b1"; +sql("ALTER TABLE %s CREATE BRANCH %s", tableName, branchName); +table.refresh(); +SnapshotRef ref = table.refs().get(branchName); +Assert.assertNotNull(ref); +Assert.assertNull(ref.minSnapshotsToKeep()); +Assert.assertNull(ref.maxSnapshotAgeMs()); +Assert.assertNull(ref.maxRefAgeMs()); + } + + @Test + public void testCreateBranchUseCustomMinSnapshotsToKeep() throws NoSuchTableException { +Integer minSnapshotsToKeep = 2; +Table table = createDefaultTableAndInsert2Row(); +String branchName = "b1"; +sql( +"ALTER TABLE %s CREATE BRANCH %s WITH SNAPSHOT RETENTION %d SNAPSHOTS", +tableName, branchName, minSnapshotsToKeep); +table.refresh(); +SnapshotRef ref = table.refs().get(branchName); +Assert.assertNotNull(ref); +Assert.assertEquals(minSnapshotsToKeep, ref.minSnapshotsToKeep()); +Assert.assertNull(ref.maxSnapshotAgeMs()); +Assert.assertNull(ref.maxRefAgeMs()); + } + + @Test + public void testCreateBranchUseCustomMaxSnapshotAge() throws NoSuchTableException { +long maxSnapshotAge = 2L; +Table table = create
[GitHub] [iceberg] rdblue commented on a diff in pull request #6617: Spark: Spark SQL Extensions for create branch
rdblue commented on code in PR #6617: URL: https://github.com/apache/iceberg/pull/6617#discussion_r1083573422 ## spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateBranch.java: ## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.extensions; + +import java.util.IllegalFormatConversionException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.SnapshotRef; +import org.apache.iceberg.Table; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.spark.SparkCatalogConfig; +import org.apache.iceberg.spark.source.SimpleRecord; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.catalyst.parser.extensions.IcebergParseException; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runners.Parameterized; + +public class TestCreateBranch extends SparkExtensionsTestBase { + + @Parameterized.Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}") + public static Object[][] parameters() { +return new Object[][] { + { +SparkCatalogConfig.SPARK.catalogName(), +SparkCatalogConfig.SPARK.implementation(), +SparkCatalogConfig.SPARK.properties() + } +}; + } + + public TestCreateBranch(String catalogName, String implementation, Map config) { +super(catalogName, implementation, config); + } + + @After + public void removeTable() { +sql("DROP TABLE IF EXISTS %s", tableName); + } + + @Test + public void testCreateBranch() throws NoSuchTableException { +Table table = createDefaultTableAndInsert2Row(); +long snapshotId = table.currentSnapshot().snapshotId(); +String branchName = "b1"; +Integer minSnapshotsToKeep = 2; +long maxSnapshotAge = 2L; +long maxRefAge = 10L; +sql( +"ALTER TABLE %s CREATE BRANCH %s AS OF VERSION %d RETAIN %d DAYS WITH SNAPSHOT RETENTION %d SNAPSHOTS %d days", +tableName, branchName, snapshotId, maxRefAge, minSnapshotsToKeep, maxSnapshotAge); +table.refresh(); +SnapshotRef ref = table.refs().get(branchName); +Assert.assertNotNull(ref); +Assert.assertEquals(minSnapshotsToKeep, ref.minSnapshotsToKeep()); +Assert.assertEquals(TimeUnit.DAYS.toMillis(maxSnapshotAge), ref.maxSnapshotAgeMs().longValue()); +Assert.assertEquals(TimeUnit.DAYS.toMillis(maxRefAge), ref.maxRefAgeMs().longValue()); + +AssertHelpers.assertThrows( +"Cannot create an existing branch", +IllegalArgumentException.class, +"already exists", +() -> sql("ALTER TABLE %s CREATE BRANCH %s", tableName, branchName)); + } + + @Test + public void 
testCreateBranchUseDefaultConfig() throws NoSuchTableException { +Table table = createDefaultTableAndInsert2Row(); +String branchName = "b1"; +sql("ALTER TABLE %s CREATE BRANCH %s", tableName, branchName); +table.refresh(); +SnapshotRef ref = table.refs().get(branchName); +Assert.assertNotNull(ref); +Assert.assertNull(ref.minSnapshotsToKeep()); +Assert.assertNull(ref.maxSnapshotAgeMs()); +Assert.assertNull(ref.maxRefAgeMs()); + } + + @Test + public void testCreateBranchUseCustomMinSnapshotsToKeep() throws NoSuchTableException { +Integer minSnapshotsToKeep = 2; +Table table = createDefaultTableAndInsert2Row(); +String branchName = "b1"; +sql( +"ALTER TABLE %s CREATE BRANCH %s WITH SNAPSHOT RETENTION %d SNAPSHOTS", +tableName, branchName, minSnapshotsToKeep); +table.refresh(); +SnapshotRef ref = table.refs().get(branchName); +Assert.assertNotNull(ref); +Assert.assertEquals(minSnapshotsToKeep, ref.minSnapshotsToKeep()); +Assert.assertNull(ref.maxSnapshotAgeMs()); +Assert.assertNull(ref.maxRefAgeMs()); + } + + @Test + public void testCreateBranchUseCustomMaxSnapshotAge() throws NoSuchTableException { +long maxSnapshotAge = 2L; +Table table = create
[GitHub] [iceberg] rdblue commented on a diff in pull request #6474: Make it explicit that metrics reporter is required
rdblue commented on code in PR #6474: URL: https://github.com/apache/iceberg/pull/6474#discussion_r1083574777 ## core/src/main/java/org/apache/iceberg/BaseTable.java: ## @@ -48,6 +49,7 @@ public BaseTable(TableOperations ops, String name) { } public BaseTable(TableOperations ops, String name, MetricsReporter reporter) { +Preconditions.checkNotNull(reporter, "reporter cannot be null"); Review Comment: +1 for @nastra's comment about the error message. Whether we use `checkNotNull` or `checkArgument` doesn't really matter, because both signal an invalid argument with an unchecked exception. The problem with this is that it makes an unnecessary change to introduce an error message that doesn't fit the conventions used elsewhere. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
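For context, a hedged sketch of the two Guava checks being compared is below. The `checkArgument` message is only an assumed example of an "Invalid <thing>: <value>" style, not the wording agreed on in the review, and the class and method names are made up for illustration.

```java
import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

class ReporterChecks {
  // Form used in the diff: throws NullPointerException when reporter is null
  static void requireReporterWithCheckNotNull(MetricsReporter reporter) {
    Preconditions.checkNotNull(reporter, "reporter cannot be null");
  }

  // Form preferred in the review: throws IllegalArgumentException, with a message
  // (assumed wording) closer to the conventions used elsewhere in the codebase
  static void requireReporterWithCheckArgument(MetricsReporter reporter) {
    Preconditions.checkArgument(reporter != null, "Invalid metrics reporter: null");
  }
}
```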
[GitHub] [iceberg] rdblue commented on issue #6625: Improve nullability check in Iceberg codebase
rdblue commented on issue #6625: URL: https://github.com/apache/iceberg/issues/6625#issuecomment-1399656418 The reason why we don't use `checkNotNull` is that it throws `NullPointerException`, which mostly makes people think that something tried to dereference it. We use `checkArgument` so the reader gets the message "you passed the wrong thing" rather than "there's a null dereferenced somewhere in Iceberg". This distinction is minor since both `NullPointerException` and `IllegalArgumentException` are unchecked exceptions. As I mentioned on #6474, the main problem with this was that we have a convention for error messages and this change needlessly made the error message inconsistent. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] rdblue commented on a diff in pull request #6614: Flink:fix flink streaming query problem [ Cannot get a client from a closed pool]
rdblue commented on code in PR #6614: URL: https://github.com/apache/iceberg/pull/6614#discussion_r1083575712 ## core/src/main/java/org/apache/iceberg/ClientPoolImpl.java: ## @@ -56,7 +56,6 @@ public R run(Action action, boolean retry) throws E, InterruptedExc C client = get(); try { return action.run(client); - Review Comment: Please remove unnecessary whitespace changes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] rdblue commented on a diff in pull request #6614: Flink:fix flink streaming query problem [ Cannot get a client from a closed pool]
rdblue commented on code in PR #6614: URL: https://github.com/apache/iceberg/pull/6614#discussion_r1083575818 ## core/src/main/java/org/apache/iceberg/ClientPoolImpl.java: ## @@ -147,4 +156,10 @@ private void release(C client) { public int poolSize() { return poolSize; } + + protected abstract C newClient(); + + protected abstract C reconnect(C client); + + protected abstract void close(C client); Review Comment: Please move these back. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] rdblue commented on pull request #6614: Flink:fix flink streaming query problem [ Cannot get a client from a closed pool]
rdblue commented on PR #6614: URL: https://github.com/apache/iceberg/pull/6614#issuecomment-1399658376 I think that if a catalog is closed, it's reasonable for tables to stop operating as well. The catalog manages its shared resources and if it chooses to share a connection pool with tables then it makes sense for the tables to no longer be able to connect after the pool is closed. Tables should not own their own connection pools, so some resource needs to manage them and the catalog is a good place to do that. I think the problem is that the `TableLoader` is taking ownership of the catalog and closing it. That seems incorrect to me. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] rdblue commented on a diff in pull request #6634: Core, API: Fix for tracking intermediate snapshots when a transaction spans multiple branches
rdblue commented on code in PR #6634: URL: https://github.com/apache/iceberg/pull/6634#discussion_r1083582534 ## core/src/main/java/org/apache/iceberg/BaseTransaction.java: ## @@ -551,10 +555,19 @@ public void commit(TableMetadata underlyingBase, TableMetadata metadata) { } // track the intermediate snapshot ids for rewriting the snapshot log - // an id is intermediate if it isn't the base snapshot id and it is replaced by a new current - Long oldId = currentId(current); - if (oldId != null && !oldId.equals(currentId(metadata)) && !oldId.equals(currentId(base))) { -intermediateSnapshotIds.add(oldId); + // an id is intermediate if it isn't the head of the branch in base and it is replaced by a new head of the branch in current Review Comment: I don't think that we need to keep intermediate snapshot IDs like this anymore. I took a look at this and the intermediate IDs are currently used to ensure that we don't delete files from any committed snapshot. Before we added metadata change tracking to TableMetadata, this list was also used to rewrite the history / snapshot log. But that's handled in `TableMetadata.Builder` now, so this is only used for fixing up the deletes. Fixing up deletes is a much simpler problem. We don't need to know whether a reference was "intermediate" anymore -- that was for the history fixes. So what we need is a list of the new snapshots committed by the transaction to feed into `committedFiles`. We can do that more easily by taking the current set of snapshots and removing the old set of snapshots. @amogh-jahagirdar, does that make sense? We can do this all in `commitSimpleTransaction` and remove `intermediateSnapshotIds`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
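A sketch of the simplification described above, under the assumption that `base` is the pre-transaction `TableMetadata` and `current` is the metadata being committed: the snapshots added by the transaction are simply the ids present in `current` but not in `base`, and that set can be fed into `committedFiles`. The helper class here is hypothetical.

```java
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.TableMetadata;

class TransactionSnapshotDiff {
  // Newly committed snapshot ids: everything in "current" that was not already in "base".
  static Set<Long> newSnapshotIds(TableMetadata base, TableMetadata current) {
    Set<Long> baseIds =
        base.snapshots().stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
    return current.snapshots().stream()
        .map(Snapshot::snapshotId)
        .filter(id -> !baseIds.contains(id))
        .collect(Collectors.toSet());
  }
}
```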
[GitHub] [iceberg] hililiwei commented on a diff in pull request #6637: Spark: Spark SQL Extensions for create tag
hililiwei commented on code in PR #6637: URL: https://github.com/apache/iceberg/pull/6637#discussion_r1083582974 ## spark/v3.3/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4: ## @@ -168,34 +169,61 @@ fieldList ; nonReserved -: ADD | ALTER | AS | ASC | BY | CALL | DESC | DROP | FIELD | FIRST | LAST | NULLS | ORDERED | PARTITION | TABLE | WRITE -| DISTRIBUTED | LOCALLY | UNORDERED | REPLACE | WITH | IDENTIFIER_KW | FIELDS | SET -| TRUE | FALSE +: ADD | ALTER | AS | ASC | BY | CALL | CREATE | DAYS | DESC | DROP | FIELD | FIRST | HOURS | LAST | NULLS | OF | ORDERED | PARTITION | TABLE | WRITE +| DISTRIBUTED | LOCALLY | MINUTES | UNORDERED | REPLACE | VERSION | WITH | IDENTIFIER_KW | FIELDS | SET +| TAG | TRUE | FALSE | MAP ; +snapshotId +: number +; + +snapshotRefRetain Review Comment: >To me, it feels more intuitive to just say RETAIN number timeUnit instead of RETAIN snapshotRefRetain snapshotRefRetainTimeUnit I originally added `snapshotRefRetain` and `snapshotRetain` to make the statement parsing code more readable. Removing them is technically feasible. In the new version, I have removed them (including for create branch). >Another thing I overlooked is that looks like the Antlr convention is to use all capital letters for these definitions, like TIME_UNIT instead of timeUnit. I don't know if they imply different functionalities though. All caps usually indicate keywords. So, I prefer to keep `timeUnit`. ``` ALTER TABLE multipartIdentifier WRITE writeSpec #setWriteDistributionAndOrdering ``` like `TABLE`: `TABLE: 'TABLE';` However, `writeSpec` is written as: ``` writeSpec : (writeDistributionSpec | writeOrderingSpec)* ; ``` `writeDistributionSpec`: ``` writeDistributionSpec : DISTRIBUTED BY PARTITION ; ``` Thanks. Liwei -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] ajantha-bhat commented on a diff in pull request #6582: Add a Spark procedure to collect NDV
ajantha-bhat commented on code in PR #6582: URL: https://github.com/apache/iceberg/pull/6582#discussion_r1083604944 ## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/DistinctCountProcedure.java: ## @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.procedures; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.ByteBuffer; +import java.util.UUID; +import org.apache.iceberg.GenericBlobMetadata; +import org.apache.iceberg.GenericStatisticsFile; +import org.apache.iceberg.HasTableOperations; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.puffin.Blob; +import org.apache.iceberg.puffin.Puffin; +import org.apache.iceberg.puffin.PuffinWriter; +import org.apache.iceberg.puffin.StandardBlobTypes; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.spark.source.SparkTable; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.util.ArrayData; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.unsafe.types.UTF8String; +import org.jetbrains.annotations.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A procedure that gets approximate NDV (number of distinct value) for the requested columns and + * sets this to the table's StatisticsFile. 
+ */ +public class DistinctCountProcedure extends BaseProcedure { + private static final Logger LOG = LoggerFactory.getLogger(DistinctCountProcedure.class); + + private static final ProcedureParameter[] PARAMETERS = + new ProcedureParameter[] { +ProcedureParameter.required("table", DataTypes.StringType), +ProcedureParameter.optional("distinct_count_view", DataTypes.StringType), +ProcedureParameter.optional("columns", STRING_ARRAY), + }; + + private static final StructType OUTPUT_TYPE = + new StructType( + new StructField[] { +new StructField("view_name", DataTypes.StringType, false, Metadata.empty()) + }); + + public static SparkProcedures.ProcedureBuilder builder() { +return new Builder() { + @Override + protected DistinctCountProcedure doBuild() { +return new DistinctCountProcedure(tableCatalog()); + } +}; + } + + private DistinctCountProcedure(TableCatalog tableCatalog) { +super(tableCatalog); + } + + @Override + public ProcedureParameter[] parameters() { +return PARAMETERS; + } + + @Override + public StructType outputType() { +return OUTPUT_TYPE; + } + + @Override + public InternalRow[] call(InternalRow args) { +String tableName = args.getString(0); +Identifier tableIdent = toIdentifier(tableName, PARAMETERS[0].name()); +SparkTable sparkTable = loadSparkTable(tableIdent); +StructType schema = sparkTable.schema(); +Table table = sparkTable.table(); +ArrayData columns = args.getArray(2); +int columnSizes = columns.numElements(); + +long[] ndvs = new long[columnSizes]; +int[] fieldId = new int[columnSizes]; +String query = "SELECT "; +for (int i = 0; i < columnSizes; i++) { + String colName = columns.getUTF8String(i).toString(); + query += "APPROX_COUNT_DISTINCT(" + colName + "), "; + fieldId[i] = schema.fieldIndex(colName); +} + +query = query.substring(0, query.length() - 2) + " FROM " + tableName; +Dataset df = spark().sql(query); Review Comment: @RussellSpitzer, @flyrain, @huaxingao: Is it good to have a spark action first and call that action from
[GitHub] [iceberg] hililiwei commented on pull request #6614: Flink:fix flink streaming query problem [ Cannot get a client from a closed pool]
hililiwei commented on PR #6614: URL: https://github.com/apache/iceberg/pull/6614#issuecomment-1399718452 > I think the problem is that the `TableLoader` is taking ownership of the catalog and closing it. That seems incorrect to me. So from that point of view, cloning tables doesn't seem like a good idea; they cannot share resources. We should keep the catalog available in the loader, or create a statically shared catalog. Alternatively, we could try to get rid of tableLoader altogether and switch to catalog, but this would be a compatibility-breaking change. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] hililiwei commented on a diff in pull request #6617: Spark: Spark SQL Extensions for create branch
hililiwei commented on code in PR #6617: URL: https://github.com/apache/iceberg/pull/6617#discussion_r1083616314 ## spark/v3.3/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4: ## @@ -168,34 +175,76 @@ fieldList ; nonReserved -: ADD | ALTER | AS | ASC | BY | CALL | DESC | DROP | FIELD | FIRST | LAST | NULLS | ORDERED | PARTITION | TABLE | WRITE -| DISTRIBUTED | LOCALLY | UNORDERED | REPLACE | WITH | IDENTIFIER_KW | FIELDS | SET +: ADD | ALTER | AS | ASC | BRANCH | BY | CALL | CREATE | DAYS | DESC | DROP | FIELD | FIRST | HOURS | LAST | NULLS | OF | ORDERED | PARTITION | TABLE | WRITE +| DISTRIBUTED | LOCALLY | MINUTES | MONTHS | UNORDERED | REPLACE | RETAIN | VERSION | WITH | IDENTIFIER_KW | FIELDS | SET | SNAPSHOT | SNAPSHOTS | TRUE | FALSE | MAP ; +snapshotId +: number +; + +numSnapshots +: number +; + +snapshotRetain +: number +; + +snapshotRefRetain Review Comment: @jackye1995 asked the same question. I originally added snapshotRefRetain and snapshotRetain to make the statement parsing code more readable. Removing them is technically feasible. In the new version, I have removed them (including for create branch). ref: https://github.com/apache/iceberg/pull/6637#discussion_r1083582974 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] hililiwei commented on a diff in pull request #6617: Spark: Spark SQL Extensions for create branch
hililiwei commented on code in PR #6617: URL: https://github.com/apache/iceberg/pull/6617#discussion_r1083616874 ## spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateBranch.java: ## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.extensions; + +import java.util.IllegalFormatConversionException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.SnapshotRef; +import org.apache.iceberg.Table; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.spark.SparkCatalogConfig; +import org.apache.iceberg.spark.source.SimpleRecord; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.catalyst.parser.extensions.IcebergParseException; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runners.Parameterized; + +public class TestCreateBranch extends SparkExtensionsTestBase { + + @Parameterized.Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}") Review Comment: yes, we just test `SparkCatalogConfig.SPARK` catalog. ``` @Parameterized.Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}") public static Object[][] parameters() { return new Object[][] { { SparkCatalogConfig.SPARK.catalogName(), SparkCatalogConfig.SPARK.implementation(), SparkCatalogConfig.SPARK.properties() } }; } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] arminnajafi opened a new pull request, #6646: Implement Support for DynamoDB Catalog
arminnajafi opened a new pull request, #6646: URL: https://github.com/apache/iceberg/pull/6646 Fixes #6541 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] stevenzwu commented on pull request #6614: Flink:fix flink streaming query problem [ Cannot get a client from a closed pool]
stevenzwu commented on PR #6614: URL: https://github.com/apache/iceberg/pull/6614#issuecomment-1399807014 > I think the problem is that the TableLoader is taking ownership of the catalog and closing it. That seems incorrect to me. Yes, that will be a bigger discussion. > Alternatively, we could try to get rid of tableLoader altogether and switch to catalog, but this would be a compatibility breaking change. We can deprecate the `TableLoader` and provide an alternative if we can come up with a reasonable solution. I am wondering if we can have a short-term solution for the immediate issue #6455. I think this Flink usage pattern of `TableLoader` is fine if the `Table` is used as a read-only table. ``` if (table == null) { try (TableLoader loader = tableLoader) { loader.open(); this.table = tableLoader.loadTable(); } catch (IOException e) { throw new UncheckedIOException(e); } } ``` The problem for `ContinuousSplitPlannerImpl` of FLIP-27 `IcebergSource` is that it uses the `Table` as non-read-only, as it needs to refresh the table. With the catalog closed, the table is invalid for refresh. One solution is to pass in a cloned `TableLoader` to the `ContinuousSplitPlannerImpl`, which can assume ownership of the `TableLoader`. `ContinuousSplitPlannerImpl` also has a `close` method and can call the `TableLoader#close` method to release the resources. If we can add a `clone` method to the `TableLoader` interface, this approach can work. @xuzhiwen1255 I definitely think we should close this PR as this is not the right way to fix the problem. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
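A hedged sketch of the clone-and-own idea is below. The `clone()` method is only proposed in the comment and does not exist on the real `org.apache.iceberg.flink.TableLoader` yet, so the sketch declares its own minimal loader contract, and the planner class is a stand-in for what `ContinuousSplitPlannerImpl` would do rather than its actual implementation.

```java
import java.io.Closeable;
import java.io.IOException;
import org.apache.iceberg.Table;

// Hypothetical minimal loader contract for this sketch; the comment above proposes
// adding a clone() method like this to the real TableLoader interface.
interface CloneableTableLoader extends Closeable {
  void open();

  Table loadTable();

  CloneableTableLoader clone();
}

// Stand-in for ContinuousSplitPlannerImpl: it owns its own cloned loader and closes it.
class OwningSplitPlannerSketch implements Closeable {
  private final CloneableTableLoader loader; // owned copy, not the caller's shared instance
  private final Table table;

  OwningSplitPlannerSketch(CloneableTableLoader sharedLoader) {
    this.loader = sharedLoader.clone(); // the proposed clone() -- hypothetical today
    this.loader.open();
    this.table = loader.loadTable();
  }

  Table refreshedTable() {
    table.refresh(); // still valid because this planner's own loader/catalog remains open
    return table;
  }

  @Override
  public void close() throws IOException {
    loader.close(); // release only the resources this planner owns
  }
}
```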
[GitHub] [iceberg] stevenzwu merged pull request #6635: Flink: add table setter to FLIP-27 IcebergSource#Builder.
stevenzwu merged PR #6635: URL: https://github.com/apache/iceberg/pull/6635 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #6634: Core, API: Fix for tracking intermediate snapshots when a transaction spans multiple branches
amogh-jahagirdar commented on code in PR #6634: URL: https://github.com/apache/iceberg/pull/6634#discussion_r1083658719 ## core/src/main/java/org/apache/iceberg/BaseTransaction.java: ## @@ -551,10 +555,19 @@ public void commit(TableMetadata underlyingBase, TableMetadata metadata) { } // track the intermediate snapshot ids for rewriting the snapshot log - // an id is intermediate if it isn't the base snapshot id and it is replaced by a new current - Long oldId = currentId(current); - if (oldId != null && !oldId.equals(currentId(metadata)) && !oldId.equals(currentId(base))) { -intermediateSnapshotIds.add(oldId); + // an id is intermediate if it isn't the head of the branch in base and it is replaced by a new head of the branch in current Review Comment: Thanks @rdblue that makes a lot of sense, my previous logic was a more complex way of just capturing the new snapshots across all the branches but we can quite simply just get that by diffing the set of snapshots that were committed and the pre-transaction snapshots. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] XN137 commented on pull request #6629: Build: Fix minor error-prone warnings
XN137 commented on PR #6629: URL: https://github.com/apache/iceberg/pull/6629#issuecomment-1399881912 should we raise the severity of the `StringSplitter` check to `ERROR` like [here](https://github.com/apache/iceberg/blob/4491e7d6d6911ca0ecf129d7528dda2717f8c938/baseline.gradle#L70) ? this prevents similar problems from being introduced again -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
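For context, a small illustration of what the `StringSplitter` check flags is below; the class and method names are made up for the example, and in Iceberg the relocated Guava `Splitter` would typically be used instead of the plain import shown here.

```java
import com.google.common.base.Splitter;
import java.util.Arrays;
import java.util.List;

class StringSplitterExample {
  // The pattern error-prone's StringSplitter check flags: String#split treats its
  // argument as a regex and has surprising semantics for trailing empty strings.
  static List<String> partsUnsafe(String csv) {
    return Arrays.asList(csv.split(","));
  }

  // The replacement it recommends: an explicit Splitter with well-defined behaviour.
  static List<String> partsPreferred(String csv) {
    return Splitter.on(',').splitToList(csv);
  }
}
```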
[GitHub] [iceberg] pvary commented on pull request #6614: Flink:fix flink streaming query problem [ Cannot get a client from a closed pool]
pvary commented on PR #6614: URL: https://github.com/apache/iceberg/pull/6614#issuecomment-1399896611 > The problem for ContinuousSplitPlannerImpl of FLIP-27 IcebergSource is that it uses the Table as non read-only, as it needs to refresh the table. The `Table` API assumes that you can hold a reference to the table "forever" and refresh the table data whenever you want. There should be no restrictions. This means that the table needs access to an open connection pool until the table is closed. I think that closing the JDBC pool before closing the table is the mistake here. I think it is not by chance that we do not have a close method on the general Catalog interface. As a general rule we expect the Catalog to be an easy static wrapper around the resources needed to access the table snapshot pointer. This could seem strange, and I had to push back several times on closing the HMS Connection pool in HiveCatalog when someone wanted to "fix" this issue. The JDBC and maybe some other Catalog implementations did not get this pushback, and we are in the situation where the different Catalog implementations behave differently. We should standardize the behavior (who is responsible for closing the connection pools). Hive has its own PoolCache to close unused pools; JDBC doesn't have this (if I understand correctly). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
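A minimal sketch of the "statically shared pool" behaviour being described: pools are looked up in a process-wide cache rather than owned by individual catalog or table instances, so closing one catalog cannot invalidate tables that still need the pool. All names here are invented for illustration, and the sketch deliberately ignores eviction of idle pools, which is the part Hive's pool cache adds.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Illustration only: a process-wide registry of client pools keyed by connection string.
class SharedClientPools<P> {
  private final Map<String, P> pools = new ConcurrentHashMap<>();
  private final Function<String, P> factory;

  SharedClientPools(Function<String, P> factory) {
    this.factory = factory;
  }

  // Catalogs and tables look their pool up here instead of owning it; the pool for a
  // given connection key is created once, shared, and never closed by a single caller.
  P poolFor(String connectionKey) {
    return pools.computeIfAbsent(connectionKey, factory);
  }
}
```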
[GitHub] [iceberg] ajantha-bhat commented on pull request #6629: Build: Fix minor error-prone warnings
ajantha-bhat commented on PR #6629: URL: https://github.com/apache/iceberg/pull/6629#issuecomment-1399900490 > should we raise the severity of the StringSplitter check to ERROR like [here](https://github.com/apache/iceberg/blob/4491e7d6d6911ca0ecf129d7528dda2717f8c938/baseline.gradle#L70) ? this prevents similar problems from being introduced again We could add this. But I am not sure on what basis we would add it here, because there are 400-plus bug patterns in error-prone. Even if we prevent this error, there can still be other error-prone warnings that prevent the build from looking green. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] XN137 commented on pull request #6629: Build: Fix minor error-prone warnings
XN137 commented on PR #6629: URL: https://github.com/apache/iceberg/pull/6629#issuecomment-1399910688 If these warnings are worth fixing, shouldn't they also be worth preventing? Automatic prevention is usually cheaper than manually observing and fixing future warnings. This also has the added benefit that we are sure this PR addressed all these types of violations, not just the ones you happened to see. I don't see how the existence of other error-prone checks that are not being enforced matters in this discussion. Ideally, all of the checks we agree are worth fixing should be enforced; the problem is identifying those and addressing all their current violations. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org