Re: [PR] Make AzureProperties w/ shared-key creds serializable [iceberg]
nastra commented on PR #10045: URL: https://github.com/apache/iceberg/pull/10045#issuecomment-2082016600 @snazy could you add a test please?
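A note for readers following along: a request like this is usually satisfied with a Java serialization round trip. The helper below is only a generic sketch of that pattern; the idea would be to build an AzureProperties instance from shared-key properties (the exact constructor and property keys are not shown here and would need to be checked against the codebase) and assert that it survives `roundTrip` without throwing.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

public class SerializationRoundTrip {

  // Serialize and deserialize with plain Java serialization; this throws
  // NotSerializableException if the object graph contains anything non-serializable.
  @SuppressWarnings("unchecked")
  public static <T> T roundTrip(T obj) throws Exception {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(obj);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (T) in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    // Trivial demonstration; a real test would pass the configuration object instead.
    String copy = roundTrip("shared-key-config");
    System.out.println("round-tripped: " + copy);
  }
}
```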
Re: [PR] Build: Bump getdaft from 0.2.16 to 0.2.21 [iceberg-python]
Fokko merged PR #662: URL: https://github.com/apache/iceberg-python/pull/662
Re: [PR] Migrate FlinkTestBase related tests [iceberg]
nastra commented on code in PR #10232: URL: https://github.com/apache/iceberg/pull/10232#discussion_r1582655185 ## flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java: ## @@ -37,23 +43,22 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.util.StructLikeSet; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; /** * In this test case, we mainly cover the impact of primary key selection, multiple operations * within a single transaction, and multiple operations between different txn on the correctness of * the data. */ -@RunWith(Parameterized.class) +@ExtendWith(ParameterizedTestExtension.class) public class TestChangeLogTable extends ChangeLogTableTestBase { + @TempDir private static Path temp; Review Comment: I don't think this should be static -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Migrate FlinkTestBase related tests [iceberg]
nastra commented on code in PR #10232: URL: https://github.com/apache/iceberg/pull/10232#discussion_r1582655786 ## flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java: ## @@ -32,33 +37,37 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.Parameter; +import org.apache.iceberg.ParameterizedTestExtension; +import org.apache.iceberg.Parameters; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.thrift.TException; -import org.assertj.core.api.Assertions; -import org.junit.After; -import org.junit.Assert; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; -@RunWith(Parameterized.class) -public class TestIcebergConnector extends FlinkTestBase { +@ExtendWith(ParameterizedTestExtension.class) +public class TestIcebergConnector extends TestBase { private static final String TABLE_NAME = "test_table"; - @ClassRule public static final TemporaryFolder WAREHOUSE = new TemporaryFolder(); + @TempDir private static Path warehouse; Review Comment: shouldn't be static -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
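For context on this and the previous @TempDir comment: in JUnit Jupiter, a static @TempDir field is created once and shared across the whole test class, while an instance field gives each test method its own fresh directory, which is normally what per-test warehouse and table paths want. A minimal, self-contained illustration (class and field names are invented):

```java
import static org.assertj.core.api.Assertions.assertThat;

import java.nio.file.Files;
import java.nio.file.Path;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

class TempDirScopeExample {

  // Instance field: JUnit Jupiter creates a fresh temporary directory for every test method.
  // A static field would be created once per class and shared by all tests, which is what
  // the review comments above are pushing back on.
  @TempDir Path warehouse;

  @Test
  void eachTestGetsItsOwnDirectory() {
    assertThat(Files.isDirectory(warehouse)).isTrue();
  }
}
```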
Re: [PR] HiveMetaHook implementation to enable CREATE TABLE and DROP TABLE from Hive queries [iceberg]
pvary commented on code in PR #1495: URL: https://github.com/apache/iceberg/pull/1495#discussion_r1582687790 ## mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java: ## @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.mr.hive; + +import java.util.Properties; +import java.util.Set; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.HiveMetaHook; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.iceberg.BaseMetastoreTableOperations; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.PartitionSpecParser; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SchemaParser; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableMetadataParser; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.mr.Catalogs; +import org.apache.iceberg.mr.InputFormatConfig; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class HiveIcebergMetaHook implements HiveMetaHook { + private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergMetaHook.class); + private static final Set PARAMETERS_TO_REMOVE = ImmutableSet + .of(InputFormatConfig.TABLE_SCHEMA, InputFormatConfig.PARTITION_SPEC, Catalogs.LOCATION, Catalogs.NAME); + private static final Set PROPERTIES_TO_REMOVE = ImmutableSet + .of(InputFormatConfig.EXTERNAL_TABLE_PURGE, hive_metastoreConstants.META_TABLE_STORAGE, "EXTERNAL", + "bucketing_version"); + + private final Configuration conf; + private Table icebergTable = null; + private Properties catalogProperties; + private boolean deleteIcebergTable; + private FileIO deleteIo; + private TableMetadata deleteMetadata; + + public HiveIcebergMetaHook(Configuration conf) { +this.conf = conf; + } + + @Override + public void preCreateTable(org.apache.hadoop.hive.metastore.api.Table hmsTable) { +this.catalogProperties = getCatalogProperties(hmsTable); + +// Set the table type even for non HiveCatalog based tables +hmsTable.getParameters().put(BaseMetastoreTableOperations.TABLE_TYPE_PROP, +BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE.toUpperCase()); + +if (!Catalogs.hiveCatalog(conf)) { + // If not using HiveCatalog check for existing table + try { +this.icebergTable = Catalogs.loadTable(conf, catalogProperties); + + 
Preconditions.checkArgument(catalogProperties.getProperty(InputFormatConfig.TABLE_SCHEMA) == null, +"Iceberg table already created - can not use provided schema"); + Preconditions.checkArgument(catalogProperties.getProperty(InputFormatConfig.PARTITION_SPEC) == null, +"Iceberg table already created - can not use provided partition specification"); + +LOG.info("Iceberg table already exists {}", icebergTable); + +return; + } catch (NoSuchTableException nte) { +// If the table does not exist we will create it below + } +} + +// If the table does not exist collect data for table creation +String schemaString = catalogProperties.getProperty(InputFormatConfig.TABLE_SCHEMA); +Preconditions.checkNotNull(schemaString, "Please provide a table schema"); +// Just check if it is parsable, and later use for partition specification parsing +Schema schema = SchemaParser.fromJson(schemaString); + +String specString = catalogProperties.getProperty(InputFormatConfig.PARTITION_SPEC); +if (specString != null) { + // Just check if it is parsable + PartitionSpecParser.fromJson(schema, specString); +} + +// Allow purging table data if the table is created now and not set otherwise +if (hmsTable.getParameters().get(InputFormatConfig.EXTERNAL_TABLE_PURGE) == null) { + hmsTable.getParamet
Re: [PR] MR: iceberg storage handler should set common projection pruning config [iceberg]
ludlows commented on code in PR #10188: URL: https://github.com/apache/iceberg/pull/10188#discussion_r1582727175 ## mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java: ## @@ -111,8 +111,15 @@ public void configureTableJobProperties(TableDesc tableDesc, Map // @Override public void configureInputJobCredentials(TableDesc tableDesc, Map secrets) {} + private void setCommonJobConf(JobConf jobConf) { Review Comment: Hi @pvary, thanks for your suggestions. I have now implemented a unit test; however, according to the test result, it seems that my modification doesn't take effect. Could you give me a hint about the test failure? Many thanks.
Re: [PR] Core: Retry connections in JDBC catalog with user configured error code list [iceberg]
nastra commented on code in PR #10140: URL: https://github.com/apache/iceberg/pull/10140#discussion_r1582743700 ## core/src/test/java/org/apache/iceberg/TestClientPoolImpl.java: ## @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; + +import org.junit.jupiter.api.Test; + +public class TestClientPoolImpl { + + static class RetryableException extends RuntimeException {} + + static class NonRetryableException extends RuntimeException {} + + static class MockClient { +boolean closed = false; + +int actions = 0; + +int retryableFailures = 0; + +public void close() { + closed = true; +} + +public int successfulAction() { + actions++; + return actions; +} + +int succeedAfter(int succeedAfterAttempts) { + if (retryableFailures == succeedAfterAttempts - 1) { +return successfulAction(); + } + + retryableFailures++; + throw new RetryableException(); +} + +int failWithNonRetryable() { + throw new NonRetryableException(); +} + } + + static class MockClientPoolImpl extends ClientPoolImpl { + +private int reconnectionAttempts; + +MockClientPoolImpl( +int poolSize, +Class reconnectExc, +boolean retryByDefault, +int numRetries) { + super(poolSize, reconnectExc, retryByDefault, numRetries); +} + +@Override +protected MockClient newClient() { + return new MockClient(); +} + +@Override +protected MockClient reconnect(MockClient client) { + reconnectionAttempts++; + return client; +} + +@Override +protected void close(MockClient client) { + client.close(); +} + +int reconnectionAttempts() { + return reconnectionAttempts; +} + } + + @Test + public void testRetrySucceedsWithinMaxAttempts() throws Exception { +int maxRetries = 5; +int succeedAfterAttempts = 3; +try (MockClientPoolImpl mockClientPool = +new MockClientPoolImpl(2, RetryableException.class, true, maxRetries)) { + int actions = mockClientPool.run(client -> client.succeedAfter(succeedAfterAttempts)); + assertThat(actions) + .as("There should be exactly one successful action invocation") + .isEqualTo(1); + assertThat(mockClientPool.reconnectionAttempts()).isEqualTo(succeedAfterAttempts - 1); +} + } + + @Test + public void testRetriesExhaustedAndSurfacesFailure() { +int maxRetries = 3; +int succeedAfterAttempts = 5; +MockClientPoolImpl mockClientPool = +new MockClientPoolImpl(2, RetryableException.class, true, maxRetries); +assertThatThrownBy( +() -> { Review Comment: nit: {} can be removed here and in the other tests -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
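The nit above is about AssertJ's assertThatThrownBy: when the lambda body is a single statement, the braces add nothing. A generic, self-contained illustration of the before/after shape (not the PR's actual test code):

```java
import static org.assertj.core.api.Assertions.assertThatThrownBy;

public class LambdaBraceExample {

  public static void main(String[] args) {
    // Block-bodied lambda: the braces are unnecessary for a single call.
    assertThatThrownBy(
            () -> {
              throwingCall();
            })
        .isInstanceOf(IllegalStateException.class);

    // Same assertion as an expression lambda, which is what the "{} can be removed" nit asks for.
    assertThatThrownBy(() -> throwingCall())
        .isInstanceOf(IllegalStateException.class);
  }

  private static int throwingCall() {
    throw new IllegalStateException("boom");
  }
}
```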
Re: [PR] Core: Retry connections in JDBC catalog with user configured error code list [iceberg]
nastra commented on code in PR #10140: URL: https://github.com/apache/iceberg/pull/10140#discussion_r1582745688 ## core/src/test/java/org/apache/iceberg/TestClientPoolImpl.java: ## @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; + +import org.junit.jupiter.api.Test; + +public class TestClientPoolImpl { + + static class RetryableException extends RuntimeException {} + + static class NonRetryableException extends RuntimeException {} + + static class MockClient { +boolean closed = false; + +int actions = 0; + +int retryableFailures = 0; + +public void close() { + closed = true; +} + +public int successfulAction() { + actions++; + return actions; +} + +int succeedAfter(int succeedAfterAttempts) { + if (retryableFailures == succeedAfterAttempts - 1) { +return successfulAction(); + } + + retryableFailures++; + throw new RetryableException(); +} + +int failWithNonRetryable() { + throw new NonRetryableException(); +} + } + + static class MockClientPoolImpl extends ClientPoolImpl { + +private int reconnectionAttempts; + +MockClientPoolImpl( +int poolSize, +Class reconnectExc, +boolean retryByDefault, +int numRetries) { + super(poolSize, reconnectExc, retryByDefault, numRetries); +} + +@Override +protected MockClient newClient() { + return new MockClient(); +} + +@Override +protected MockClient reconnect(MockClient client) { + reconnectionAttempts++; + return client; +} + +@Override +protected void close(MockClient client) { + client.close(); +} + +int reconnectionAttempts() { + return reconnectionAttempts; +} + } + + @Test + public void testRetrySucceedsWithinMaxAttempts() throws Exception { +int maxRetries = 5; +int succeedAfterAttempts = 3; +try (MockClientPoolImpl mockClientPool = +new MockClientPoolImpl(2, RetryableException.class, true, maxRetries)) { + int actions = mockClientPool.run(client -> client.succeedAfter(succeedAfterAttempts)); + assertThat(actions) + .as("There should be exactly one successful action invocation") + .isEqualTo(1); + assertThat(mockClientPool.reconnectionAttempts()).isEqualTo(succeedAfterAttempts - 1); +} + } + + @Test + public void testRetriesExhaustedAndSurfacesFailure() { +int maxRetries = 3; +int succeedAfterAttempts = 5; +MockClientPoolImpl mockClientPool = Review Comment: should this (and other places) be in a try-with-resources block like in the other test? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
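On the try-with-resources question: constructing the pool inside try (...) guarantees close() runs even when the code under test throws, which is why the other test uses it. A generic sketch of the pattern with a stand-in pool class (not the PR's MockClientPoolImpl):

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class TryWithResourcesExample {

  // Minimal stand-in for a client pool that must always be closed.
  static class MockPool implements AutoCloseable {
    final AtomicBoolean closed = new AtomicBoolean(false);

    int run() {
      throw new RuntimeException("non-retryable failure");
    }

    @Override
    public void close() {
      closed.set(true);
    }
  }

  public static void main(String[] args) {
    MockPool pool = new MockPool();
    // try-with-resources: close() is invoked even though run() throws.
    try (MockPool p = pool) {
      p.run();
    } catch (RuntimeException e) {
      System.out.println("caught: " + e.getMessage());
    }
    System.out.println("pool closed: " + pool.closed.get()); // prints: pool closed: true
  }
}
```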
Re: [PR] Core: Retry connections in JDBC catalog with user configured error code list [iceberg]
nastra commented on code in PR #10140: URL: https://github.com/apache/iceberg/pull/10140#discussion_r1582748026 ## core/src/test/java/org/apache/iceberg/TestClientPoolImpl.java: ## @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; + +import org.junit.jupiter.api.Test; + +public class TestClientPoolImpl { + + static class RetryableException extends RuntimeException {} + + static class NonRetryableException extends RuntimeException {} + + static class MockClient { +boolean closed = false; + +int actions = 0; + +int retryableFailures = 0; + +public void close() { + closed = true; +} + +public int successfulAction() { + actions++; + return actions; +} + +int succeedAfter(int succeedAfterAttempts) { + if (retryableFailures == succeedAfterAttempts - 1) { +return successfulAction(); + } + + retryableFailures++; + throw new RetryableException(); +} + +int failWithNonRetryable() { + throw new NonRetryableException(); +} + } + + static class MockClientPoolImpl extends ClientPoolImpl { + +private int reconnectionAttempts; + +MockClientPoolImpl( +int poolSize, +Class reconnectExc, +boolean retryByDefault, +int numRetries) { + super(poolSize, reconnectExc, retryByDefault, numRetries); +} + +@Override +protected MockClient newClient() { + return new MockClient(); +} + +@Override +protected MockClient reconnect(MockClient client) { + reconnectionAttempts++; + return client; +} + +@Override +protected void close(MockClient client) { + client.close(); +} + +int reconnectionAttempts() { + return reconnectionAttempts; +} + } + + @Test Review Comment: I would probably move the tests to the top and the inner classes to the bottom -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Use `pre-commit.ci` to automatically update pre-commit hook versions [iceberg-python]
Fokko merged PR #665: URL: https://github.com/apache/iceberg-python/pull/665
[PR] Add ManifestFile Stats in snapshot summary. [iceberg]
nk1506 opened a new pull request, #10246: URL: https://github.com/apache/iceberg/pull/10246

Currently the snapshot summary has no statistics about manifest files. This change adds two new summary fields, `"total-data-manifest-files"` and `"total-delete-manifest-files"`, with the goal of helping engines make better planning estimates for manifest reading.

Old snapshot summary:
```
{
  "added-data-files": "1",
  "added-files-size": "389",
  "added-records": "1",
  "changed-partition-count": "1",
  "spark.app.id": "local-1714372186602",
  "total-data-files": "1",
  "total-delete-files": "0",
  "total-equality-deletes": "0",
  "total-files-size": "389",
  "total-position-deletes": "0",
  "total-records": "1"
}
```

New snapshot summary:
```
{
  "added-data-files": "1",
  "added-files-size": "389",
  "added-records": "1",
  "changed-partition-count": "1",
  "spark.app.id": "local-1714372186602",
  "total-data-files": "1",
  "total-delete-files": "0",
  "total-equality-deletes": "0",
  "total-files-size": "389",
  "total-position-deletes": "0",
  "total-records": "1",
  "total-data-manifest-files": "1",
  "total-delete-manifest-files": "1"
}
```
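For readers wondering how the proposed fields would be consumed: Snapshot.summary() already exposes the summary as a string-to-string map, so the new entries could be read like any other. A small sketch (the field names come from this PR description; the rest is illustrative):

```java
import java.util.Map;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

public class SnapshotManifestStats {

  // Prints the proposed manifest-count statistics for the current snapshot, if present.
  static void printManifestStats(Table table) {
    Snapshot current = table.currentSnapshot();
    if (current == null) {
      System.out.println("table has no snapshots yet");
      return;
    }

    Map<String, String> summary = current.summary();
    // Field names as proposed in this PR; absent on snapshots written before the change.
    System.out.println(
        "data manifests: " + summary.getOrDefault("total-data-manifest-files", "n/a"));
    System.out.println(
        "delete manifests: " + summary.getOrDefault("total-delete-manifest-files", "n/a"));
  }
}
```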
Re: [PR] Core: Prevent duplicate data/delete files [iceberg]
Fokko commented on code in PR #10007: URL: https://github.com/apache/iceberg/pull/10007#discussion_r1582803332 ## core/src/main/java/org/apache/iceberg/FastAppend.java: ## @@ -83,9 +85,13 @@ protected Map summary() { @Override public FastAppend appendFile(DataFile file) { -this.hasNewFiles = true; -newFiles.add(file); -summaryBuilder.addedFile(spec, file); +Preconditions.checkNotNull(file, "Invalid data file: null"); +if (newFilePaths.add(file.path())) { + this.hasNewFiles = true; Review Comment: Nit: This flag could also be replaced by `newFilePaths.length() > 0`. We could also do this in a follow up PR.
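The hunk above is the usual "track seen keys in a set and only act on first insertion" guard, and the nit points out that a separate boolean flag duplicates state the collections already carry. A generic sketch of that shape (not Iceberg's FastAppend):

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DedupAppendExample {
  private final Set<String> seenPaths = new HashSet<>();
  private final List<String> newFiles = new ArrayList<>();

  // Appending the same path twice is a no-op; Set.add returns false for duplicates.
  public void appendFile(String path) {
    if (seenPaths.add(path)) {
      newFiles.add(path);
    }
  }

  // Instead of a separate hasNewFiles flag, derive it from the collected state.
  public boolean hasNewFiles() {
    return !newFiles.isEmpty();
  }

  public static void main(String[] args) {
    DedupAppendExample append = new DedupAppendExample();
    append.appendFile("/data/a.parquet");
    append.appendFile("/data/a.parquet"); // duplicate, ignored
    System.out.println(append.newFiles.size() + " file(s), hasNewFiles=" + append.hasNewFiles());
    // prints: 1 file(s), hasNewFiles=true
  }
}
```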
Re: [I] AWS: Creating a Glue table with Lake Formation enabled fails [iceberg]
Albertagamergod1 commented on issue #10226: URL: https://github.com/apache/iceberg/issues/10226#issuecomment-2082323233 Continue your work please
Re: [PR] Core: Add property to disable table initialization for JdbcCatalog [iceberg]
nastra merged PR #10124: URL: https://github.com/apache/iceberg/pull/10124
Re: [PR] feat: Convert predicate to arrow filter and push down to parquet reader [iceberg-rust]
liurenjie1024 commented on code in PR #295: URL: https://github.com/apache/iceberg-rust/pull/295#discussion_r1582960019 ## crates/iceberg/src/arrow/reader.rs: ## @@ -186,4 +221,637 @@ impl ArrowReader { Ok(ProjectionMask::leaves(parquet_schema, indices)) } } + +fn get_row_filter( +&self, +parquet_schema: &SchemaDescriptor, +collector: &CollectFieldIdVisitor, +) -> Result> { +if let Some(predicates) = &self.predicates { +let field_id_map = build_field_id_map(parquet_schema)?; + +let column_indices = collector +.field_ids +.iter() +.map(|field_id| { +field_id_map.get(field_id).cloned().ok_or_else(|| { +Error::new(ErrorKind::DataInvalid, "Field id not found in schema") +}) +}) +.collect::>>()?; + +// Convert BoundPredicates to ArrowPredicates +let mut converter = PredicateConverter { +columns: &column_indices, +projection_mask: ProjectionMask::leaves(parquet_schema, column_indices.clone()), +parquet_schema, +column_map: &field_id_map, +}; +let arrow_predicate = visit(&mut converter, predicates)?; +Ok(Some(RowFilter::new(vec![arrow_predicate]))) +} else { +Ok(None) +} +} +} + +/// Build the map of field id to Parquet column index in the schema. +fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> Result> { +let mut column_map = HashMap::new(); +for (idx, field) in parquet_schema.columns().iter().enumerate() { +let field_type = field.self_type(); +match field_type { +ParquetType::PrimitiveType { basic_info, .. } => { +if !basic_info.has_id() { +return Err(Error::new( +ErrorKind::DataInvalid, +format!( +"Leave column {:?} in schema doesn't have field id", +field_type +), +)); +} +column_map.insert(basic_info.id(), idx); +} +ParquetType::GroupType { .. } => { +return Err(Error::new( +ErrorKind::DataInvalid, +format!( +"Leave column in schema should be primitive type but got {:?}", +field_type +), +)); +} +}; +} + +Ok(column_map) +} + +/// A visitor to collect field ids from bound predicates. 
+struct CollectFieldIdVisitor { +field_ids: Vec, +} + +impl BoundPredicateVisitor for CollectFieldIdVisitor { +type T = (); + +fn always_true(&mut self) -> Result { +Ok(()) +} + +fn always_false(&mut self) -> Result { +Ok(()) +} + +fn and(&mut self, _lhs: Self::T, _rhs: Self::T) -> Result { +Ok(()) +} + +fn or(&mut self, _lhs: Self::T, _rhs: Self::T) -> Result { +Ok(()) +} + +fn not(&mut self, _inner: Self::T) -> Result { +Ok(()) +} + +fn is_null( +&mut self, +reference: &BoundReference, +_predicate: &BoundPredicate, +) -> Result { +self.field_ids.push(reference.field().id); +Ok(()) +} + +fn not_null( +&mut self, +reference: &BoundReference, +_predicate: &BoundPredicate, +) -> Result { +self.field_ids.push(reference.field().id); +Ok(()) +} + +fn is_nan( +&mut self, +reference: &BoundReference, +_predicate: &BoundPredicate, +) -> Result { +self.field_ids.push(reference.field().id); +Ok(()) +} + +fn not_nan( +&mut self, +reference: &BoundReference, +_predicate: &BoundPredicate, +) -> Result { +self.field_ids.push(reference.field().id); +Ok(()) +} + +fn less_than( +&mut self, +reference: &BoundReference, +_literal: &Datum, +_predicate: &BoundPredicate, +) -> Result { +self.field_ids.push(reference.field().id); +Ok(()) +} + +fn less_than_or_eq( +&mut self, +reference: &BoundReference, +_literal: &Datum, +_predicate: &BoundPredicate, +) -> Result { +self.field_ids.push(reference.field().id); +Ok(()) +} + +fn greater_than( +&mut self, +reference: &BoundReference, +_literal: &Datum, +_predicate: &BoundPredicate, +) -> Result { +self.field_ids.push(reference.field().id); +Ok(()) +} + +fn greater_than_or_eq( +&mut self, +reference: &BoundReference, +_literal: &Datum, +_predicate: &
Re: [PR] MR: iceberg storage handler should set common projection pruning config [iceberg]
pvary commented on code in PR #10188: URL: https://github.com/apache/iceberg/pull/10188#discussion_r1582970748 ## mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java: ## @@ -111,8 +111,15 @@ public void configureTableJobProperties(TableDesc tableDesc, Map // @Override public void configureInputJobCredentials(TableDesc tableDesc, Map secrets) {} + private void setCommonJobConf(JobConf jobConf) { Review Comment: I think the changes affect the configuration used by the query, and do not affect the configuration used by the shell. You could check the values on the executors while the job is running, or you could check the result of the query: create a test which fails without the change because of the wrong pruning, and succeeds after the change.
Re: [PR] Core: add a new task-type field to task JSON serialization. add data task JSON serialization imp. [iceberg]
nastra commented on code in PR #9728: URL: https://github.com/apache/iceberg/pull/9728#discussion_r1582996766 ## core/src/main/java/org/apache/iceberg/SnapshotsTable.java: ## @@ -27,7 +28,8 @@ * This does not include snapshots that have been expired using {@link ExpireSnapshots}. */ public class SnapshotsTable extends BaseMetadataTable { - private static final Schema SNAPSHOT_SCHEMA = + @VisibleForTesting + static final Schema SNAPSHOT_SCHEMA = Review Comment: I think having a custom schema for the test makes sense here
Re: [PR] Core: add a new task-type field to task JSON serialization. add data task JSON serialization imp. [iceberg]
nastra commented on code in PR #9728: URL: https://github.com/apache/iceberg/pull/9728#discussion_r1583002079 ## core/src/test/java/org/apache/iceberg/TestDataTaskParser.java: ## @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.StringWriter; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.JsonUtil; +import org.junit.jupiter.api.Test; + +public class TestDataTaskParser { + @Test + public void nullCheck() throws Exception { +StringWriter writer = new StringWriter(); +JsonGenerator generator = JsonUtil.factory().createGenerator(writer); + +assertThatThrownBy(() -> DataTaskParser.toJson(null, generator)) +.isInstanceOf(IllegalArgumentException.class) +.hasMessage("Invalid data task: null"); + +assertThatThrownBy(() -> DataTaskParser.toJson((StaticDataTask) createDataTask(), null)) +.isInstanceOf(IllegalArgumentException.class) +.hasMessage("Invalid JSON generator: null"); + +assertThatThrownBy(() -> DataTaskParser.fromJson(null)) +.isInstanceOf(IllegalArgumentException.class) +.hasMessage("Invalid JSON node for data task: null"); + } + + @Test + public void invalidJsonNode() throws Exception { +String jsonStr = "{\"str\":\"1\", \"arr\":[]}"; +ObjectMapper mapper = new ObjectMapper(); +JsonNode rootNode = mapper.reader().readTree(jsonStr); + +assertThatThrownBy(() -> DataTaskParser.fromJson(rootNode.get("str"))) +.isInstanceOf(IllegalArgumentException.class) +.hasMessageContaining("Invalid JSON node for data task: non-object "); + +assertThatThrownBy(() -> DataTaskParser.fromJson(rootNode.get("arr"))) +.isInstanceOf(IllegalArgumentException.class) +.hasMessageContaining("Invalid JSON node for data task: non-object "); + } + + @Test + public void missingFields() throws Exception { +ObjectMapper mapper = new ObjectMapper(); + +String missingSchemaStr = "{}"; +JsonNode missingSchemaNode = mapper.reader().readTree(missingSchemaStr); +assertThatThrownBy(() -> DataTaskParser.fromJson(missingSchemaNode)) +.isInstanceOf(IllegalArgumentException.class) +.hasMessageContaining("Cannot parse missing field: schema"); + +String missingProjectionStr = +"{" ++ "\"schema\":{\"type\":\"struct\",\"schema-id\":0," ++ "\"fields\":[{\"id\":1,\"name\":\"committed_at\",\"required\":true,\"type\":\"timestamptz\"}]}" ++ "}"; 
+JsonNode missingProjectionNode = mapper.reader().readTree(missingProjectionStr); +assertThatThrownBy(() -> DataTaskParser.fromJson(missingProjectionNode)) +.isInstanceOf(IllegalArgumentException.class) +.hasMessageContaining("Cannot parse missing field: projection"); + +String missingMetadataFileStr = +"{" ++ "\"schema\":{\"type\":\"struct\",\"schema-id\":0," ++ "\"fields\":[{\"id\":1,\"name\":\"committed_at\",\"required\":true,\"type\":\"timestamptz\"}]}," ++ "\"projection\":{\"type\":\"struct\",\"schema-id\":0," ++ "\"fields\":[{\"id\":1,\"name\":\"committed_at\",\"required\":true,\"type\":\"timestamptz\"}]}" ++ "}"; +JsonNode missingMetadataFileNode = mapper.reader().readTree(missingMetadataFileStr); +assertThatThrownBy(() -> DataTaskParser.fromJson(missingMetadataFileNode)) +.isInstanceOf(IllegalArgumentException.class) +.hasMessageContaining("Cannot parse missing field: metadata-file"); + +String missingTableRowsStr = +"{\"task-type\":\"data-task\"," ++ "\"schema\":{\"type\":\"struct\",\"schema-id\":0," ++ "\"fields\":[{\"id\"
Re: [PR] Core: add a new task-type field to task JSON serialization. add data task JSON serialization imp. [iceberg]
nastra commented on code in PR #9728: URL: https://github.com/apache/iceberg/pull/9728#discussion_r1583003922 ## core/src/test/java/org/apache/iceberg/TestFileScanTaskParser.java: ## @@ -84,20 +127,38 @@ private String expectedFileScanTaskJson() { + "\"residual-filter\":{\"type\":\"eq\",\"term\":\"id\",\"value\":1}}"; } + private String fileScanTaskJson() { +return "{\"task-type\":\"file-scan-task\"," ++ "\"schema\":{\"type\":\"struct\",\"schema-id\":0,\"fields\":[" ++ "{\"id\":3,\"name\":\"id\",\"required\":true,\"type\":\"int\"}," ++ "{\"id\":4,\"name\":\"data\",\"required\":true,\"type\":\"string\"}]}," ++ "\"spec\":{\"spec-id\":0,\"fields\":[{\"name\":\"data_bucket\"," ++ "\"transform\":\"bucket[16]\",\"source-id\":4,\"field-id\":1000}]}," ++ "\"data-file\":{\"spec-id\":0,\"content\":\"DATA\",\"file-path\":\"/path/to/data-a.parquet\"," ++ "\"file-format\":\"PARQUET\",\"partition\":{\"1000\":0}," ++ "\"file-size-in-bytes\":10,\"record-count\":1,\"sort-order-id\":0}," ++ "\"start\":0,\"length\":10," ++ "\"delete-files\":[{\"spec-id\":0,\"content\":\"POSITION_DELETES\"," ++ "\"file-path\":\"/path/to/data-a-deletes.parquet\",\"file-format\":\"PARQUET\"," ++ "\"partition\":{\"1000\":0},\"file-size-in-bytes\":10,\"record-count\":1}," ++ "{\"spec-id\":0,\"content\":\"EQUALITY_DELETES\",\"file-path\":\"/path/to/data-a2-deletes.parquet\"," ++ "\"file-format\":\"PARQUET\",\"partition\":{\"1000\":0},\"file-size-in-bytes\":10," ++ "\"record-count\":1,\"equality-ids\":[1],\"sort-order-id\":0}]," ++ "\"residual-filter\":{\"type\":\"eq\",\"term\":\"id\",\"value\":1}}"; + } + private static void assertFileScanTaskEquals( FileScanTask expected, FileScanTask actual, PartitionSpec spec, boolean caseSensitive) { TestContentFileParser.assertContentFileEquals(expected.file(), actual.file(), spec); -Assertions.assertThat(actual.deletes()).hasSameSizeAs(expected.deletes()); +assertThat(actual.deletes()).hasSameSizeAs(expected.deletes()); for (int pos = 0; pos < expected.deletes().size(); ++pos) { TestContentFileParser.assertContentFileEquals( expected.deletes().get(pos), actual.deletes().get(pos), spec); } -Assertions.assertThat(expected.schema().sameSchema(actual.schema())) -.as("Schema should match") -.isTrue(); -Assertions.assertThat(actual.spec()).isEqualTo(expected.spec()); -Assertions.assertThat( +assertThat(expected.schema().sameSchema(actual.schema())).as("Schema should match").isTrue(); Review Comment: ```suggestion assertThat(actual.schema().asStruct()).isEqualTo(expected.schema().asStruct()); ``` if the assertion fails, then this will show where the schema mismatch is -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Core: add a new task-type field to task JSON serialization. add data task JSON serialization imp. [iceberg]
nastra commented on code in PR #9728: URL: https://github.com/apache/iceberg/pull/9728#discussion_r1583006389 ## core/src/test/java/org/apache/iceberg/TestDataTaskParser.java: ## @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.StringWriter; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.JsonUtil; +import org.junit.jupiter.api.Test; + +public class TestDataTaskParser { + @Test + public void nullCheck() throws Exception { +StringWriter writer = new StringWriter(); +JsonGenerator generator = JsonUtil.factory().createGenerator(writer); + +assertThatThrownBy(() -> DataTaskParser.toJson(null, generator)) +.isInstanceOf(IllegalArgumentException.class) +.hasMessage("Invalid data task: null"); + +assertThatThrownBy(() -> DataTaskParser.toJson((StaticDataTask) createDataTask(), null)) +.isInstanceOf(IllegalArgumentException.class) +.hasMessage("Invalid JSON generator: null"); + +assertThatThrownBy(() -> DataTaskParser.fromJson(null)) +.isInstanceOf(IllegalArgumentException.class) +.hasMessage("Invalid JSON node for data task: null"); + } + + @Test + public void invalidJsonNode() throws Exception { +String jsonStr = "{\"str\":\"1\", \"arr\":[]}"; +ObjectMapper mapper = new ObjectMapper(); +JsonNode rootNode = mapper.reader().readTree(jsonStr); + +assertThatThrownBy(() -> DataTaskParser.fromJson(rootNode.get("str"))) +.isInstanceOf(IllegalArgumentException.class) +.hasMessageContaining("Invalid JSON node for data task: non-object "); + +assertThatThrownBy(() -> DataTaskParser.fromJson(rootNode.get("arr"))) +.isInstanceOf(IllegalArgumentException.class) +.hasMessageContaining("Invalid JSON node for data task: non-object "); + } + + @Test + public void missingFields() throws Exception { +ObjectMapper mapper = new ObjectMapper(); + +String missingSchemaStr = "{}"; +JsonNode missingSchemaNode = mapper.reader().readTree(missingSchemaStr); +assertThatThrownBy(() -> DataTaskParser.fromJson(missingSchemaNode)) +.isInstanceOf(IllegalArgumentException.class) +.hasMessageContaining("Cannot parse missing field: schema"); + +String missingProjectionStr = +"{" ++ "\"schema\":{\"type\":\"struct\",\"schema-id\":0," ++ "\"fields\":[{\"id\":1,\"name\":\"committed_at\",\"required\":true,\"type\":\"timestamptz\"}]}" ++ "}"; 
+JsonNode missingProjectionNode = mapper.reader().readTree(missingProjectionStr); +assertThatThrownBy(() -> DataTaskParser.fromJson(missingProjectionNode)) +.isInstanceOf(IllegalArgumentException.class) +.hasMessageContaining("Cannot parse missing field: projection"); + +String missingMetadataFileStr = +"{" ++ "\"schema\":{\"type\":\"struct\",\"schema-id\":0," ++ "\"fields\":[{\"id\":1,\"name\":\"committed_at\",\"required\":true,\"type\":\"timestamptz\"}]}," ++ "\"projection\":{\"type\":\"struct\",\"schema-id\":0," ++ "\"fields\":[{\"id\":1,\"name\":\"committed_at\",\"required\":true,\"type\":\"timestamptz\"}]}" ++ "}"; +JsonNode missingMetadataFileNode = mapper.reader().readTree(missingMetadataFileStr); +assertThatThrownBy(() -> DataTaskParser.fromJson(missingMetadataFileNode)) +.isInstanceOf(IllegalArgumentException.class) +.hasMessageContaining("Cannot parse missing field: metadata-file"); + +String missingTableRowsStr = +"{\"task-type\":\"data-task\"," ++ "\"schema\":{\"type\":\"struct\",\"schema-id\":0," ++ "\"fields\":[{\"id\"
Re: [PR] Basic Integration with Datafusion [iceberg-rust]
liurenjie1024 commented on code in PR #324: URL: https://github.com/apache/iceberg-rust/pull/324#discussion_r1583019973 ## crates/integrations/datafusion/src/physical_plan/scan.rs: ## @@ -0,0 +1,123 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{any::Any, pin::Pin, sync::Arc}; + +use datafusion::{ +arrow::{array::RecordBatch, datatypes::SchemaRef as ArrowSchemaRef}, +execution::{SendableRecordBatchStream, TaskContext}, +physical_expr::EquivalenceProperties, +physical_plan::{ +stream::RecordBatchStreamAdapter, DisplayAs, ExecutionMode, ExecutionPlan, Partitioning, +PlanProperties, +}, +}; +use futures::{Stream, TryStreamExt}; +use iceberg::table::Table; + +use crate::to_datafusion_error; + +#[derive(Debug)] +pub(crate) struct IcebergTableScan { +table: Table, +schema: ArrowSchemaRef, +plan_properties: PlanProperties, +} + +impl IcebergTableScan { +pub(crate) fn new(table: Table, schema: ArrowSchemaRef) -> Self { +let plan_properties = Self::compute_properties(schema.clone()); + +Self { +table, +schema, +plan_properties, +} +} + +fn compute_properties(schema: ArrowSchemaRef) -> PlanProperties { +// TODO: +// This is more or less a placeholder, to be replaced +// once we support output-partitioning +PlanProperties::new( +EquivalenceProperties::new(schema), +Partitioning::UnknownPartitioning(1), +ExecutionMode::Bounded, +) +} +} + +impl ExecutionPlan for IcebergTableScan { +fn as_any(&self) -> &dyn Any { +self +} + +fn children(&self) -> Vec> { +vec![] +} + +fn with_new_children( +self: Arc, +_children: Vec>, +) -> datafusion::error::Result> { +Ok(self) Review Comment: Cool, that sounds reasonable. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] MR: iceberg storage handler should set common projection pruning config [iceberg]
ludlows commented on code in PR #10188: URL: https://github.com/apache/iceberg/pull/10188#discussion_r1583036309 ## mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java: ## @@ -111,8 +111,15 @@ public void configureTableJobProperties(TableDesc tableDesc, Map // @Override public void configureInputJobCredentials(TableDesc tableDesc, Map secrets) {} + private void setCommonJobConf(JobConf jobConf) { Review Comment: @pvary it seems that I need to create a table without setting `tez.mrreader.config.update.properties` and execute a SQL query that returns a wrong result. But where could I find such a table? Or is my understanding wrong?
Re: [PR] feat: Convert predicate to arrow filter and push down to parquet reader [iceberg-rust]
liurenjie1024 commented on code in PR #295: URL: https://github.com/apache/iceberg-rust/pull/295#discussion_r1583008556 ## crates/iceberg/src/arrow/reader.rs: ## @@ -186,4 +219,634 @@ impl ArrowReader { Ok(ProjectionMask::leaves(parquet_schema, indices)) } } + +fn get_row_filter( +&self, +parquet_schema: &SchemaDescriptor, +collector: &CollectFieldIdVisitor, +) -> Result> { +if let Some(predicates) = &self.predicates { +let field_id_map = build_field_id_map(parquet_schema)?; + +// Collect Parquet column indices from field ids. +// If the field id is not found in Parquet schema, it will be ignored due to schema evolution. +let column_indices = collector +.field_ids +.iter() +.map(|field_id| field_id_map.get(field_id).cloned()) +.flatten() +.collect::>(); + +// Convert BoundPredicates to ArrowPredicates +let mut converter = PredicateConverter { +projection_mask: ProjectionMask::leaves(parquet_schema, column_indices), +parquet_schema, +column_map: &field_id_map, +}; +let arrow_predicate = visit(&mut converter, predicates)?; +Ok(Some(RowFilter::new(vec![arrow_predicate]))) +} else { +Ok(None) +} +} +} + +/// Build the map of field id to Parquet column index in the schema. +fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> Result> { +let mut column_map = HashMap::new(); +for (idx, field) in parquet_schema.columns().iter().enumerate() { +let field_type = field.self_type(); +match field_type { +ParquetType::PrimitiveType { basic_info, .. } => { +if !basic_info.has_id() { +return Err(Error::new( +ErrorKind::DataInvalid, +format!( +"Leave column idx: {}, name: {}, type {:?} in schema doesn't have field id", +idx, +basic_info.name(), +field_type +), +)); +} +column_map.insert(basic_info.id(), idx); +} +ParquetType::GroupType { .. } => { +return Err(Error::new( +ErrorKind::DataInvalid, +format!( +"Leave column in schema should be primitive type but got {:?}", +field_type +), +)); +} +}; +} + +Ok(column_map) +} + +/// A visitor to collect field ids from bound predicates. 
+struct CollectFieldIdVisitor { +field_ids: HashSet, +} + +impl BoundPredicateVisitor for CollectFieldIdVisitor { +type T = (); + +fn always_true(&mut self) -> Result<()> { +Ok(()) +} + +fn always_false(&mut self) -> Result<()> { +Ok(()) +} + +fn and(&mut self, _lhs: (), _rhs: ()) -> Result<()> { +Ok(()) +} + +fn or(&mut self, _lhs: (), _rhs: ()) -> Result<()> { +Ok(()) +} + +fn not(&mut self, _inner: ()) -> Result<()> { +Ok(()) +} + +fn is_null(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> { +self.field_ids.insert(reference.field().id); +Ok(()) +} + +fn not_null(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> { +self.field_ids.insert(reference.field().id); +Ok(()) +} + +fn is_nan(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> { +self.field_ids.insert(reference.field().id); +Ok(()) +} + +fn not_nan(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> { +self.field_ids.insert(reference.field().id); +Ok(()) +} + +fn less_than( +&mut self, +reference: &BoundReference, +_literal: &Datum, +_predicate: &BoundPredicate, +) -> Result<()> { +self.field_ids.insert(reference.field().id); +Ok(()) +} + +fn less_than_or_eq( +&mut self, +reference: &BoundReference, +_literal: &Datum, +_predicate: &BoundPredicate, +) -> Result<()> { +self.field_ids.insert(reference.field().id); +Ok(()) +} + +fn greater_than( +&mut self, +reference: &BoundReference, +_literal: &Datum, +_predicate: &BoundPredicate, +) -> Result<()> { +self.field_ids.insert(reference.field().id); +Ok(()) +} + +fn greater_than_or_eq( +&mut self, +reference: &BoundReference, +_literal: &Datum, +_predicate: &BoundPredicate, +
[I] Spark CDC does not respect when the table is rolled back. [iceberg]
javrasya opened a new issue, #10247: URL: https://github.com/apache/iceberg/issues/10247

### Apache Iceberg version

1.4.3

### Query engine

Spark

### Please describe the bug 🐞

We had to roll back our table because it had some broken snapshots. Downstream we turn that table, which receives upserts, into a changelog stream and process it that way, using time boundaries. The CDC procedure appears to work by looking at the table history and doing a sort of time-travel query to find the most recent snapshot id as of the end timestamp we pass in. But it only uses the history entries, which do not say whether those snapshots are still part of the current main branch lineage. Here is the problematic line, which calls the function in iceberg-core:

https://github.com/apache/iceberg/blob/426818bfe7fa93e8c677ebf886638d5c50db597b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java#L530

https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java#L350-L358

I think it should disregard snapshots that are no longer in the main branch lineage.
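One way to express the reporter's point, that only snapshots still in the current main-branch lineage should be considered, is an ancestry check on each history entry. This is an illustrative sketch, not the proposed fix; it assumes SnapshotUtil.isAncestorOf(Table, long) checks lineage against the table's current snapshot.

```java
import org.apache.iceberg.HistoryEntry;
import org.apache.iceberg.Table;
import org.apache.iceberg.util.SnapshotUtil;

public class MainBranchHistory {

  // Returns the id of the most recent history entry at or before 'timestampMillis'
  // that is still an ancestor of the current main-branch snapshot; -1 if none.
  static long latestMainBranchSnapshotAsOf(Table table, long timestampMillis) {
    long result = -1L;
    for (HistoryEntry entry : table.history()) {
      boolean inRange = entry.timestampMillis() <= timestampMillis;
      // Skip snapshots that were rolled away from and are no longer in the main lineage.
      if (inRange && SnapshotUtil.isAncestorOf(table, entry.snapshotId())) {
        result = entry.snapshotId();
      }
    }
    return result;
  }
}
```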
[PR] Refactor: Extract `partition_filters` from `ManifestEvaluator` [iceberg-rust]
marvinlanhenke opened a new pull request, #360: URL: https://github.com/apache/iceberg-rust/pull/360

### Which issue does this PR close?

Closes #359

### Rationale for this change

The `partition_filter` (inclusive projection) is not only required by the `ManifestEvaluator` but also by the (incoming) `ExpressionEvaluator`. In order to avoid duplicate code as well as unnecessary computation of the partition filters, we should extract the computation of the partition filters and cache the results per partition_spec_id. With this refactor we (hopefully) should be able to integrate the `ExpressionEvaluator` and the `InclusiveMetricsEvaluator` more easily.

### What changes are included in this PR?

- refactor: Extract and decouple computation of partition filters from construction logic of ManifestEvaluator
- refactor: testsuite ManifestEvaluator
- refactor: add FileScanStreamContext + helper fn to construct partition_spec & schema
- refactor: add thin wrapper for PartitionFileCache & ManifestEvaluatorCache for better encapsulation

### Are these changes tested?

Yes. Unit tests are included. If the PR is okay, I will add some basic tests for the new structs (FileScanStreamContext, etc.) as well.
Re: [PR] Refactor: Extract `partition_filters` from `ManifestEvaluator` [iceberg-rust]
marvinlanhenke commented on PR #360: URL: https://github.com/apache/iceberg-rust/pull/360#issuecomment-2082804625 @sdd @Fokko @liurenjie1024 PTAL -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Add Files metadata table [iceberg-python]
Gowthami03B commented on code in PR #614: URL: https://github.com/apache/iceberg-python/pull/614#discussion_r1583225201 ## tests/integration/test_inspect_table.py: ## @@ -445,3 +445,65 @@ def check_pyiceberg_df_equals_spark_df(df: pa.Table, spark_df: DataFrame) -> Non df = tbl.inspect.partitions(snapshot_id=snapshot.snapshot_id) spark_df = spark.sql(f"SELECT * FROM {identifier}.partitions VERSION AS OF {snapshot.snapshot_id}") check_pyiceberg_df_equals_spark_df(df, spark_df) + + +@pytest.mark.integration +@pytest.mark.parametrize("format_version", [1, 2]) +def test_inspect_files( +spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int +) -> None: +identifier = "default.table_metadata_files" +tbl = _create_table(session_catalog, identifier, properties={"format-version": format_version}) + +tbl.overwrite(arrow_table_with_null) + +# append more data +tbl.append(arrow_table_with_null) + +df = tbl.refresh().inspect.files() + +assert df.column_names == [ +'content', +'file_path', +'file_format', +'record_count', +'file_size_in_bytes', +'column_sizes', +'value_counts', +'null_value_counts', +'nan_value_counts', +'lower_bounds', +'upper_bounds', +'key_metadata', +'split_offsets', +'equality_ids', +] + +for file_size_in_bytes in df['file_size_in_bytes']: +assert isinstance(file_size_in_bytes.as_py(), int) + +for split_offsets in df['split_offsets']: +assert isinstance(split_offsets.as_py(), list) + +for file_format in df['file_format']: +assert file_format.as_py() == "PARQUET" + +for file_path in df['file_path']: +assert file_path.as_py().startswith("s3://") + +lhs = spark.table(f"{identifier}.files").toPandas() Review Comment: Done, made the change! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Add Files metadata table [iceberg-python]
Gowthami03B commented on code in PR #614: URL: https://github.com/apache/iceberg-python/pull/614#discussion_r1583228304 ## tests/conftest.py: ## @@ -2060,7 +2060,7 @@ def spark() -> "SparkSession": .config("spark.sql.catalog.hive.warehouse", "s3://warehouse/hive/") .config("spark.sql.catalog.hive.s3.endpoint", "http://localhost:9000") .config("spark.sql.catalog.hive.s3.path-style-access", "true") -.config("spark.sql.execution.arrow.pyspark.enabled", "true") +.config("spark.sql.execution.arrow.pyspark.enabled", "false") Review Comment: Had an offline discussion with @kevinjqliu. I tried running docker system prune --all and then setting .config("spark.sql.execution.arrow.pyspark.enabled", "true"), but I still hit the "memory leak" issue locally. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Add Files metadata table [iceberg-python]
Gowthami03B commented on code in PR #614: URL: https://github.com/apache/iceberg-python/pull/614#discussion_r1573767911 ## pyiceberg/table/__init__.py: ## @@ -3537,6 +3537,58 @@ def update_partitions_map( schema=table_schema, ) +def files(self) -> "pa.Table": +import pyarrow as pa + +files_schema = pa.schema([ +pa.field('content', pa.int8(), nullable=False), +pa.field('file_path', pa.string(), nullable=False), +pa.field('file_format', pa.string(), nullable=False), Review Comment: @Fokko I think we could make ```last_updated_snapshot_id``` categorical as well? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[PR] Bug Fix `PyArrowFileIO.parse_location` hdfs uri [iceberg-python]
syun64 opened a new pull request, #668: URL: https://github.com/apache/iceberg-python/pull/668 PyArrow HadoopFileSystem is a thin wrapper around [libhdfs](https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/LibHdfs.html). For a given hdfs uri string that looks like `hdfs://host:port/path`, we should return the `/path` instead of the entire uri. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
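A sketch of the intended behavior described above, using only the Python standard library (this is not the actual PyArrowFileIO code): the path component of the URI is what gets handed to the filesystem, not the full URI.

```python
from urllib.parse import urlparse


def parse_location(location: str) -> tuple:
    """Split a location URI into (scheme, netloc, path); a sketch of the intended behavior."""
    uri = urlparse(location)
    # For hdfs://host:port/path, libhdfs expects just /path, not the entire URI.
    return uri.scheme, uri.netloc, uri.path


scheme, netloc, path = parse_location("hdfs://namenode:8020/warehouse/db/table/data.parquet")
assert (scheme, netloc, path) == ("hdfs", "namenode:8020", "/warehouse/db/table/data.parquet")
```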
Re: [PR] Flink: Apply DeleteGranularity for writes [iceberg]
RussellSpitzer commented on code in PR #10200: URL: https://github.com/apache/iceberg/pull/10200#discussion_r1583299839 ## core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java: ## @@ -109,18 +112,34 @@ protected abstract class BaseEqualityDeltaWriter implements Closeable { private final StructProjection structProjection; private RollingFileWriter dataWriter; private RollingEqDeleteWriter eqDeleteWriter; -private SortedPosDeleteWriter posDeleteWriter; +private FileWriter, DeleteWriteResult> posDeleteWriter; private Map insertedRowMap; protected BaseEqualityDeltaWriter(StructLike partition, Schema schema, Schema deleteSchema) { + this(partition, schema, deleteSchema, DeleteGranularity.PARTITION); +} + +protected BaseEqualityDeltaWriter( +StructLike partition, +Schema schema, +Schema deleteSchema, +DeleteGranularity deleteGranularity) { Review Comment: We discussed a bit on Slack. I think no config is good; in my mind, if there were a benefit to having the two modes as an option, it is something Flink should choose at runtime rather than a user-defined config. This is also mostly a performance difference, so I wouldn't mind changing the previous behavior in those edge cases where there is enough data to write multiple files in the same partition in one batch. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [I] `PyArrowFileIO.parse_location` return error `path` for hdfs location. [iceberg-python]
syun64 commented on issue #449: URL: https://github.com/apache/iceberg-python/issues/449#issuecomment-2083068661 Hi @luocan17 thank you for raising this issue. I have a PR up to attempt a fix. Could I ask for your review on: https://github.com/apache/iceberg-python/pull/668 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Flink: Apply DeleteGranularity for writes [iceberg]
stevenzwu commented on code in PR #10200: URL: https://github.com/apache/iceberg/pull/10200#discussion_r1583309420 ## core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java: ## @@ -109,18 +112,34 @@ protected abstract class BaseEqualityDeltaWriter implements Closeable { private final StructProjection structProjection; private RollingFileWriter dataWriter; private RollingEqDeleteWriter eqDeleteWriter; -private SortedPosDeleteWriter posDeleteWriter; +private FileWriter, DeleteWriteResult> posDeleteWriter; private Map insertedRowMap; protected BaseEqualityDeltaWriter(StructLike partition, Schema schema, Schema deleteSchema) { + this(partition, schema, deleteSchema, DeleteGranularity.PARTITION); +} + +protected BaseEqualityDeltaWriter( +StructLike partition, +Schema schema, +Schema deleteSchema, +DeleteGranularity deleteGranularity) { Review Comment: I am also in favor of no config. It is a "good" internal impl/perf change. In the data file rollover case, one smaller delete file per data file is better than one larger delete file spanning multiple data files. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Migrate FlinkTestBase related tests [iceberg]
tomtongue commented on PR #10232: URL: https://github.com/apache/iceberg/pull/10232#issuecomment-2083084976 Thanks for the review. Sure, will add the changes for `TestFlinkSourceConfig` and removing the testbase. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [I] Spark: Dropping partition column from old partition table corrupts entire table [iceberg]
EXPEbdodla commented on issue #10234: URL: https://github.com/apache/iceberg/issues/10234#issuecomment-2083137068 > Which Spark version are you using? I was originally trying Spark 3.3.0 with Iceberg 1.2.1. Later I tried the Spark-Iceberg docker images `tabulario/spark-iceberg:latest`, `tabulario/spark-iceberg:3.5.1_1.4.3` and `tabulario/spark-iceberg:3.3.2_1.2.1`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Iceberg/Comet integration POC [iceberg]
huaxingao commented on PR #9841: URL: https://github.com/apache/iceberg/pull/9841#issuecomment-2083223651 @aokolnychyi I have addressed the comments. Could you please take one more look when you have a moment? Thanks a lot! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] [WIP]: Add `InclusiveMetricsEvaluator` [iceberg-rust]
marvinlanhenke commented on code in PR #347: URL: https://github.com/apache/iceberg-rust/pull/347#discussion_r1583517231 ## crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs: ## @@ -0,0 +1,744 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor}; +use crate::expr::{BoundPredicate, BoundReference}; +use crate::spec::{DataFile, Datum, Literal, PrimitiveLiteral}; +use crate::{Error, ErrorKind}; +use fnv::FnvHashSet; + +const IN_PREDICATE_LIMIT: usize = 200; +const ROWS_MIGHT_MATCH: crate::Result = Ok(true); +const ROWS_CANNOT_MATCH: crate::Result = Ok(false); + +pub(crate) struct InclusiveMetricsEvaluator<'a> { +data_file: &'a DataFile, +} + +impl<'a> InclusiveMetricsEvaluator<'a> { +fn new(data_file: &'a DataFile) -> Self { +InclusiveMetricsEvaluator { data_file } +} + +/// Evaluate this `InclusiveMetricsEvaluator`'s filter predicate against the +/// provided [`DataFile`]'s metrics. Used by [`TableScan`] to +/// see if this `DataFile` contains data that could match +/// the scan's filter. +pub(crate) fn eval( +filter: &'a BoundPredicate, Review Comment: just an idea: Can we accept `&'a Option` here? This way we can move the check if we have a row_filter at all from scan.rs into the `InclusiveMetricsEvaluator` itself and simply return `ROWS_MIGHT_MATCH` if BoundPredicate is None? ## crates/iceberg/src/scan.rs: ## @@ -218,6 +230,18 @@ impl TableScan { let mut manifest_entries = iter(manifest.entries().iter().filter(|e| e.is_alive())); while let Some(manifest_entry) = manifest_entries.next().await { + +if let Some(ref bound_predicate) = bound_predicate { Review Comment: move the check into `InclusiveMetricsEvaluator` (see other comment), which returns ROWS_MIGHT_MATCH if bound_predicate is None. This way we could entangle the already involved plan_files method? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] [WIP]: Add `InclusiveMetricsEvaluator` [iceberg-rust]
sdd commented on code in PR #347: URL: https://github.com/apache/iceberg-rust/pull/347#discussion_r1583560775 ## crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs: ## @@ -0,0 +1,744 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor}; +use crate::expr::{BoundPredicate, BoundReference}; +use crate::spec::{DataFile, Datum, Literal, PrimitiveLiteral}; +use crate::{Error, ErrorKind}; +use fnv::FnvHashSet; + +const IN_PREDICATE_LIMIT: usize = 200; +const ROWS_MIGHT_MATCH: crate::Result = Ok(true); +const ROWS_CANNOT_MATCH: crate::Result = Ok(false); + +pub(crate) struct InclusiveMetricsEvaluator<'a> { +data_file: &'a DataFile, +} + +impl<'a> InclusiveMetricsEvaluator<'a> { +fn new(data_file: &'a DataFile) -> Self { +InclusiveMetricsEvaluator { data_file } +} + +/// Evaluate this `InclusiveMetricsEvaluator`'s filter predicate against the +/// provided [`DataFile`]'s metrics. Used by [`TableScan`] to +/// see if this `DataFile` contains data that could match +/// the scan's filter. +pub(crate) fn eval( +filter: &'a BoundPredicate, Review Comment: I'm not sure I like that. It results in less code but it doesn't feel right, semantically. I'll have a think to see if there's a more concise way to do this within `TableScan`. Perhaps a shorter code path if there is no filter, with a longer path if there is one, but that might involve a bit of duplication. Alternatively we could set the filter predicate to `AlwaysTrue` if none is supplied to the scan. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Refactor: Extract `partition_filters` from `ManifestEvaluator` [iceberg-rust]
sdd commented on code in PR #360: URL: https://github.com/apache/iceberg-rust/pull/360#discussion_r1583574672 ## crates/iceberg/src/scan.rs: ## @@ -314,6 +312,140 @@ impl TableScan { } } +#[derive(Debug)] +/// Holds the context necessary for file scanning operations +/// in a streaming environment. +struct FileScanStreamContext { +schema: SchemaRef, +snapshot: SnapshotRef, +table_metadata: TableMetadataRef, +file_io: FileIO, +filter: Option>, +case_sensitive: bool, +} + +impl FileScanStreamContext { +/// Creates a new [`FileScanStreamContext`]. +fn new( +schema: SchemaRef, +snapshot: SnapshotRef, +table_metadata: TableMetadataRef, +file_io: FileIO, +filter: Option>, +case_sensitive: bool, +) -> Self { +Self { +schema, +snapshot, +table_metadata, +file_io, +filter, +case_sensitive, +} +} + +/// Creates a [`BoundPredicate`] from row filter [`Predicate`]. +fn bound_filter(&self) -> Result> { Review Comment: Could we build the `Option` inside `new`, and have new return a `Result`, and then have `bound_filter` return an `Option<&BoundPredicate>` instead so that we're effectively caching the bind operation? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Refactor: Extract `partition_filters` from `ManifestEvaluator` [iceberg-rust]
sdd commented on code in PR #360: URL: https://github.com/apache/iceberg-rust/pull/360#discussion_r1583579994 ## crates/iceberg/src/scan.rs: ## @@ -314,6 +312,140 @@ impl TableScan { } } +#[derive(Debug)] +/// Holds the context necessary for file scanning operations +/// in a streaming environment. +struct FileScanStreamContext { Review Comment: This is much cleaner 😎 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Refactor: Extract `partition_filters` from `ManifestEvaluator` [iceberg-rust]
sdd commented on code in PR #360: URL: https://github.com/apache/iceberg-rust/pull/360#discussion_r1583577520 ## crates/iceberg/src/scan.rs: ## @@ -314,6 +312,140 @@ impl TableScan { } } +#[derive(Debug)] +/// Holds the context necessary for file scanning operations +/// in a streaming environment. +struct FileScanStreamContext { +schema: SchemaRef, +snapshot: SnapshotRef, +table_metadata: TableMetadataRef, +file_io: FileIO, +filter: Option>, +case_sensitive: bool, +} + +impl FileScanStreamContext { +/// Creates a new [`FileScanStreamContext`]. +fn new( +schema: SchemaRef, +snapshot: SnapshotRef, +table_metadata: TableMetadataRef, +file_io: FileIO, +filter: Option>, +case_sensitive: bool, +) -> Self { +Self { +schema, +snapshot, +table_metadata, +file_io, +filter, +case_sensitive, +} +} + +/// Creates a [`BoundPredicate`] from row filter [`Predicate`]. +fn bound_filter(&self) -> Result> { +match self.filter { +Some(ref filter) => Ok(Some(filter.bind(self.schema.clone(), self.case_sensitive)?)), +None => Ok(None), +} +} + +/// Creates a reference-counted [`PartitionSpec`] and a +/// corresponding schema based on the specified partition spec id. +fn create_partition_spec_and_schema( +&self, +spec_id: i32, +) -> Result<(&PartitionSpecRef, SchemaRef)> { Review Comment: Seems a bit unexpected to return an `&PartitionSpecRef`. Usually you'd return either a `PartitionSpecRef` or a `&PartitionSpec`. Could we also do this up-front in `new` or just once, storing the `PartitionSpecRef` and `SchemaRef` in the context so that this method just clones those `Arc`s from the context? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [I] Spark: CDC does not respect when the table is rolled back. [iceberg]
javrasya commented on issue #10247: URL: https://github.com/apache/iceberg/issues/10247#issuecomment-2083426721 Exactly @manuzhang . It feels like it should filter that out and this is a bug. Wdyt? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Refactor: Extract `partition_filters` from `ManifestEvaluator` [iceberg-rust]
marvinlanhenke commented on code in PR #360: URL: https://github.com/apache/iceberg-rust/pull/360#discussion_r1583607459 ## crates/iceberg/src/scan.rs: ## @@ -314,6 +312,140 @@ impl TableScan { } } +#[derive(Debug)] +/// Holds the context necessary for file scanning operations +/// in a streaming environment. +struct FileScanStreamContext { +schema: SchemaRef, +snapshot: SnapshotRef, +table_metadata: TableMetadataRef, +file_io: FileIO, +filter: Option>, +case_sensitive: bool, +} + +impl FileScanStreamContext { +/// Creates a new [`FileScanStreamContext`]. +fn new( +schema: SchemaRef, +snapshot: SnapshotRef, +table_metadata: TableMetadataRef, +file_io: FileIO, +filter: Option>, +case_sensitive: bool, +) -> Self { +Self { +schema, +snapshot, +table_metadata, +file_io, +filter, +case_sensitive, +} +} + +/// Creates a [`BoundPredicate`] from row filter [`Predicate`]. +fn bound_filter(&self) -> Result> { Review Comment: Also sounds good to me. I'll take a look tomorrow. Thanks -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Refactor: Extract `partition_filters` from `ManifestEvaluator` [iceberg-rust]
marvinlanhenke commented on code in PR #360: URL: https://github.com/apache/iceberg-rust/pull/360#discussion_r1583606448 ## crates/iceberg/src/scan.rs: ## @@ -314,6 +312,140 @@ impl TableScan { } } +#[derive(Debug)] +/// Holds the context necessary for file scanning operations +/// in a streaming environment. +struct FileScanStreamContext { +schema: SchemaRef, +snapshot: SnapshotRef, +table_metadata: TableMetadataRef, +file_io: FileIO, +filter: Option>, +case_sensitive: bool, +} + +impl FileScanStreamContext { +/// Creates a new [`FileScanStreamContext`]. +fn new( +schema: SchemaRef, +snapshot: SnapshotRef, +table_metadata: TableMetadataRef, +file_io: FileIO, +filter: Option>, +case_sensitive: bool, +) -> Self { +Self { +schema, +snapshot, +table_metadata, +file_io, +filter, +case_sensitive, +} +} + +/// Creates a [`BoundPredicate`] from row filter [`Predicate`]. +fn bound_filter(&self) -> Result> { +match self.filter { +Some(ref filter) => Ok(Some(filter.bind(self.schema.clone(), self.case_sensitive)?)), +None => Ok(None), +} +} + +/// Creates a reference-counted [`PartitionSpec`] and a +/// corresponding schema based on the specified partition spec id. +fn create_partition_spec_and_schema( +&self, +spec_id: i32, +) -> Result<(&PartitionSpecRef, SchemaRef)> { Review Comment: Sounds reasonable to me. I'll take a look tomorrow. Thanks for the suggestion -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Refactor: Extract `partition_filters` from `ManifestEvaluator` [iceberg-rust]
Fokko commented on code in PR #360: URL: https://github.com/apache/iceberg-rust/pull/360#discussion_r1583486316 ## crates/iceberg/src/expr/visitors/manifest_evaluator.rs: ## @@ -16,74 +16,49 @@ // under the License. use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor}; -use crate::expr::visitors::inclusive_projection::InclusiveProjection; -use crate::expr::{Bind, BoundPredicate, BoundReference}; -use crate::spec::{Datum, FieldSummary, ManifestFile, PartitionSpecRef, Schema, SchemaRef}; -use crate::{Error, ErrorKind}; +use crate::expr::{BoundPredicate, BoundReference}; +use crate::spec::{Datum, FieldSummary, ManifestFile}; +use crate::{Error, ErrorKind, Result}; use fnv::FnvHashSet; -use std::sync::Arc; -/// Evaluates [`ManifestFile`]s to see if their partition summary matches a provided -/// [`BoundPredicate`]. Used by [`TableScan`] to filter down the list of [`ManifestFile`]s +#[derive(Debug)] +/// Evaluates a [`ManifestFile`] to see if the partition summaries +/// match a provided [`BoundPredicate`]. +/// +/// Used by [`TableScan`] to prune the list of [`ManifestFile`]s /// in which data might be found that matches the TableScan's filter. pub(crate) struct ManifestEvaluator { -partition_schema: SchemaRef, +partition_schema_id: i32, partition_filter: BoundPredicate, case_sensitive: bool, } impl ManifestEvaluator { pub(crate) fn new( -partition_spec: PartitionSpecRef, -table_schema: SchemaRef, -filter: BoundPredicate, +partition_schema_id: i32, Review Comment: To align the naming: ```suggestion partition_spec_id: i32, ``` ## crates/iceberg/src/expr/visitors/manifest_evaluator.rs: ## @@ -16,74 +16,49 @@ // under the License. use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor}; -use crate::expr::visitors::inclusive_projection::InclusiveProjection; -use crate::expr::{Bind, BoundPredicate, BoundReference}; -use crate::spec::{Datum, FieldSummary, ManifestFile, PartitionSpecRef, Schema, SchemaRef}; -use crate::{Error, ErrorKind}; +use crate::expr::{BoundPredicate, BoundReference}; +use crate::spec::{Datum, FieldSummary, ManifestFile}; +use crate::{Error, ErrorKind, Result}; use fnv::FnvHashSet; -use std::sync::Arc; -/// Evaluates [`ManifestFile`]s to see if their partition summary matches a provided -/// [`BoundPredicate`]. Used by [`TableScan`] to filter down the list of [`ManifestFile`]s +#[derive(Debug)] +/// Evaluates a [`ManifestFile`] to see if the partition summaries +/// match a provided [`BoundPredicate`]. +/// +/// Used by [`TableScan`] to prune the list of [`ManifestFile`]s /// in which data might be found that matches the TableScan's filter. pub(crate) struct ManifestEvaluator { -partition_schema: SchemaRef, +partition_schema_id: i32, partition_filter: BoundPredicate, case_sensitive: bool, } impl ManifestEvaluator { pub(crate) fn new( -partition_spec: PartitionSpecRef, -table_schema: SchemaRef, -filter: BoundPredicate, +partition_schema_id: i32, Review Comment: If it only for checking the spec-id, I would leave it out and make sure that we have proper tests to avoid having to do these runtime checks :) ## crates/iceberg/src/scan.rs: ## @@ -99,7 +107,7 @@ impl<'a> TableScanBuilder<'a> { } /// Build the table scan. -pub fn build(self) -> crate::Result { +pub fn build(self) -> Result { Review Comment: Nit: should we have a convention to leave in, or remove the `crate::` prefix? 
Preferably also a checker ## crates/iceberg/src/scan.rs: ## @@ -169,55 +177,66 @@ pub struct TableScan { filter: Option>, } -/// A stream of [`FileScanTask`]. -pub type FileScanTaskStream = BoxStream<'static, crate::Result>; - impl TableScan { -/// Returns a stream of file scan tasks. - -pub async fn plan_files(&self) -> crate::Result { -// Cache `ManifestEvaluatorFactory`s created as part of this scan -let mut manifest_evaluator_cache: HashMap = HashMap::new(); - -// these variables needed to ensure that we don't need to pass a -// reference to self into `try_stream`, as it expects references -// passed in to outlive 'static -let schema = self.schema.clone(); -let snapshot = self.snapshot.clone(); -let table_metadata = self.table_metadata.clone(); -let file_io = self.file_io.clone(); -let case_sensitive = self.case_sensitive; -let filter = self.filter.clone(); +/// Returns a stream of [`FileScanTask`]s. +pub async fn plan_files(&self) -> Result { +let context = FileScanStreamContext::new( +self.schema.clone(), +self.snapshot.clone(), +self.table_metadata.clone(), +self.file_io.clone(), +self.filter.clone(), +sel
[I] Documentation page returning 404 [iceberg]
yakovsushenok opened a new issue, #10249: URL: https://github.com/apache/iceberg/issues/10249 ### Apache Iceberg version None ### Query engine None ### Please describe the bug 🐞 [This](https://iceberg.apache.org/docs/1.5.0/spark-configuration.md#sql-extensions) link (the second one) on [this](https://iceberg.apache.org/docs/1.5.0/spark-procedures/) page is returning a 404. Not sure if this is the right place to report it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] feat: Convert predicate to arrow filter and push down to parquet reader [iceberg-rust]
viirya commented on code in PR #295: URL: https://github.com/apache/iceberg-rust/pull/295#discussion_r1583629653 ## crates/iceberg/src/arrow/reader.rs: ## @@ -186,4 +221,637 @@ impl ArrowReader { Ok(ProjectionMask::leaves(parquet_schema, indices)) } } + +fn get_row_filter( +&self, +parquet_schema: &SchemaDescriptor, +collector: &CollectFieldIdVisitor, +) -> Result> { +if let Some(predicates) = &self.predicates { +let field_id_map = build_field_id_map(parquet_schema)?; + +let column_indices = collector +.field_ids +.iter() +.map(|field_id| { +field_id_map.get(field_id).cloned().ok_or_else(|| { +Error::new(ErrorKind::DataInvalid, "Field id not found in schema") +}) +}) +.collect::>>()?; + +// Convert BoundPredicates to ArrowPredicates +let mut converter = PredicateConverter { +columns: &column_indices, +projection_mask: ProjectionMask::leaves(parquet_schema, column_indices.clone()), +parquet_schema, +column_map: &field_id_map, +}; +let arrow_predicate = visit(&mut converter, predicates)?; +Ok(Some(RowFilter::new(vec![arrow_predicate]))) +} else { +Ok(None) +} +} +} + +/// Build the map of field id to Parquet column index in the schema. +fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> Result> { +let mut column_map = HashMap::new(); +for (idx, field) in parquet_schema.columns().iter().enumerate() { +let field_type = field.self_type(); +match field_type { +ParquetType::PrimitiveType { basic_info, .. } => { +if !basic_info.has_id() { +return Err(Error::new( +ErrorKind::DataInvalid, +format!( +"Leave column {:?} in schema doesn't have field id", +field_type +), +)); +} +column_map.insert(basic_info.id(), idx); +} +ParquetType::GroupType { .. } => { +return Err(Error::new( +ErrorKind::DataInvalid, +format!( +"Leave column in schema should be primitive type but got {:?}", +field_type +), +)); +} +}; +} + +Ok(column_map) +} + +/// A visitor to collect field ids from bound predicates. 
+struct CollectFieldIdVisitor { +field_ids: Vec, +} + +impl BoundPredicateVisitor for CollectFieldIdVisitor { +type T = (); + +fn always_true(&mut self) -> Result { +Ok(()) +} + +fn always_false(&mut self) -> Result { +Ok(()) +} + +fn and(&mut self, _lhs: Self::T, _rhs: Self::T) -> Result { +Ok(()) +} + +fn or(&mut self, _lhs: Self::T, _rhs: Self::T) -> Result { +Ok(()) +} + +fn not(&mut self, _inner: Self::T) -> Result { +Ok(()) +} + +fn is_null( +&mut self, +reference: &BoundReference, +_predicate: &BoundPredicate, +) -> Result { +self.field_ids.push(reference.field().id); +Ok(()) +} + +fn not_null( +&mut self, +reference: &BoundReference, +_predicate: &BoundPredicate, +) -> Result { +self.field_ids.push(reference.field().id); +Ok(()) +} + +fn is_nan( +&mut self, +reference: &BoundReference, +_predicate: &BoundPredicate, +) -> Result { +self.field_ids.push(reference.field().id); +Ok(()) +} + +fn not_nan( +&mut self, +reference: &BoundReference, +_predicate: &BoundPredicate, +) -> Result { +self.field_ids.push(reference.field().id); +Ok(()) +} + +fn less_than( +&mut self, +reference: &BoundReference, +_literal: &Datum, +_predicate: &BoundPredicate, +) -> Result { +self.field_ids.push(reference.field().id); +Ok(()) +} + +fn less_than_or_eq( +&mut self, +reference: &BoundReference, +_literal: &Datum, +_predicate: &BoundPredicate, +) -> Result { +self.field_ids.push(reference.field().id); +Ok(()) +} + +fn greater_than( +&mut self, +reference: &BoundReference, +_literal: &Datum, +_predicate: &BoundPredicate, +) -> Result { +self.field_ids.push(reference.field().id); +Ok(()) +} + +fn greater_than_or_eq( +&mut self, +reference: &BoundReference, +_literal: &Datum, +_predicate: &BoundPr
Re: [PR] Refactor: Extract `partition_filters` from `ManifestEvaluator` [iceberg-rust]
marvinlanhenke commented on PR #360: URL: https://github.com/apache/iceberg-rust/pull/360#issuecomment-2083513030 @Fokko thanks for clearing this up and thanks for the review. This is the [line](https://github.com/apache/iceberg-python/blob/main/pyiceberg%2Ftable%2F__init__.py#L1647) that led me to believe we also use the inclusive projection in the expression evaluator. However, if I understand correctly, we still need to extract the partition filters, since they will be needed for positional deletes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Flink: Apply DeleteGranularity for writes [iceberg]
aokolnychyi commented on code in PR #10200: URL: https://github.com/apache/iceberg/pull/10200#discussion_r1583669332 ## core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java: ## @@ -109,18 +112,34 @@ protected abstract class BaseEqualityDeltaWriter implements Closeable { private final StructProjection structProjection; private RollingFileWriter dataWriter; private RollingEqDeleteWriter eqDeleteWriter; -private SortedPosDeleteWriter posDeleteWriter; +private FileWriter, DeleteWriteResult> posDeleteWriter; private Map insertedRowMap; protected BaseEqualityDeltaWriter(StructLike partition, Schema schema, Schema deleteSchema) { + this(partition, schema, deleteSchema, DeleteGranularity.PARTITION); +} + +protected BaseEqualityDeltaWriter( +StructLike partition, +Schema schema, +Schema deleteSchema, +DeleteGranularity deleteGranularity) { Review Comment: Seems like everyone is in favor of not exposing the config then. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Flink: Apply DeleteGranularity for writes [iceberg]
aokolnychyi commented on code in PR #10200: URL: https://github.com/apache/iceberg/pull/10200#discussion_r1583671212 ## docs/docs/flink-configuration.md: ## @@ -124,8 +124,9 @@ env.getConfig() | max-planning-snapshot-count | connector.iceberg.max-planning-snapshot-count | N/A | Integer.MAX_VALUE| Max number of snapshots limited per split enumeration. Applicable only to streaming read. | | limit | connector.iceberg.limit | N/A | -1 | Limited output number of rows. | | max-allowed-planning-failures | connector.iceberg.max-allowed-planning-failures | N/A | 3| Max allowed consecutive failures for scan planning before failing the job. Set to -1 for never failing the job for scan planing failure. | -| watermark-column | connector.iceberg.watermark-column | N/A | null | Specifies the watermark column to use for watermark generation. If this option is present, the `splitAssignerFactory` will be overridden with `OrderedSplitAssignerFactory`. | -| watermark-column-time-unit| connector.iceberg.watermark-column-time-unit | N/A | TimeUnit.MICROSECONDS| Specifies the watermark time unit to use for watermark generation. The possible values are DAYS, HOURS, MINUTES, SECONDS, MILLISECONDS, MICROSECONDS, NANOSECONDS.
Re: [PR] Metadata Log Entries metadata table [iceberg-python]
corleyma commented on code in PR #667: URL: https://github.com/apache/iceberg-python/pull/667#discussion_r1583779367 ## pyiceberg/table/metadata.py: ## @@ -292,6 +292,13 @@ def snapshot_by_name(self, name: str) -> Optional[Snapshot]: return self.snapshot_by_id(ref.snapshot_id) return None +def _snapshot_as_of_timestamp_ms(self, timestamp_ms: int) -> Optional[Snapshot]: Review Comment: Any reason not to make this public? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Metadata Log Entries metadata table [iceberg-python]
corleyma commented on code in PR #667: URL: https://github.com/apache/iceberg-python/pull/667#discussion_r1583779367 ## pyiceberg/table/metadata.py: ## @@ -292,6 +292,13 @@ def snapshot_by_name(self, name: str) -> Optional[Snapshot]: return self.snapshot_by_id(ref.snapshot_id) return None +def _snapshot_as_of_timestamp_ms(self, timestamp_ms: int) -> Optional[Snapshot]: Review Comment: Any reason not to make this public? PyIceberg ought to have an interface for this, though I suppose it's understandable if we don't want this to be it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Metadata Log Entries metadata table [iceberg-python]
corleyma commented on code in PR #667: URL: https://github.com/apache/iceberg-python/pull/667#discussion_r1583798006 ## pyiceberg/table/__init__.py: ## @@ -3537,6 +3537,39 @@ def update_partitions_map( schema=table_schema, ) +def metadata_log_entries(self) -> "pa.Table": +import pyarrow as pa + +from pyiceberg.table.snapshots import MetadataLogEntry + +table_schema = pa.schema([ +("timestamp", pa.timestamp(unit='ms'), True), +("file", pa.string(), True), Review Comment: why are file and timestamp nullable? In what scenarios is it expected to have a log entry with a snapshot id but no timestamp or file? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Nessie: Make handleExceptionsForCommits public in NessieUtil [iceberg]
dimas-b commented on PR #10248: URL: https://github.com/apache/iceberg/pull/10248#issuecomment-2083757084 @nastra : WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[I] Getting storage partitioned join to work [iceberg]
mrbrahman opened a new issue, #10250: URL: https://github.com/apache/iceberg/issues/10250 ### Query engine Spark on AWS EMR 6.15 ### Question I am trying to get storage partitioned join to work in a simple test case, but have not been successful. I followed most of the settings mentioned in #7832, but am still not able to get it to work. Here's what I did:

~~~scala
val df = spark.range(0,100)
val a = df.repartition(10).withColumn("part", spark_partition_id)

a.write.partitionBy("part").format("iceberg").saveAsTable("ice1")
a.write.partitionBy("part").format("iceberg").saveAsTable("ice2")
~~~

~~~sql
%sql
set spark.sql.autoBroadcastJoinThreshold = -1;
set spark.sql.adaptive.enabled = false;
set spark.sql.sources.bucketing.enabled = true;
set spark.sql.sources.v2.bucketing.enabled=true;

-- set "spark.sql.iceberg.planning.preserve-data-grouping=true"; -- this didn't work for me

set spark.sql.sources.v2.bucketing.pushPartValues.enabled=true;
set spark.sql.requireAllClusterKeysForCoPartition=false;
set spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled=true;

create table ice_joined_9 using iceberg
select a.id id1, b.id id2
from ice1 a
inner join ice2 b
on a.id=b.id and a.part=b.part
~~~

The DAG is still showing exchange + sort. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Add Files metadata table [iceberg-python]
geruh commented on code in PR #614: URL: https://github.com/apache/iceberg-python/pull/614#discussion_r1583803213 ## tests/integration/test_inspect_table.py: ## @@ -445,3 +445,107 @@ def check_pyiceberg_df_equals_spark_df(df: pa.Table, spark_df: DataFrame) -> Non df = tbl.inspect.partitions(snapshot_id=snapshot.snapshot_id) spark_df = spark.sql(f"SELECT * FROM {identifier}.partitions VERSION AS OF {snapshot.snapshot_id}") check_pyiceberg_df_equals_spark_df(df, spark_df) + + +@pytest.mark.integration +@pytest.mark.parametrize("format_version", [1, 2]) +def test_inspect_files( +spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int +) -> None: +identifier = "default.table_metadata_files" + +tbl = _create_table(session_catalog, identifier, properties={"format-version": format_version}) + +tbl.overwrite(arrow_table_with_null) + +# append more data +tbl.append(arrow_table_with_null) + +df = tbl.refresh().inspect.files() + +assert df.column_names == [ +'content', +'file_path', +'file_format', +'spec_id', +'record_count', +'file_size_in_bytes', +'column_sizes', +'value_counts', +'null_value_counts', +'nan_value_counts', +'lower_bounds', +'upper_bounds', +'key_metadata', +'split_offsets', +'equality_ids', +'sort_order_id', +'readable_metrics', +] + +# make sure the non-nullable fields are filled +for int_column in ['content', 'spec_id', 'record_count', 'file_size_in_bytes']: +for value in df[int_column]: +assert isinstance(value.as_py(), int) + +for split_offsets in df['split_offsets']: +assert isinstance(split_offsets.as_py(), list) + +for file_format in df['file_format']: +assert file_format.as_py() == "PARQUET" + +for file_path in df['file_path']: +assert file_path.as_py().startswith("s3://") + +lhs = df.to_pandas() +rhs = spark.table(f"{identifier}.files").toPandas() +for column in df.column_names: +for left, right in zip(lhs[column].to_list(), rhs[column].to_list()): +if isinstance(left, float) and math.isnan(left) and isinstance(right, float) and math.isnan(right): +# NaN != NaN in Python +continue +if column in [ +'column_sizes', +'value_counts', +'null_value_counts', +'nan_value_counts', +'lower_bounds', +'upper_bounds', +]: +# Arrow returns a list of tuples, instead of a dict +left = dict(left) Review Comment: left values aren't being used to assert here, also these are nested tuples so you might want to iterate through the map and convert to dict ## tests/integration/test_inspect_table.py: ## @@ -445,3 +445,107 @@ def check_pyiceberg_df_equals_spark_df(df: pa.Table, spark_df: DataFrame) -> Non df = tbl.inspect.partitions(snapshot_id=snapshot.snapshot_id) spark_df = spark.sql(f"SELECT * FROM {identifier}.partitions VERSION AS OF {snapshot.snapshot_id}") check_pyiceberg_df_equals_spark_df(df, spark_df) + + +@pytest.mark.integration +@pytest.mark.parametrize("format_version", [1, 2]) +def test_inspect_files( +spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int +) -> None: +identifier = "default.table_metadata_files" + +tbl = _create_table(session_catalog, identifier, properties={"format-version": format_version}) + +tbl.overwrite(arrow_table_with_null) + +# append more data +tbl.append(arrow_table_with_null) + +df = tbl.refresh().inspect.files() + +assert df.column_names == [ +'content', +'file_path', +'file_format', +'spec_id', +'record_count', +'file_size_in_bytes', +'column_sizes', +'value_counts', +'null_value_counts', +'nan_value_counts', +'lower_bounds', +'upper_bounds', +'key_metadata', 
+'split_offsets', +'equality_ids', +'sort_order_id', +'readable_metrics', +] + +# make sure the non-nullable fields are filled +for int_column in ['content', 'spec_id', 'record_count', 'file_size_in_bytes']: +for value in df[int_column]: +assert isinstance(value.as_py(), int) + +for split_offsets in df['split_offsets']: +assert isinstance(split_offsets.as_py(), list) + +for file_format in df['file_format']: +assert file_format.as_py() == "PARQUET" + +for file_path in df['file_path']: +assert file_path.as_py().startswith("s3://") + +lhs = df.to_pandas() +rhs = spark.table(f"{identifier}.files").toPandas() +for column in df.column_name
[I] appending to a table with Decimal > 32767 results in `int too big to convert` [iceberg-python]
vtk9 opened a new issue, #669: URL: https://github.com/apache/iceberg-python/issues/669 ### Apache Iceberg version 0.6.0 (latest release) ### Please describe the bug 🐞 Hello, is this a bug or is there something obvious I am misunderstanding/misusing? (I am relatively new to Iceberg.) Tested on macOS M2 arm64.

```python
from decimal import Decimal

from pyiceberg.catalog.sql import SqlCatalog
import pyarrow as pa

pylist = [{'decimal_col': Decimal('32768.')}]
arrow_schema = pa.schema(
    [
        pa.field('decimal_col', pa.decimal128(38, 0)),
    ],
)
arrow_table = pa.Table.from_pylist(pylist, schema=arrow_schema)

catalog = SqlCatalog(
    'test_catalog',
    **{
        'type': 'sql',
        'uri': 'sqlite:///pyiceberg.db',
    },
)
namespace = 'test_ns'
table_name = 'test_table'
catalog.create_namespace(namespace=namespace)
new_table = catalog.create_table(
    identifier=f'{namespace}.{table_name}',
    schema=arrow_schema,
    location='.',
)
new_table.append(arrow_table)
```

```
OverflowError: int too big to convert
```

- Note: `pylist = [{'decimal_col': Decimal('32767.')}]` works
- Switching to `pa.field('decimal_col', pa.decimal128(38, 1)),` also works

Thank you! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
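One plausible explanation for the exact error text, offered here only as a hypothesis rather than a trace of pyiceberg's code path: the unscaled decimal value appears to be packed into a fixed number of bytes, and 32767 is the largest value that fits in two signed bytes. The snippet below just reproduces the underlying Python behavior that produces this message:

```python
# Illustration only: the same OverflowError Python raises when an int is packed
# into too few bytes. 32767 fits in 2 signed bytes; 32768 does not.
print((32767).to_bytes(2, "big", signed=True))  # b'\x7f\xff'

try:
    (32768).to_bytes(2, "big", signed=True)
except OverflowError as exc:
    print(exc)  # int too big to convert
```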
[PR] Build: Bump mkdocs from 1.5.3 to 1.6.0 [iceberg-python]
dependabot[bot] opened a new pull request, #670: URL: https://github.com/apache/iceberg-python/pull/670 Bumps [mkdocs](https://github.com/mkdocs/mkdocs) from 1.5.3 to 1.6.0.

Release notes (sourced from mkdocs's releases, https://github.com/mkdocs/mkdocs/releases):

1.6.0

Local preview
- mkdocs serve no longer locks up the browser when more than 5 tabs are open. This is achieved by closing the polling connection whenever a tab becomes inactive. Background tabs will no longer auto-reload either; that will instead happen as soon as the tab is opened again. Context: #3391
- New flag serve --open to open the site in a browser. After the first build is finished, this flag will cause the default OS web browser to be opened at the home page of the local site. Context: #3500

Drafts
- Warning, changed from version 1.5: the exclude_docs config was split up into two separate concepts. The exclude_docs config no longer has any special behavior for mkdocs serve; it now always completely excludes the listed documents from the site. If you wish to use the "drafts" functionality like the exclude_docs key used to do in MkDocs 1.5, please switch to the new config key draft_docs (see documentation).
- Other changes: reduce warning levels when a "draft" page has a link to a non-existent file. Context: #3449

Update to deduction of page titles
- MkDocs 1.5 had a change in behavior in deducing the page titles from the first heading. Unfortunately this could cause unescaped HTML tags or entities to appear in edge cases. Now tags are always fully sanitized from the title, though it still remains the case that Page.title is expected to contain HTML entities and is passed directly to the themes. Images (notably, emojis in some extensions) get preserved in the title only through their alt attribute's value. Context: #3564, #3578

Themes
- Built-in themes now also support the Polish language (#3613)
- "readthedocs" theme: can now correctly handle deeply nested nav configurations (over 2 levels deep) without confusedly expanding all sections and jumping around vertically (#3464); now shows a link to the repository (with a generic logo) even when it isn't one of the 3 known hosters (#3435); now also has a translation for the word "theme" in the footer, which mistakenly always remained in English (#3613, #3625)
- "mkdocs" theme ...

(truncated)

Commits
- 0998fec Release 1.6.0 (#3631)
- bce85bf Fix for showing repository icon even when a file has no edit_uri (#3657)
- 0ac05da Re-generate localization files (#3634)
- 6244500 Remove jQuery from mkdocs theme (#3649)
- f85d429 Prevent a flash of white color when dark mode is enabled (#3647)
- e39cce2 Fix style of modal close button (#3651)
- 652813d Prevent a crash if stdin is not defined (#3609)
- 59a295f Merge pull request #3493 from waylan/2248
- 6f5e748 ...
[PR] Build: Bump mkdocs-autorefs from 0.5.0 to 1.0.1 [iceberg-python]
dependabot[bot] opened a new pull request, #671: URL: https://github.com/apache/iceberg-python/pull/671 Bumps [mkdocs-autorefs](https://github.com/mkdocstrings/autorefs) from 0.5.0 to 1.0.1.

Release notes (sourced from mkdocs-autorefs's releases, https://github.com/mkdocstrings/autorefs/releases):

1.0.1 - 2024-02-29 (compare with 1.0.0)

Bug Fixes
- Don't import MkDocsConfig (does not exist on MkDocs 1.3-) (9c15664 by Timothée Mazzucotelli)

1.0.0 - 2024-02-27 (compare with 0.5.0)

Features
- Add Markdown anchors and aliases (a215a97 by Timothée Mazzucotelli). Replaces PR #20, related to PR #25 and issue #35. Co-authored-by: Oleh Prypin, tvdboom
- Preserve HTML data attributes (from spans to anchors) (0c1781d by Timothée Mazzucotelli). Issue #41, PR #42. Co-authored-by: Oleh Prypin
- Support [`identifier`][] with pymdownx.inlinehilite enabled (e7f2228 by Oleh Prypin). Issue #34, PR #40. Co-authored-by: Timothée Mazzucotelli

Bug Fixes
- Recognize links with multi-line text (225a6f2 by Oleh Prypin). Issue #31, PR #32
[PR] Build: Bump ray from 2.9.2 to 2.12.0 [iceberg-python]
dependabot[bot] opened a new pull request, #672: URL: https://github.com/apache/iceberg-python/pull/672 Bumps [ray](https://github.com/ray-project/ray) from 2.9.2 to 2.12.0. Release notes Sourced from https://github.com/ray-project/ray/releases";>ray's releases. Ray-2.12.0 Ray Libraries Ray Data 🎉 New Features: Store Ray Data logs in special subdirectory (https://redirect.github.com/ray-project/ray/issues/44743";>#44743) 💫 Enhancements: Add in local_read option to from_torch (https://redirect.github.com/ray-project/ray/issues/44752";>#44752) 🔨 Fixes: Fix the config to disable progress bar (https://redirect.github.com/ray-project/ray/issues/44342";>#44342) 📖 Documentation: Clarify deprecated Datasource docstrings (https://redirect.github.com/ray-project/ray/issues/44790";>#44790) Ray Train 🔨 Fixes: Disable gathering the full state dict in RayFSDPStrategy for lightning>2.1 (https://redirect.github.com/ray-project/ray/issues/44569";>#44569) Ray Tune 💫 Enhancements: Remove spammy log for "new output engine" (https://redirect.github.com/ray-project/ray/issues/44824";>#44824) Enable isort (https://redirect.github.com/ray-project/ray/issues/44693";>#44693) Ray Serve 🔨 Fixes: [Serve] fix getting attributes on stdout during Serve logging redirect (https://redirect.github.com/ray-project/ray/pull/44787";>#44787) RLlib 🎉 New Features: Support of images and video logging in WandB (env rendering example script for the new API stack coming up). (https://redirect.github.com/ray-project/ray/pull/43356";>#43356) 💫 Enhancements: Better support and separation-of-concerns for model_config_dict in new API stack. (https://redirect.github.com/ray-project/ray/pull/44263";>#44263) Added example script to pre-train an RLModule in single-agent fashion, then bring checkpoint into multi-agent setup and continue training. (https://redirect.github.com/ray-project/ray/pull/44674";>#44674) More examples scripts got translated from the old- to the new API stack: Curriculum learning, custom-gym-env, etc..: (https://redirect.github.com/ray-project/ray/pull/44706";>#44706, https://redirect.github.com/ray-project/ray/pull/44707";>#44707, https://redirect.github.com/ray-project/ray/pull/44735";>#44735, https://redirect.github.com/ray-project/ray/pull/44841";>#44841) Ray Core and Ray Clusters 🔨 Fixes: ... 
(truncated) Commits https://github.com/ray-project/ray/commit/549c4b7694483f6bc9e519b61e6f575e13510343";>549c4b7 [release] change version to 2.12.0 https://github.com/ray-project/ray/commit/7c9209fd94726de61f70b0e9ea0f624f2da8a1fd";>7c9209f [ci][bisect/1] initial version of macos test bisect (https://redirect.github.com/ray-project/ray/issues/44618";>#44618) https://github.com/ray-project/ray/commit/fe4dd5da9c8157c44f3d0b4012519dd6a2a243d4";>fe4dd5d [ci][bisect/0] add a function to run a single macos tests (https://redirect.github.com/ray-project/ray/issues/44848";>#44848) https://github.com/ray-project/ray/commit/9587ef35c8bcdcd1f2c1894fe86d9a9b0a56f9f6";>9587ef3 [serve] fix documentation typo (https://redirect.github.com/ray-project/ray/issues/44855";>#44855) https://github.com/ray-project/ray/commit/05232b531de447e8d596c6489309cb5a4251a28e";>05232b5 [Tune] Remove spammy log for "new output engine" (https://redirect.github.com/ray-project/ray/issues/44824";>#44824) https://github.com/ray-project/ray/commit/7e315fe7e5ddc5b9ebec0fc50bd0e1d4fa28d621";>7e315fe [serve] add num replicas auto telemetry (https://redirect.github.com/ray-project/ray/issues/44609";>#44609) https://github.com/ray-project/ray/commit/a2b26631072e7c5869320a65e646384c7c5a5011";>a2b2663 [release] add 2.11.0 performance metrics (https://redirect.github.com/ray-project/ray/issues/44847";>#44847) https://github.com/ray-project/ray/commit/650aa79e3ded090258b9b42187da3a922842561b";>650aa79 [GCS] Fixed GCS pub-sub channel-types to be explicitly mapped to either cappe... https://github.com/ray-project/ray/commit/2185fa9cbd633a5ae20f798e7da044db34335d3b";>2185fa9 [spark] Fix Ray-on-Spark ray child process shebang issue (https://redirect.github.com/ray-project/ray/issues/44827";>#44827) https://github.com/ray-project/ray/commit/1e936e3aeee884b8c2c3f6e3308cfe2449be7384";>1e936e3 [jobs] Fix Job submission issue on Windows platform (https://redirect.github.com/ray-project/ray/issues/44632";>#44632) Additional commits viewable in https://github.com/ray-project/ray/compare/ray-2.9.2...ray-2.12.0";>compare view [](https://docs.github.com/en/github/managing-security-vulnerabilities/about-dependabot-security-updates#about-compatibili
[PR] Build: Bump mkdocs-material from 9.5.19 to 9.5.20 [iceberg-python]
dependabot[bot] opened a new pull request, #673: URL: https://github.com/apache/iceberg-python/pull/673 Bumps [mkdocs-material](https://github.com/squidfunk/mkdocs-material) from 9.5.19 to 9.5.20. Release notes Sourced from https://github.com/squidfunk/mkdocs-material/releases";>mkdocs-material's releases. mkdocs-material-9.5.20 Fixed deprecation warning in privacy plugin (9.5.19 regression) Fixed https://redirect.github.com/squidfunk/mkdocs-material/issues/7119";>#7119: Tags plugin emits deprecation warning (9.5.19 regression) Fixed https://redirect.github.com/squidfunk/mkdocs-material/issues/7118";>#7118: Social plugin crashes if fonts are disabled (9.5.19 regression) Fixed https://redirect.github.com/squidfunk/mkdocs-material/issues/7085";>#7085: Social plugin crashes on Windows when downloading fonts Changelog Sourced from https://github.com/squidfunk/mkdocs-material/blob/master/CHANGELOG";>mkdocs-material's changelog. mkdocs-material-9.5.20 (2024-04-29) Fixed deprecation warning in privacy plugin (9.5.19 regression) Fixed https://redirect.github.com/squidfunk/mkdocs-material/issues/7119";>#7119: Tags plugin emits deprecation warning (9.5.19 regression) Fixed https://redirect.github.com/squidfunk/mkdocs-material/issues/7118";>#7118: Social plugin crashes if fonts are disabled (9.5.19 regression) Fixed https://redirect.github.com/squidfunk/mkdocs-material/issues/7085";>#7085: Social plugin crashes on Windows when downloading fonts mkdocs-material-9.5.19+insiders-4.53.8 (2024-04-26) Fixed https://redirect.github.com/squidfunk/mkdocs-material/issues/7052";>#7052: Preview extension automatically including all pages Fixed https://redirect.github.com/squidfunk/mkdocs-material/issues/7051";>#7051: Instant previews mounting on footnote references Fixed https://redirect.github.com/squidfunk/mkdocs-material/issues/5165";>#5165: Improved tooltips not mounting in sidebar for typeset plugin mkdocs-material-9.5.19+insiders-4.53.7 (2024-04-25) Fixed https://redirect.github.com/squidfunk/mkdocs-material/issues/7060";>#7060: Incorrect resolution of translation when using static-i18n mkdocs-material-9.5.19 (2024-04-25) Updated MkDocs to 1.6 and limited version to < 2 Updated Docker image to latest Alpine Linux Removed setup.py, now that GitHub fully understands pyproject.toml Improved interop of social plugin with third-party MkDocs themes Fixed https://redirect.github.com/squidfunk/mkdocs-material/issues/7099";>#7099: Blog reading time not rendered correctly for Japanese Fixed https://redirect.github.com/squidfunk/mkdocs-material/issues/7097";>#7097: Improved resilience of tags plugin when no tags are given Fixed https://redirect.github.com/squidfunk/mkdocs-material/issues/7090";>#7090: Active tab indicator in nested content tabs rendering bug mkdocs-material-9.5.18 (2024-04-16) Refactored tooltips implementation to fix positioning issues Fixed https://redirect.github.com/squidfunk/mkdocs-material/issues/7044";>#7044: Rendering glitch when hovering contributor avatar in Chrome Fixed https://redirect.github.com/squidfunk/mkdocs-material/issues/7043";>#7043: Highlighted lines in code blocks cutoff on mobile Fixed https://redirect.github.com/squidfunk/mkdocs-material/issues/6910";>#6910: Incorrect position of tooltip for page status in sidebar Fixed https://redirect.github.com/squidfunk/mkdocs-material/issues/6760";>#6760: Incorrect position and overly long tooltip in tables Fixed https://redirect.github.com/squidfunk/mkdocs-material/issues/6488";>#6488: Incorrect position and cutoff tooltip in content 
tabs mkdocs-material-9.5.17+insiders-4.53.6 (2024-04-05) Ensure working directory is set for projects when using projects plugin Fixed https://redirect.github.com/squidfunk/mkdocs-material/issues/6970";>#6970: Incorrect relative paths in git submodules with projects plugin mkdocs-material-9.5.17+insiders-4.53.5 (2024-04-02) Fixed social plugin crashing when no colors are specified in palettes mkdocs-material-9.5.17 (2024-04-02) Updated Serbian translations Fixed https://redirect.github.com/squidfunk/mkdocs-material/issues/7003";>#7003: Confusing keyboard interaction for palette toggle Fixed https://redirect.github.com/squidfunk/mkdocs-material/issues/7001";>#7001: Blog posts now show time by default (9.5.16 regression) ... (truncated) Commits https://github.com/squidfunk/mkdocs-material/commit/5cb3117f50c78abfef6817b72cb09910decd272b";>5cb3117 Prepare 9.5.20 release https://github.com/squidfunk/mkdocs-material/commit/47527797b576b64161dcb066e86abbef3f38df3d";>4752779 Updated dependencies https://github.com/squidfunk/mkdocs-material/commit/e90871f210b5d281da5c238b9e67917fe9222585";>e90871f Fixed social plugin crashing on Windows whe
Re: [I] Iceberg supports Tencent COS [iceberg]
github-actions[bot] commented on issue #2498: URL: https://github.com/apache/iceberg/issues/2498#issuecomment-2083906772 This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale' -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [I] Flink CDC | OOM during initial snapshot [iceberg]
github-actions[bot] closed issue #2504: Flink CDC | OOM during initial snapshot URL: https://github.com/apache/iceberg/issues/2504 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [I] Stack map does not match the one at exception handler [iceberg]
github-actions[bot] commented on issue #2507: URL: https://github.com/apache/iceberg/issues/2507#issuecomment-2083906823 This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale' -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [I] Partitioning on sensitive (encrypted) columns [iceberg]
github-actions[bot] closed issue #2513: Partitioning on sensitive (encrypted) columns URL: https://github.com/apache/iceberg/issues/2513 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [I] Iceberg supports Tencent COS [iceberg]
github-actions[bot] closed issue #2498: Iceberg supports Tencent COS URL: https://github.com/apache/iceberg/issues/2498 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [I] Predicate pushdown not visible in Spark plan [iceberg]
github-actions[bot] commented on issue #2517: URL: https://github.com/apache/iceberg/issues/2517#issuecomment-2083906854 This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale' -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [I] Predicate pushdown not visible in Spark plan [iceberg]
github-actions[bot] closed issue #2517: Predicate pushdown not visible in Spark plan URL: https://github.com/apache/iceberg/issues/2517 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [I] The spark's remove_orphan_files procedure cannot expire the orphan files that located in remote object storage services [iceberg]
github-actions[bot] commented on issue #2525: URL: https://github.com/apache/iceberg/issues/2525#issuecomment-2083906870 This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale' -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [I] The spark's remove_orphan_files procedure cannot expire the orphan files that located in remote object storage services [iceberg]
github-actions[bot] closed issue #2525: The spark's remove_orphan_files procedure cannot expire the orphan files that located in remote object storage services URL: https://github.com/apache/iceberg/issues/2525 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [I] Support partitioned writes [iceberg-python]
jqin61 commented on issue #208: URL: https://github.com/apache/iceberg-python/issues/208#issuecomment-2083999610 Updates for monthly sync: 1. Working on dynamic overwrite, which gets unblocked by partial deletes https://github.com/apache/iceberg-python/pull/569 2. For transform functions, we could convert the Arrow column to a Python list and feed that to the transform function to generate transformed pyarrow columns for grouping partitions with the existing algorithm. But there are efficiency concerns, since the transform function can only take Python data types and we have to convert from Arrow to Python and back to Arrow. Also, the types in Arrow and Iceberg are quite different, and sometimes we need to call utility functions. For example, a timestamp is converted to a datetime in Python, and we have to call an existing utility function to convert it to micros (int) before feeding it into the transform functions. Another option is to create an Arrow UDF for the partition transforms, which might parallelize better. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
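To make the two options above concrete, here is a rough, self-contained sketch; `month_transform` is an illustrative stand-in for Iceberg's month transform (months since 1970-01-01), not a pyiceberg API call, and the sample data is made up.

~~~python
import datetime as dt
import pyarrow as pa
import pyarrow.compute as pc

def month_transform(micros: int) -> int:
    # Illustrative stand-in for Iceberg's month transform: months since 1970-01-01.
    d = dt.datetime(1970, 1, 1) + dt.timedelta(microseconds=micros)
    return (d.year - 1970) * 12 + (d.month - 1)

table = pa.table({"ts": pa.array(
    [dt.datetime(2024, 4, 29), dt.datetime(2024, 5, 1)], type=pa.timestamp("us"))})

# Option 1: Arrow -> Python -> transform -> Arrow (the round trip with the
# conversion overhead mentioned above).
micros = table["ts"].cast(pa.int64()).to_pylist()
months_py = pa.array([month_transform(m) for m in micros], type=pa.int32())

# Option 2: stay in Arrow and express the same transform with compute kernels,
# avoiding the per-value Python conversion.
months_arrow = pc.add(
    pc.multiply(pc.subtract(pc.year(table["ts"]), 1970), 12),
    pc.subtract(pc.month(table["ts"]), 1),
)

assert months_py.to_pylist() == months_arrow.to_pylist()
print(table.append_column("ts_month", months_arrow).to_pydict())
~~~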
Re: [PR] Metadata Log Entries metadata table [iceberg-python]
kevinjqliu commented on code in PR #667: URL: https://github.com/apache/iceberg-python/pull/667#discussion_r1584036888 ## pyiceberg/table/__init__.py: ## @@ -3537,6 +3537,39 @@ def update_partitions_map( schema=table_schema, ) +def metadata_log_entries(self) -> "pa.Table": +import pyarrow as pa + +from pyiceberg.table.snapshots import MetadataLogEntry + +table_schema = pa.schema([ +("timestamp", pa.timestamp(unit='ms'), True), +("file", pa.string(), True), Review Comment: ah, good catch. `timestamp` and `file` should both be required fields, according to the [Java schema](https://github.com/apache/iceberg/blob/1e35bf96ecacd5c5175116f40fa3e097991d04d2/core/src/main/java/org/apache/iceberg/MetadataLogEntriesTable.java#L30-L35). The third element of the tuple represents [`nullable`](https://arrow.apache.org/docs/python/generated/pyarrow.schema.html), which should be `False` for both. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
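A minimal sketch of the suggested fix, spelling out `nullable=False` with explicit `pa.field(...)` entries; only the two fields quoted above are shown, the PR's full schema has more.

~~~python
import pyarrow as pa

# Required (non-nullable) fields, mirroring the Java MetadataLogEntriesTable schema
# linked above; the remaining fields of the PR's schema are omitted here.
table_schema = pa.schema([
    pa.field("timestamp", pa.timestamp(unit="ms"), nullable=False),
    pa.field("file", pa.string(), nullable=False),
])
print(table_schema)
~~~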
Re: [PR] Metadata Log Entries metadata table [iceberg-python]
kevinjqliu commented on code in PR #667: URL: https://github.com/apache/iceberg-python/pull/667#discussion_r1584037066 ## pyiceberg/table/__init__.py: ## @@ -3537,6 +3537,39 @@ def update_partitions_map( schema=table_schema, ) +def metadata_log_entries(self) -> "pa.Table": +import pyarrow as pa + +from pyiceberg.table.snapshots import MetadataLogEntry + +table_schema = pa.schema([ +("timestamp", pa.timestamp(unit='ms'), True), +("file", pa.string(), True), Review Comment: This means the nullable flags on the rest of the fields are also wrong. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Metadata Log Entries metadata table [iceberg-python]
kevinjqliu commented on code in PR #667: URL: https://github.com/apache/iceberg-python/pull/667#discussion_r1584040866 ## pyiceberg/table/metadata.py: ## @@ -292,6 +292,13 @@ def snapshot_by_name(self, name: str) -> Optional[Snapshot]: return self.snapshot_by_id(ref.snapshot_id) return None +def _snapshot_as_of_timestamp_ms(self, timestamp_ms: int) -> Optional[Snapshot]: Review Comment: Getting a snapshot by timestamp should be a public function; I'm not opposed to making this one public. But I'm unsure whether `timestamp_ms: int` is the preferred input signature. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
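One way to keep `timestamp_ms: int` as an internal detail is to accept a `datetime` at the public boundary and convert; a hedged sketch of that idea (not the merged API), reusing only the private helper shown in the diff above.

~~~python
from datetime import datetime, timezone

def to_timestamp_ms(ts: datetime) -> int:
    # Convert a timezone-aware datetime to epoch milliseconds.
    return int(ts.astimezone(timezone.utc).timestamp() * 1000)

# A public wrapper could then delegate to the private helper from the diff, e.g.
# metadata.snapshot_as_of(ts) == metadata._snapshot_as_of_timestamp_ms(to_timestamp_ms(ts))
print(to_timestamp_ms(datetime(2024, 4, 29, tzinfo=timezone.utc)))
~~~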
Re: [PR] Core: Retry connections in JDBC catalog with user configured error code list [iceberg]
amogh-jahagirdar commented on code in PR #10140: URL: https://github.com/apache/iceberg/pull/10140#discussion_r1584047406 ## core/src/test/java/org/apache/iceberg/TestClientPoolImpl.java: ## @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; + +import org.junit.jupiter.api.Test; + +public class TestClientPoolImpl { + + static class RetryableException extends RuntimeException {} + + static class NonRetryableException extends RuntimeException {} + + static class MockClient { +boolean closed = false; + +int actions = 0; + +int retryableFailures = 0; + +public void close() { + closed = true; +} + +public int successfulAction() { + actions++; + return actions; +} + +int succeedAfter(int succeedAfterAttempts) { + if (retryableFailures == succeedAfterAttempts - 1) { +return successfulAction(); + } + + retryableFailures++; + throw new RetryableException(); +} + +int failWithNonRetryable() { + throw new NonRetryableException(); +} + } + + static class MockClientPoolImpl extends ClientPoolImpl { + +private int reconnectionAttempts; + +MockClientPoolImpl( +int poolSize, +Class reconnectExc, +boolean retryByDefault, +int numRetries) { + super(poolSize, reconnectExc, retryByDefault, numRetries); +} + +@Override +protected MockClient newClient() { + return new MockClient(); +} + +@Override +protected MockClient reconnect(MockClient client) { + reconnectionAttempts++; + return client; +} + +@Override +protected void close(MockClient client) { + client.close(); +} + +int reconnectionAttempts() { + return reconnectionAttempts; +} + } + + @Test Review Comment: Good suggestion, I agreed the tests are more readable that way! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Core: Retry connections in JDBC catalog with user configured error code list [iceberg]
amogh-jahagirdar commented on code in PR #10140: URL: https://github.com/apache/iceberg/pull/10140#discussion_r1584048101 ## core/src/test/java/org/apache/iceberg/TestClientPoolImpl.java: ## @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; + +import org.junit.jupiter.api.Test; + +public class TestClientPoolImpl { + + static class RetryableException extends RuntimeException {} + + static class NonRetryableException extends RuntimeException {} + + static class MockClient { +boolean closed = false; + +int actions = 0; + +int retryableFailures = 0; + +public void close() { + closed = true; +} + +public int successfulAction() { + actions++; + return actions; +} + +int succeedAfter(int succeedAfterAttempts) { + if (retryableFailures == succeedAfterAttempts - 1) { +return successfulAction(); + } + + retryableFailures++; + throw new RetryableException(); +} + +int failWithNonRetryable() { + throw new NonRetryableException(); +} + } + + static class MockClientPoolImpl extends ClientPoolImpl { + +private int reconnectionAttempts; + +MockClientPoolImpl( +int poolSize, +Class reconnectExc, +boolean retryByDefault, +int numRetries) { + super(poolSize, reconnectExc, retryByDefault, numRetries); +} + +@Override +protected MockClient newClient() { + return new MockClient(); +} + +@Override +protected MockClient reconnect(MockClient client) { + reconnectionAttempts++; + return client; +} + +@Override +protected void close(MockClient client) { + client.close(); +} + +int reconnectionAttempts() { + return reconnectionAttempts; +} + } + + @Test + public void testRetrySucceedsWithinMaxAttempts() throws Exception { +int maxRetries = 5; +int succeedAfterAttempts = 3; +try (MockClientPoolImpl mockClientPool = +new MockClientPoolImpl(2, RetryableException.class, true, maxRetries)) { + int actions = mockClientPool.run(client -> client.succeedAfter(succeedAfterAttempts)); + assertThat(actions) + .as("There should be exactly one successful action invocation") + .isEqualTo(1); + assertThat(mockClientPool.reconnectionAttempts()).isEqualTo(succeedAfterAttempts - 1); +} + } + + @Test + public void testRetriesExhaustedAndSurfacesFailure() { +int maxRetries = 3; +int succeedAfterAttempts = 5; +MockClientPoolImpl mockClientPool = Review Comment: Yes we should. There's no real resources being allocated for the mock stuff but it's still best practice. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [I] Add `table_exists` method to the Catalog [iceberg-python]
djouallah commented on issue #507: URL: https://github.com/apache/iceberg-python/issues/507#issuecomment-2084257385 @kevinjqliu is this supported? `AttributeError: 'SqlCatalog' object has no attribute 'table_exists'` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
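If the installed pyiceberg release does not have `Catalog.table_exists` yet, a workaround sketch using APIs that are already there (`load_table` raising `NoSuchTableError`); the catalog name and table identifier below are just examples.

~~~python
from pyiceberg.catalog import Catalog, load_catalog
from pyiceberg.exceptions import NoSuchTableError

def table_exists(catalog: Catalog, identifier: str) -> bool:
    # Probe for the table; NoSuchTableError means it does not exist.
    try:
        catalog.load_table(identifier)
        return True
    except NoSuchTableError:
        return False

catalog = load_catalog("default")  # assumes a catalog named "default" is configured
print(table_exists(catalog, "db.my_table"))
~~~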
Re: [I] Documentation page returning 404 [iceberg]
manuzhang commented on issue #10249: URL: https://github.com/apache/iceberg/issues/10249#issuecomment-2084280354 We are still fixing doc links in previous releases tracked at #10116 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [I] Getting storage partitioned join to work [iceberg]
mrbrahman commented on issue #10250: URL: https://github.com/apache/iceberg/issues/10250#issuecomment-2084281224 The problem was the commented-out parameter. Apparently I had to set it thus: ~~~sql set `spark.sql.iceberg.planning.preserve-data-grouping` = true; ~~~ Once that was set, Spark nicely avoided the shuffle! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
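For anyone else setting this up, a PySpark sketch of the session settings involved; `spark.sql.sources.v2.bucketing.enabled` and the broadcast threshold are additional assumptions about a typical storage-partitioned-join setup, not something stated in this issue.

~~~python
from pyspark.sql import SparkSession

# Assumes an existing Spark session with the Iceberg runtime on the classpath.
spark = SparkSession.builder.getOrCreate()

# The property from this issue, plus the Spark-side switch SPJ relies on.
spark.conf.set("spark.sql.iceberg.planning.preserve-data-grouping", "true")
spark.conf.set("spark.sql.sources.v2.bucketing.enabled", "true")

# Disabling broadcast joins can make it easier to confirm the shuffle is gone.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
~~~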
Re: [I] ValueError: Mismatch in fields: ? [iceberg-python]
djouallah commented on issue #674: URL: https://github.com/apache/iceberg-python/issues/674#issuecomment-2084298893 sorry, my fault -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] HiveMetaHook implementation to enable CREATE TABLE and DROP TABLE from Hive queries [iceberg]
shivjha30 commented on code in PR #1495: URL: https://github.com/apache/iceberg/pull/1495#discussion_r1584078574 ## mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java: ## @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.mr.hive; + +import java.util.Properties; +import java.util.Set; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.HiveMetaHook; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.iceberg.BaseMetastoreTableOperations; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.PartitionSpecParser; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SchemaParser; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableMetadataParser; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.mr.Catalogs; +import org.apache.iceberg.mr.InputFormatConfig; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class HiveIcebergMetaHook implements HiveMetaHook { + private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergMetaHook.class); + private static final Set PARAMETERS_TO_REMOVE = ImmutableSet + .of(InputFormatConfig.TABLE_SCHEMA, InputFormatConfig.PARTITION_SPEC, Catalogs.LOCATION, Catalogs.NAME); + private static final Set PROPERTIES_TO_REMOVE = ImmutableSet + .of(InputFormatConfig.EXTERNAL_TABLE_PURGE, hive_metastoreConstants.META_TABLE_STORAGE, "EXTERNAL", + "bucketing_version"); + + private final Configuration conf; + private Table icebergTable = null; + private Properties catalogProperties; + private boolean deleteIcebergTable; + private FileIO deleteIo; + private TableMetadata deleteMetadata; + + public HiveIcebergMetaHook(Configuration conf) { +this.conf = conf; + } + + @Override + public void preCreateTable(org.apache.hadoop.hive.metastore.api.Table hmsTable) { +this.catalogProperties = getCatalogProperties(hmsTable); + +// Set the table type even for non HiveCatalog based tables +hmsTable.getParameters().put(BaseMetastoreTableOperations.TABLE_TYPE_PROP, +BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE.toUpperCase()); + +if (!Catalogs.hiveCatalog(conf)) { + // If not using HiveCatalog check for existing table + try { +this.icebergTable = Catalogs.loadTable(conf, catalogProperties); + + 
Preconditions.checkArgument(catalogProperties.getProperty(InputFormatConfig.TABLE_SCHEMA) == null, +"Iceberg table already created - can not use provided schema"); + Preconditions.checkArgument(catalogProperties.getProperty(InputFormatConfig.PARTITION_SPEC) == null, +"Iceberg table already created - can not use provided partition specification"); + +LOG.info("Iceberg table already exists {}", icebergTable); + +return; + } catch (NoSuchTableException nte) { +// If the table does not exist we will create it below + } +} + +// If the table does not exist collect data for table creation +String schemaString = catalogProperties.getProperty(InputFormatConfig.TABLE_SCHEMA); +Preconditions.checkNotNull(schemaString, "Please provide a table schema"); +// Just check if it is parsable, and later use for partition specification parsing +Schema schema = SchemaParser.fromJson(schemaString); + +String specString = catalogProperties.getProperty(InputFormatConfig.PARTITION_SPEC); +if (specString != null) { + // Just check if it is parsable + PartitionSpecParser.fromJson(schema, specString); +} + +// Allow purging table data if the table is created now and not set otherwise +if (hmsTable.getParameters().get(InputFormatConfig.EXTERNAL_TABLE_PURGE) == null) { + hmsTable.getPar
Re: [PR] Refactor: Extract `partition_filters` from `ManifestEvaluator` [iceberg-rust]
marvinlanhenke commented on code in PR #360: URL: https://github.com/apache/iceberg-rust/pull/360#discussion_r1584098222 ## crates/iceberg/src/scan.rs: ## @@ -314,6 +312,140 @@ impl TableScan { } } +#[derive(Debug)] +/// Holds the context necessary for file scanning operations +/// in a streaming environment. +struct FileScanStreamContext { +schema: SchemaRef, +snapshot: SnapshotRef, +table_metadata: TableMetadataRef, +file_io: FileIO, +filter: Option>, +case_sensitive: bool, +} + +impl FileScanStreamContext { +/// Creates a new [`FileScanStreamContext`]. +fn new( +schema: SchemaRef, +snapshot: SnapshotRef, +table_metadata: TableMetadataRef, +file_io: FileIO, +filter: Option>, +case_sensitive: bool, +) -> Self { +Self { +schema, +snapshot, +table_metadata, +file_io, +filter, +case_sensitive, +} +} + +/// Creates a [`BoundPredicate`] from row filter [`Predicate`]. +fn bound_filter(&self) -> Result> { Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Refactor: Extract `partition_filters` from `ManifestEvaluator` [iceberg-rust]
marvinlanhenke commented on code in PR #360: URL: https://github.com/apache/iceberg-rust/pull/360#discussion_r1584100390 ## crates/iceberg/src/scan.rs: ## @@ -314,6 +312,140 @@ impl TableScan { } } +#[derive(Debug)] +/// Holds the context necessary for file scanning operations +/// in a streaming environment. +struct FileScanStreamContext { +schema: SchemaRef, +snapshot: SnapshotRef, +table_metadata: TableMetadataRef, +file_io: FileIO, +filter: Option>, +case_sensitive: bool, +} + +impl FileScanStreamContext { +/// Creates a new [`FileScanStreamContext`]. +fn new( +schema: SchemaRef, +snapshot: SnapshotRef, +table_metadata: TableMetadataRef, +file_io: FileIO, +filter: Option>, +case_sensitive: bool, +) -> Self { +Self { +schema, +snapshot, +table_metadata, +file_io, +filter, +case_sensitive, +} +} + +/// Creates a [`BoundPredicate`] from row filter [`Predicate`]. +fn bound_filter(&self) -> Result> { +match self.filter { +Some(ref filter) => Ok(Some(filter.bind(self.schema.clone(), self.case_sensitive)?)), +None => Ok(None), +} +} + +/// Creates a reference-counted [`PartitionSpec`] and a +/// corresponding schema based on the specified partition spec id. +fn create_partition_spec_and_schema( +&self, +spec_id: i32, +) -> Result<(&PartitionSpecRef, SchemaRef)> { Review Comment: > Could we also do this up-front in `new` or just once I'm not sure we can do that, since we need the partition_spec_id from each `entry` inside the for-loop. However, the `FileScanStreamContext` need to be instantiated outside the stream. So I guess we can't "cache" the spec, unless we don't actually need the entry.partition_spec_id. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Refactor: Extract `partition_filters` from `ManifestEvaluator` [iceberg-rust]
marvinlanhenke commented on code in PR #360: URL: https://github.com/apache/iceberg-rust/pull/360#discussion_r1584100390 ## crates/iceberg/src/scan.rs: ## @@ -314,6 +312,140 @@ impl TableScan { } } +#[derive(Debug)] +/// Holds the context necessary for file scanning operations +/// in a streaming environment. +struct FileScanStreamContext { +schema: SchemaRef, +snapshot: SnapshotRef, +table_metadata: TableMetadataRef, +file_io: FileIO, +filter: Option>, +case_sensitive: bool, +} + +impl FileScanStreamContext { +/// Creates a new [`FileScanStreamContext`]. +fn new( +schema: SchemaRef, +snapshot: SnapshotRef, +table_metadata: TableMetadataRef, +file_io: FileIO, +filter: Option>, +case_sensitive: bool, +) -> Self { +Self { +schema, +snapshot, +table_metadata, +file_io, +filter, +case_sensitive, +} +} + +/// Creates a [`BoundPredicate`] from row filter [`Predicate`]. +fn bound_filter(&self) -> Result> { +match self.filter { +Some(ref filter) => Ok(Some(filter.bind(self.schema.clone(), self.case_sensitive)?)), +None => Ok(None), +} +} + +/// Creates a reference-counted [`PartitionSpec`] and a +/// corresponding schema based on the specified partition spec id. +fn create_partition_spec_and_schema( +&self, +spec_id: i32, +) -> Result<(&PartitionSpecRef, SchemaRef)> { Review Comment: > Could we also do this up-front in `new` or just once I'm not sure we can do that, since we need the partition_spec_id from each `entry` inside the for-loop. However, the `FileScanStreamContext` need to be instantiated outside the stream. So I guess we can't "cache" the spec, unless we don't actually need the entry.partition_spec_id. As of right now, we only use it to make the runtime check in manifest evaluator if entry.partition_spec_id and partition_schema.schema_id() match. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Refactor: Extract `partition_filters` from `ManifestEvaluator` [iceberg-rust]
marvinlanhenke commented on code in PR #360: URL: https://github.com/apache/iceberg-rust/pull/360#discussion_r1584100390 ## crates/iceberg/src/scan.rs: ## @@ -314,6 +312,140 @@ impl TableScan { } } +#[derive(Debug)] +/// Holds the context necessary for file scanning operations +/// in a streaming environment. +struct FileScanStreamContext { +schema: SchemaRef, +snapshot: SnapshotRef, +table_metadata: TableMetadataRef, +file_io: FileIO, +filter: Option>, +case_sensitive: bool, +} + +impl FileScanStreamContext { +/// Creates a new [`FileScanStreamContext`]. +fn new( +schema: SchemaRef, +snapshot: SnapshotRef, +table_metadata: TableMetadataRef, +file_io: FileIO, +filter: Option>, +case_sensitive: bool, +) -> Self { +Self { +schema, +snapshot, +table_metadata, +file_io, +filter, +case_sensitive, +} +} + +/// Creates a [`BoundPredicate`] from row filter [`Predicate`]. +fn bound_filter(&self) -> Result> { +match self.filter { +Some(ref filter) => Ok(Some(filter.bind(self.schema.clone(), self.case_sensitive)?)), +None => Ok(None), +} +} + +/// Creates a reference-counted [`PartitionSpec`] and a +/// corresponding schema based on the specified partition spec id. +fn create_partition_spec_and_schema( +&self, +spec_id: i32, +) -> Result<(&PartitionSpecRef, SchemaRef)> { Review Comment: > Could we also do this up-front in `new` or just once I'm not sure we can do that, since we need the partition_spec_id from each `entry` inside the for-loop. However, the `FileScanStreamContext` need to be instantiated outside the stream. So I guess we can't "cache" the spec and the partition_schema? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Refactor: Extract `partition_filters` from `ManifestEvaluator` [iceberg-rust]
marvinlanhenke commented on code in PR #360: URL: https://github.com/apache/iceberg-rust/pull/360#discussion_r1584100390 ## crates/iceberg/src/scan.rs: ## @@ -314,6 +312,140 @@ impl TableScan { } } +#[derive(Debug)] +/// Holds the context necessary for file scanning operations +/// in a streaming environment. +struct FileScanStreamContext { +schema: SchemaRef, +snapshot: SnapshotRef, +table_metadata: TableMetadataRef, +file_io: FileIO, +filter: Option>, +case_sensitive: bool, +} + +impl FileScanStreamContext { +/// Creates a new [`FileScanStreamContext`]. +fn new( +schema: SchemaRef, +snapshot: SnapshotRef, +table_metadata: TableMetadataRef, +file_io: FileIO, +filter: Option>, +case_sensitive: bool, +) -> Self { +Self { +schema, +snapshot, +table_metadata, +file_io, +filter, +case_sensitive, +} +} + +/// Creates a [`BoundPredicate`] from row filter [`Predicate`]. +fn bound_filter(&self) -> Result> { +match self.filter { +Some(ref filter) => Ok(Some(filter.bind(self.schema.clone(), self.case_sensitive)?)), +None => Ok(None), +} +} + +/// Creates a reference-counted [`PartitionSpec`] and a +/// corresponding schema based on the specified partition spec id. +fn create_partition_spec_and_schema( +&self, +spec_id: i32, +) -> Result<(&PartitionSpecRef, SchemaRef)> { Review Comment: > Could we also do this up-front in `new` or just once I'm not sure we can do that, since we need the partition_spec_id from each `entry` inside the for-loop. However, the `FileScanStreamContext` need to be instantiated outside the stream. So I guess we can't "cache" the spec and the partition_schema? Fixed the return type `PartitionSpecRef` though -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Refactor: Extract `partition_filters` from `ManifestEvaluator` [iceberg-rust]
marvinlanhenke commented on code in PR #360: URL: https://github.com/apache/iceberg-rust/pull/360#discussion_r1584111575 ## crates/iceberg/src/expr/visitors/manifest_evaluator.rs: ## @@ -16,74 +16,49 @@ // under the License. use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor}; -use crate::expr::visitors::inclusive_projection::InclusiveProjection; -use crate::expr::{Bind, BoundPredicate, BoundReference}; -use crate::spec::{Datum, FieldSummary, ManifestFile, PartitionSpecRef, Schema, SchemaRef}; -use crate::{Error, ErrorKind}; +use crate::expr::{BoundPredicate, BoundReference}; +use crate::spec::{Datum, FieldSummary, ManifestFile}; +use crate::{Error, ErrorKind, Result}; use fnv::FnvHashSet; -use std::sync::Arc; -/// Evaluates [`ManifestFile`]s to see if their partition summary matches a provided -/// [`BoundPredicate`]. Used by [`TableScan`] to filter down the list of [`ManifestFile`]s +#[derive(Debug)] +/// Evaluates a [`ManifestFile`] to see if the partition summaries +/// match a provided [`BoundPredicate`]. +/// +/// Used by [`TableScan`] to prune the list of [`ManifestFile`]s /// in which data might be found that matches the TableScan's filter. pub(crate) struct ManifestEvaluator { -partition_schema: SchemaRef, +partition_schema_id: i32, partition_filter: BoundPredicate, case_sensitive: bool, } impl ManifestEvaluator { pub(crate) fn new( -partition_spec: PartitionSpecRef, -table_schema: SchemaRef, -filter: BoundPredicate, +partition_schema_id: i32, Review Comment: I like the idea of removing the runtime check. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Refactor: Extract `partition_filters` from `ManifestEvaluator` [iceberg-rust]
marvinlanhenke commented on code in PR #360: URL: https://github.com/apache/iceberg-rust/pull/360#discussion_r1584112041 ## crates/iceberg/src/scan.rs: ## @@ -99,7 +107,7 @@ impl<'a> TableScanBuilder<'a> { } /// Build the table scan. -pub fn build(self) -> crate::Result { +pub fn build(self) -> Result { Review Comment: @liurenjie1024 what do you think? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Refactor: Extract `partition_filters` from `ManifestEvaluator` [iceberg-rust]
marvinlanhenke commented on code in PR #360: URL: https://github.com/apache/iceberg-rust/pull/360#discussion_r1584120346 ## crates/iceberg/src/scan.rs: ## @@ -169,55 +177,66 @@ pub struct TableScan { filter: Option>, } -/// A stream of [`FileScanTask`]. -pub type FileScanTaskStream = BoxStream<'static, crate::Result>; - impl TableScan { -/// Returns a stream of file scan tasks. - -pub async fn plan_files(&self) -> crate::Result { -// Cache `ManifestEvaluatorFactory`s created as part of this scan -let mut manifest_evaluator_cache: HashMap = HashMap::new(); - -// these variables needed to ensure that we don't need to pass a -// reference to self into `try_stream`, as it expects references -// passed in to outlive 'static -let schema = self.schema.clone(); -let snapshot = self.snapshot.clone(); -let table_metadata = self.table_metadata.clone(); -let file_io = self.file_io.clone(); -let case_sensitive = self.case_sensitive; -let filter = self.filter.clone(); +/// Returns a stream of [`FileScanTask`]s. +pub async fn plan_files(&self) -> Result { +let context = FileScanStreamContext::new( +self.schema.clone(), +self.snapshot.clone(), +self.table_metadata.clone(), +self.file_io.clone(), +self.filter.clone(), +self.case_sensitive, +); + +let bound_filter = context.bound_filter()?; + +let mut partition_filter_cache = PartitionFilterCache::new(); +let mut manifest_evaluator_cache = ManifestEvaluatorCache::new(); Ok(try_stream! { -let manifest_list = snapshot -.clone() -.load_manifest_list(&file_io, &table_metadata) -.await?; +let manifest_list = context +.snapshot +.load_manifest_list(&context.file_io, &context.table_metadata) +.await?; -// Generate data file stream for entry in manifest_list.entries() { -// If this scan has a filter, check the partition evaluator cache for an existing -// PartitionEvaluator that matches this manifest's partition spec ID. -// Use one from the cache if there is one. If not, create one, put it in -// the cache, and take a reference to it. -#[allow(clippy::map_entry)] -if let Some(filter) = filter.as_ref() { -if !manifest_evaluator_cache.contains_key(&entry.partition_spec_id) { - manifest_evaluator_cache.insert(entry.partition_spec_id, Self::create_manifest_evaluator(entry.partition_spec_id, schema.clone(), table_metadata.clone(), case_sensitive, filter)?); -} -let manifest_evaluator = &manifest_evaluator_cache[&entry.partition_spec_id]; +if let Some(filter) = &bound_filter { Review Comment: should be done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Refactor: Extract `partition_filters` from `ManifestEvaluator` [iceberg-rust]
marvinlanhenke commented on code in PR #360: URL: https://github.com/apache/iceberg-rust/pull/360#discussion_r1584100390 ## crates/iceberg/src/scan.rs: ## @@ -314,6 +312,140 @@ impl TableScan { } } +#[derive(Debug)] +/// Holds the context necessary for file scanning operations +/// in a streaming environment. +struct FileScanStreamContext { +schema: SchemaRef, +snapshot: SnapshotRef, +table_metadata: TableMetadataRef, +file_io: FileIO, +filter: Option>, +case_sensitive: bool, +} + +impl FileScanStreamContext { +/// Creates a new [`FileScanStreamContext`]. +fn new( +schema: SchemaRef, +snapshot: SnapshotRef, +table_metadata: TableMetadataRef, +file_io: FileIO, +filter: Option>, +case_sensitive: bool, +) -> Self { +Self { +schema, +snapshot, +table_metadata, +file_io, +filter, +case_sensitive, +} +} + +/// Creates a [`BoundPredicate`] from row filter [`Predicate`]. +fn bound_filter(&self) -> Result> { +match self.filter { +Some(ref filter) => Ok(Some(filter.bind(self.schema.clone(), self.case_sensitive)?)), +None => Ok(None), +} +} + +/// Creates a reference-counted [`PartitionSpec`] and a +/// corresponding schema based on the specified partition spec id. +fn create_partition_spec_and_schema( +&self, +spec_id: i32, +) -> Result<(&PartitionSpecRef, SchemaRef)> { Review Comment: > Could we also do this up-front in `new` or just once I'm not sure we can do that, since we need the partition_spec_id from each `entry` inside the for-loop. However, the `FileScanStreamContext` need to be instantiated outside the stream. So I guess we can't "cache" the spec and the partition_schema, this way. However, it makes sense to just create a "new" partition_schema if we have a "new" spec_id. So I'll see where we can move the logic. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Add ManifestFile Stats in snapshot summary. [iceberg]
nk1506 commented on code in PR #10246: URL: https://github.com/apache/iceberg/pull/10246#discussion_r1584136686 ## core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java: ## @@ -190,6 +190,7 @@ public List apply(TableMetadata base, Snapshot snapshot) { List apply = Lists.newArrayList(); Iterables.addAll(apply, newManifestsWithMetadata); apply.addAll(keptManifests); +apply.forEach(summaryBuilder::addedManifestStats); Review Comment: Add manifestStats with any newManifest. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Add ManifestFile Stats in snapshot summary. [iceberg]
nk1506 commented on code in PR #10246: URL: https://github.com/apache/iceberg/pull/10246#discussion_r1584137262 ## core/src/main/java/org/apache/iceberg/SnapshotSummary.java: ## @@ -263,6 +273,12 @@ void addTo(ImmutableMap.Builder builder) { setIf(removedDeleteFiles > 0, builder, REMOVED_DELETE_FILES_PROP, removedDeleteFiles); setIf(addedRecords > 0, builder, ADDED_RECORDS_PROP, addedRecords); setIf(deletedRecords > 0, builder, DELETED_RECORDS_PROP, deletedRecords); + setIf(totalDataManifestFiles > 0, builder, TOTAL_DATA_MANIFEST_FILES, totalDataManifestFiles); Review Comment: If the counts are zero, these stats will not be added to the summary. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] HiveMetaHook implementation to enable CREATE TABLE and DROP TABLE from Hive queries [iceberg]
pvary commented on code in PR #1495: URL: https://github.com/apache/iceberg/pull/1495#discussion_r1584145169 ## mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java: ## @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.mr.hive; + +import java.util.Properties; +import java.util.Set; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.HiveMetaHook; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.iceberg.BaseMetastoreTableOperations; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.PartitionSpecParser; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SchemaParser; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableMetadataParser; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.mr.Catalogs; +import org.apache.iceberg.mr.InputFormatConfig; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class HiveIcebergMetaHook implements HiveMetaHook { + private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergMetaHook.class); + private static final Set PARAMETERS_TO_REMOVE = ImmutableSet + .of(InputFormatConfig.TABLE_SCHEMA, InputFormatConfig.PARTITION_SPEC, Catalogs.LOCATION, Catalogs.NAME); + private static final Set PROPERTIES_TO_REMOVE = ImmutableSet + .of(InputFormatConfig.EXTERNAL_TABLE_PURGE, hive_metastoreConstants.META_TABLE_STORAGE, "EXTERNAL", + "bucketing_version"); + + private final Configuration conf; + private Table icebergTable = null; + private Properties catalogProperties; + private boolean deleteIcebergTable; + private FileIO deleteIo; + private TableMetadata deleteMetadata; + + public HiveIcebergMetaHook(Configuration conf) { +this.conf = conf; + } + + @Override + public void preCreateTable(org.apache.hadoop.hive.metastore.api.Table hmsTable) { +this.catalogProperties = getCatalogProperties(hmsTable); + +// Set the table type even for non HiveCatalog based tables +hmsTable.getParameters().put(BaseMetastoreTableOperations.TABLE_TYPE_PROP, +BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE.toUpperCase()); + +if (!Catalogs.hiveCatalog(conf)) { + // If not using HiveCatalog check for existing table + try { +this.icebergTable = Catalogs.loadTable(conf, catalogProperties); + + 
Preconditions.checkArgument(catalogProperties.getProperty(InputFormatConfig.TABLE_SCHEMA) == null, +"Iceberg table already created - can not use provided schema"); + Preconditions.checkArgument(catalogProperties.getProperty(InputFormatConfig.PARTITION_SPEC) == null, +"Iceberg table already created - can not use provided partition specification"); + +LOG.info("Iceberg table already exists {}", icebergTable); + +return; + } catch (NoSuchTableException nte) { +// If the table does not exist we will create it below + } +} + +// If the table does not exist collect data for table creation +String schemaString = catalogProperties.getProperty(InputFormatConfig.TABLE_SCHEMA); +Preconditions.checkNotNull(schemaString, "Please provide a table schema"); +// Just check if it is parsable, and later use for partition specification parsing +Schema schema = SchemaParser.fromJson(schemaString); + +String specString = catalogProperties.getProperty(InputFormatConfig.PARTITION_SPEC); +if (specString != null) { + // Just check if it is parsable + PartitionSpecParser.fromJson(schema, specString); +} + +// Allow purging table data if the table is created now and not set otherwise +if (hmsTable.getParameters().get(InputFormatConfig.EXTERNAL_TABLE_PURGE) == null) { + hmsTable.getParamet
Re: [PR] #9073 Junit 4 tests switched to JUnit 5 [iceberg]
igoradulian commented on PR #9793: URL: https://github.com/apache/iceberg/pull/9793#issuecomment-2084436773 @nastra, please review recent changes -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Nessie: Make handleExceptionsForCommits public in NessieUtil [iceberg]
nastra commented on PR #10248: URL: https://github.com/apache/iceberg/pull/10248#issuecomment-2084445178 @YuzongG just curious, where are you planning to re-use this, as it should only be used internally in `NessieTableOperations` / `NessieViewOperations`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [I] appending to a table with Decimal > 32767 results in `int too big to convert` [iceberg-python]
bigluck commented on issue #669: URL: https://github.com/apache/iceberg-python/issues/669#issuecomment-2084469240 For reference, this is the full stack trace: ``` Traceback (most recent call last): File "/Users/bigluck/Desktop/pyiceberg-vlad-bug/test.py", line 31, in new_table.append(arrow_table) File "/Users/bigluck/Desktop/pyiceberg-vlad-bug/venv/lib/python3.11/site-packages/pyiceberg/table/__init__.py", line 1000, in append for data_file in data_files: File "/Users/bigluck/Desktop/pyiceberg-vlad-bug/venv/lib/python3.11/site-packages/pyiceberg/table/__init__.py", line 2345, in _dataframe_to_data_files yield from write_file(table, iter([WriteTask(write_uuid, next(counter), df)])) ^^^ File "/Users/bigluck/Desktop/pyiceberg-vlad-bug/venv/lib/python3.11/site-packages/pyiceberg/io/pyarrow.py", line 1758, in write_file fill_parquet_file_metadata( File "/Users/bigluck/Desktop/pyiceberg-vlad-bug/venv/lib/python3.11/site-packages/pyiceberg/io/pyarrow.py", line 1695, in fill_parquet_file_metadata _min = agg.min_as_bytes() ^^ File "/Users/bigluck/Desktop/pyiceberg-vlad-bug/venv/lib/python3.11/site-packages/pyiceberg/io/pyarrow.py", line 1355, in min_as_bytes return self.serialize( ^^^ File "/Users/bigluck/Desktop/pyiceberg-vlad-bug/venv/lib/python3.11/site-packages/pyiceberg/io/pyarrow.py", line 1337, in serialize return to_bytes(self.primitive_type, value) File "/nix/store/03q8gn91mj95y5bqbcl90hyvmpqpz738-python3-3.11.7/lib/python3.11/functools.py", line 909, in wrapper return dispatch(args[0].__class__)(*args, **kw) File "/Users/bigluck/Desktop/pyiceberg-vlad-bug/venv/lib/python3.11/site-packages/pyiceberg/conversions.py", line 267, in _ return decimal_to_bytes(value) ^^^ File "/Users/bigluck/Desktop/pyiceberg-vlad-bug/venv/lib/python3.11/site-packages/pyiceberg/utils/decimal.py", line 81, in decimal_to_bytes return unscaled_value.to_bytes(byte_length, byteorder="big", signed=True) ^^ OverflowError: int too big to convert (venv) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] feat: Convert predicate to arrow filter and push down to parquet reader [iceberg-rust]
liurenjie1024 commented on code in PR #295: URL: https://github.com/apache/iceberg-rust/pull/295#discussion_r1584204040 ## crates/iceberg/src/arrow/reader.rs: ## @@ -186,4 +221,637 @@ impl ArrowReader { Ok(ProjectionMask::leaves(parquet_schema, indices)) } } + +fn get_row_filter( +&self, +parquet_schema: &SchemaDescriptor, +collector: &CollectFieldIdVisitor, +) -> Result> { +if let Some(predicates) = &self.predicates { +let field_id_map = build_field_id_map(parquet_schema)?; + +let column_indices = collector +.field_ids +.iter() +.map(|field_id| { +field_id_map.get(field_id).cloned().ok_or_else(|| { +Error::new(ErrorKind::DataInvalid, "Field id not found in schema") +}) +}) +.collect::>>()?; + +// Convert BoundPredicates to ArrowPredicates +let mut converter = PredicateConverter { +columns: &column_indices, +projection_mask: ProjectionMask::leaves(parquet_schema, column_indices.clone()), +parquet_schema, +column_map: &field_id_map, +}; +let arrow_predicate = visit(&mut converter, predicates)?; +Ok(Some(RowFilter::new(vec![arrow_predicate]))) +} else { +Ok(None) +} +} +} + +/// Build the map of field id to Parquet column index in the schema. +fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> Result> { +let mut column_map = HashMap::new(); +for (idx, field) in parquet_schema.columns().iter().enumerate() { +let field_type = field.self_type(); +match field_type { +ParquetType::PrimitiveType { basic_info, .. } => { +if !basic_info.has_id() { +return Err(Error::new( +ErrorKind::DataInvalid, +format!( +"Leave column {:?} in schema doesn't have field id", +field_type +), +)); +} +column_map.insert(basic_info.id(), idx); +} +ParquetType::GroupType { .. } => { +return Err(Error::new( +ErrorKind::DataInvalid, +format!( +"Leave column in schema should be primitive type but got {:?}", +field_type +), +)); +} +}; +} + +Ok(column_map) +} + +/// A visitor to collect field ids from bound predicates. 
+struct CollectFieldIdVisitor { +field_ids: Vec, +} + +impl BoundPredicateVisitor for CollectFieldIdVisitor { +type T = (); + +fn always_true(&mut self) -> Result { +Ok(()) +} + +fn always_false(&mut self) -> Result { +Ok(()) +} + +fn and(&mut self, _lhs: Self::T, _rhs: Self::T) -> Result { +Ok(()) +} + +fn or(&mut self, _lhs: Self::T, _rhs: Self::T) -> Result { +Ok(()) +} + +fn not(&mut self, _inner: Self::T) -> Result { +Ok(()) +} + +fn is_null( +&mut self, +reference: &BoundReference, +_predicate: &BoundPredicate, +) -> Result { +self.field_ids.push(reference.field().id); +Ok(()) +} + +fn not_null( +&mut self, +reference: &BoundReference, +_predicate: &BoundPredicate, +) -> Result { +self.field_ids.push(reference.field().id); +Ok(()) +} + +fn is_nan( +&mut self, +reference: &BoundReference, +_predicate: &BoundPredicate, +) -> Result { +self.field_ids.push(reference.field().id); +Ok(()) +} + +fn not_nan( +&mut self, +reference: &BoundReference, +_predicate: &BoundPredicate, +) -> Result { +self.field_ids.push(reference.field().id); +Ok(()) +} + +fn less_than( +&mut self, +reference: &BoundReference, +_literal: &Datum, +_predicate: &BoundPredicate, +) -> Result { +self.field_ids.push(reference.field().id); +Ok(()) +} + +fn less_than_or_eq( +&mut self, +reference: &BoundReference, +_literal: &Datum, +_predicate: &BoundPredicate, +) -> Result { +self.field_ids.push(reference.field().id); +Ok(()) +} + +fn greater_than( +&mut self, +reference: &BoundReference, +_literal: &Datum, +_predicate: &BoundPredicate, +) -> Result { +self.field_ids.push(reference.field().id); +Ok(()) +} + +fn greater_than_or_eq( +&mut self, +reference: &BoundReference, +_literal: &Datum, +_predicate: &
Re: [I] appending to a table with Decimal > 32767 results in `int too big to convert` [iceberg-python]
bigluck commented on issue #669: URL: https://github.com/apache/iceberg-python/issues/669#issuecomment-2084497090 @Fokko `decimal_to_bytes`, when invoked without `byte_length`, uses `bytes_required` to get the required number of bytes. ```python for v in ['32767', '32768', '32769', '32999']: d = Decimal(v) print(f'decimal_to_unscaled[{v}]', decimal_to_unscaled(d)) x = decimal_to_unscaled(d) print(f'bytes_required[{v}]', bytes_required(d)) ``` ``` decimal_to_unscaled[32767] 32767 bytes_required[32767] 2 decimal_to_unscaled[32768] 32768 bytes_required[32768] 2 decimal_to_unscaled[32769] 32769 bytes_required[32769] 2 decimal_to_unscaled[32999] 32999 bytes_required[32999] 2 ``` But it overflows because `decimal_to_bytes` creates a signed value: ```python unscaled_value = decimal_to_unscaled(value) if byte_length is None: byte_length = bytes_required(unscaled_value) return unscaled_value.to_bytes(byte_length, byteorder="big", signed=True) ``` 2 bytes, as a signed int, can store values in the `-32,768` to `32,767` range. Indeed the same code does not crash when using `Decimal ('32967')` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
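A minimal sketch of the arithmetic described above, assuming the fix is simply to size the buffer for a two's-complement (signed) encoding rather than for the unsigned magnitude; `signed_bytes_required` is a hypothetical helper, not pyiceberg's actual implementation:

```python
def signed_bytes_required(value: int) -> int:
    # Two's complement: a negative value needs as many magnitude bits as its
    # bitwise complement, and every value needs one extra bit for the sign.
    if value < 0:
        value = ~value
    return value.bit_length() // 8 + 1


for unscaled in (32767, 32768, 32999, -32768, -32769):
    n = signed_bytes_required(unscaled)
    encoded = unscaled.to_bytes(n, byteorder="big", signed=True)
    print(unscaled, n, encoded.hex())
```

With the sign bit accounted for, 32768 and 32999 are sized at 3 bytes, so `to_bytes(..., signed=True)` no longer overflows at the 2-byte boundary.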
Re: [PR] Basic Integration with Datafusion [iceberg-rust]
Xuanwo commented on code in PR #324: URL: https://github.com/apache/iceberg-rust/pull/324#discussion_r1584213613 ## crates/integrations/datafusion/src/catalog.rs: ## @@ -0,0 +1,94 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{any::Any, collections::HashMap, sync::Arc}; + +use datafusion::catalog::{schema::SchemaProvider, CatalogProvider}; +use futures::future::try_join_all; +use iceberg::{Catalog, NamespaceIdent, Result}; + +use crate::schema::IcebergSchemaProvider; + +/// Provides an interface to manage and access multiple schemas +/// within an Iceberg [`Catalog`]. +/// +/// Acts as a centralized catalog provider that aggregates +/// multiple [`SchemaProvider`], each associated with distinct namespaces. +pub struct IcebergCatalogProvider { +/// A concurrent `HashMap` where keys are namespace names +/// and values are dynamic references to objects implementing the +/// [`SchemaProvider`] trait. +schemas: HashMap>, +} + +impl IcebergCatalogProvider { +/// Asynchronously tries to construct a new [`IcebergCatalogProvider`] +/// using the given client to fetch and initialize schema providers for +/// each namespace in the Iceberg [`Catalog`]. +/// +/// This method retrieves the list of namespace names +/// attempts to create a schema provider for each namespace, and +/// collects these providers into a concurrent `HashMap`. +pub async fn try_new(client: Arc) -> Result { +let schema_names: Vec<_> = client +.list_namespaces(None) +.await? +.iter() +.flat_map(|ns| ns.as_ref().clone()) +.collect(); + +let providers = try_join_all( +schema_names +.iter() +.map(|name| { +IcebergSchemaProvider::try_new( +client.clone(), +NamespaceIdent::new(name.clone()), +) +}) +.collect::>(), +) +.await?; + +let schemas: Vec<_> = schema_names +.into_iter() +.zip(providers.into_iter()) +.map(|(name, provider)| { +let provider = Arc::new(provider) as Arc; +(name, provider) +}) +.collect(); + +Ok(IcebergCatalogProvider { +schemas: schemas.into_iter().collect(), +}) +} +} + +impl CatalogProvider for IcebergCatalogProvider { +fn as_any(&self) -> &dyn Any { +self +} + +fn schema_names(&self) -> Vec { +self.schemas.keys().cloned().collect() Review Comment: > It's possible to block on an async call, but it requires runtime api. It does possible but we should avoid using `rt.block_on()` in our lib. > My suggestion is to leave it as it now, and fix it when we have runtime api landed. I'm second with this suggestion. We can move on and figure how to improve this part in the future. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Basic Integration with Datafusion [iceberg-rust]
liurenjie1024 commented on code in PR #324: URL: https://github.com/apache/iceberg-rust/pull/324#discussion_r1584229759 ## crates/integrations/datafusion/src/catalog.rs: ## @@ -0,0 +1,94 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{any::Any, collections::HashMap, sync::Arc}; + +use datafusion::catalog::{schema::SchemaProvider, CatalogProvider}; +use futures::future::try_join_all; +use iceberg::{Catalog, NamespaceIdent, Result}; + +use crate::schema::IcebergSchemaProvider; + +/// Provides an interface to manage and access multiple schemas +/// within an Iceberg [`Catalog`]. +/// +/// Acts as a centralized catalog provider that aggregates +/// multiple [`SchemaProvider`], each associated with distinct namespaces. +pub struct IcebergCatalogProvider { +/// A concurrent `HashMap` where keys are namespace names +/// and values are dynamic references to objects implementing the +/// [`SchemaProvider`] trait. +schemas: HashMap>, +} + +impl IcebergCatalogProvider { +/// Asynchronously tries to construct a new [`IcebergCatalogProvider`] +/// using the given client to fetch and initialize schema providers for +/// each namespace in the Iceberg [`Catalog`]. +/// +/// This method retrieves the list of namespace names +/// attempts to create a schema provider for each namespace, and +/// collects these providers into a concurrent `HashMap`. +pub async fn try_new(client: Arc) -> Result { +let schema_names: Vec<_> = client +.list_namespaces(None) +.await? +.iter() +.flat_map(|ns| ns.as_ref().clone()) +.collect(); + +let providers = try_join_all( +schema_names +.iter() +.map(|name| { +IcebergSchemaProvider::try_new( +client.clone(), +NamespaceIdent::new(name.clone()), +) +}) +.collect::>(), +) +.await?; + +let schemas: Vec<_> = schema_names +.into_iter() +.zip(providers.into_iter()) +.map(|(name, provider)| { +let provider = Arc::new(provider) as Arc; +(name, provider) +}) +.collect(); + +Ok(IcebergCatalogProvider { +schemas: schemas.into_iter().collect(), +}) +} +} + +impl CatalogProvider for IcebergCatalogProvider { +fn as_any(&self) -> &dyn Any { +self +} + +fn schema_names(&self) -> Vec { +self.schemas.keys().cloned().collect() Review Comment: > is there any issue or discussion already related to that? Or would you mind to quickly outline whats the plan here? There is a tracking issue to add runtime api: https://github.com/apache/iceberg-rust/issues/124 But I agree with @Xuanwo that we should avoid `rt.block_on` as much as possible. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
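To illustrate the trade-off being discussed, a rough sketch with made-up names (not the iceberg-datafusion API, and assuming a Tokio runtime only for the demo `main`): the async catalog call happens once in an async constructor, so the synchronous trait-style accessor never needs `rt.block_on()` inside the library:

```rust
use std::collections::HashMap;
use std::future::Future;

struct Provider {
    // Snapshot of namespace names, fetched eagerly in `try_new`.
    schemas: HashMap<String, Vec<String>>,
}

impl Provider {
    async fn try_new(fetch_namespaces: impl Future<Output = Vec<String>>) -> Self {
        // Do the async work while still in an async context ...
        let mut schemas = HashMap::new();
        for ns in fetch_namespaces.await {
            schemas.insert(ns, Vec::new());
        }
        Self { schemas }
    }

    // ... so the synchronous accessor stays non-blocking: it only reads the cache.
    fn schema_names(&self) -> Vec<String> {
        self.schemas.keys().cloned().collect()
    }
}

#[tokio::main]
async fn main() {
    let provider = Provider::try_new(async { vec!["ns1".to_string(), "ns2".to_string()] }).await;
    println!("{:?}", provider.schema_names());
}
```

The cost is that the provider sees a snapshot taken at construction time; a runtime API (tracked in apache/iceberg-rust#124) would be the prerequisite for doing these lookups lazily without blocking in library code.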