[GitHub] [iceberg] ConeyLiu commented on a diff in pull request #6335: Core: Avoid generating a large ManifestFile when committing
ConeyLiu commented on code in PR #6335: URL: https://github.com/apache/iceberg/pull/6335#discussion_r1063976959

## core/src/main/java/org/apache/iceberg/FastAppend.java:

```
@@ -49,8 +51,9 @@ class FastAppend extends SnapshotProducer<AppendFiles> implements AppendFiles {
   private final List<DataFile> newFiles = Lists.newArrayList();
   private final List<ManifestFile> appendManifests = Lists.newArrayList();
   private final List<ManifestFile> rewrittenAppendManifests = Lists.newArrayList();
-  private ManifestFile newManifest = null;
+  private List<ManifestFile> cachedNewManifests = null;
```

Review Comment: @RussellSpitzer Do you mean the `caching` part here? This just renames the variable because it is plural now.
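(Context for why the field becomes a list: the PR rolls over to a new manifest once the current one grows past a size threshold, so a commit can produce several smaller manifests instead of one large one. Below is a minimal sketch of that rolling idea; the threshold constant and the grouping helper are hypothetical illustrations, not the actual Iceberg internals.)

```
import java.util.List;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

// Hypothetical sketch: group entries into manifests, starting a new one
// whenever the current manifest would exceed a target size.
class RollingManifestsSketch {
  private static final long TARGET_MANIFEST_BYTES = 8L * 1024 * 1024; // assumed threshold

  static List<List<Long>> groupBySize(List<Long> entrySizes) {
    List<List<Long>> manifests = Lists.newArrayList();
    List<Long> current = Lists.newArrayList();
    long currentBytes = 0;
    for (long size : entrySizes) {
      if (currentBytes + size > TARGET_MANIFEST_BYTES && !current.isEmpty()) {
        manifests.add(current); // roll over: close the current manifest
        current = Lists.newArrayList();
        currentBytes = 0;
      }
      current.add(size);
      currentBytes += size;
    }
    if (!current.isEmpty()) {
      manifests.add(current);
    }
    return manifests;
  }
}
```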
[GitHub] [iceberg] szehon-ho commented on a diff in pull request #6451: Hive: Lock hardening
szehon-ho commented on code in PR #6451: URL: https://github.com/apache/iceberg/pull/6451#discussion_r1063985982

## docs/configuration.md:

```
@@ -160,14 +160,20 @@ Here are the catalog properties related to locking. They are used by some catalogs.

 The following properties from the Hadoop configuration are used by the Hive Metastore connector.

-| Property                            | Default        | Description                                                                   |
-| ----------------------------------- | -------------- | ----------------------------------------------------------------------------- |
-| iceberg.hive.client-pool-size       | 5              | The size of the Hive client pool when tracking tables in HMS                  |
-| iceberg.hive.lock-timeout-ms        | 180000 (3 min) | Maximum time in milliseconds to acquire a lock                                |
-| iceberg.hive.lock-check-min-wait-ms | 50             | Minimum time in milliseconds to check back on the status of lock acquisition  |
-| iceberg.hive.lock-check-max-wait-ms | 5000           | Maximum time in milliseconds to check back on the status of lock acquisition  |
-
-Note: `iceberg.hive.lock-check-max-wait-ms` should be less than the [transaction timeout](https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties#ConfigurationProperties-hive.txn.timeout)
+| Property                            | Default        | Description                                                                   |
+|-------------------------------------|----------------|-------------------------------------------------------------------------------|
+| iceberg.hive.client-pool-size       | 5              | The size of the Hive client pool when tracking tables in HMS                  |
+| iceberg.hive.lock-timeout-ms        | 180000 (3 min) | Maximum time in milliseconds to acquire a lock                                |
```

Review Comment: I see, I just saw it's from another PR and unrelated to this one. But it still doesn't fit very well with the two phases (creation/acquisition) you put in the first part of the doc. Should we just say, simply:

```
The HMS table locking is a 2-step process:
1. Lock Creation: Create lock in HMS and queue for acquisition
2. Lock Check: Check if lock successfully acquired
```

This way, they are more relevant to the property names (lock-create, lock-check).

Also, I wasn't sure about putting in the details of 'should wait until every previously created Lock is released': is this Hive internals? I feel it will just confuse more.
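(For reference, these lock properties are plain Hadoop configuration keys, so they can also be set programmatically. A minimal sketch using the documented defaults; per the note above, the check interval should stay below the Hive transaction timeout.)

```
import org.apache.hadoop.conf.Configuration;

public class HiveLockConfigExample {
  public static void main(String[] args) {
    // Tune the HMS locking behavior on the Hadoop Configuration that backs
    // the Hive catalog. The values shown are the documented defaults.
    Configuration conf = new Configuration();
    conf.setInt("iceberg.hive.client-pool-size", 5);
    conf.setLong("iceberg.hive.lock-timeout-ms", 180_000L); // 3 minutes
    conf.setLong("iceberg.hive.lock-check-min-wait-ms", 50L);
    conf.setLong("iceberg.hive.lock-check-max-wait-ms", 5_000L); // keep below hive.txn.timeout
  }
}
```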
[GitHub] [iceberg] marsupialtail opened a new issue, #6541: Does PyIceberg support DynamoDB catalog?
marsupialtail opened a new issue, #6541: URL: https://github.com/apache/iceberg/issues/6541

### Query engine

PyIceberg; I just want to read the metadata.

### Question

Say I have a DynamoDB table `prodiceberg_metastore` as my catalog. How do I read it in PyIceberg? Can someone give some instructions, please?
[GitHub] [iceberg] jackye1995 commented on a diff in pull request #6449: WIP: Delta: Adding support for Migrating Delta Lake Table to Iceberg Table
jackye1995 commented on code in PR #6449: URL: https://github.com/apache/iceberg/pull/6449#discussion_r1064046246

## delta-lake/src/integration/java/org/apache/iceberg/delta/TestMigrateDeltaLakeTable.java:

```
@@ -0,0 +1,267 @@
/* Apache License 2.0 header */
package org.apache.iceberg.delta;

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkSessionCatalog;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.catalog.CatalogExtension;
import org.apache.spark.sql.connector.catalog.CatalogPlugin;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.delta.catalog.DeltaCatalog;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class TestMigrateDeltaLakeTable extends SparkDeltaLakeMigrationTestBase {
  private static final String NAMESPACE = "default";

  private static final String ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
  private String partitionedIdentifier;
  private String unpartitionedIdentifier;

  @Parameterized.Parameters(name = "Catalog Name {0} - Options {2}")
  public static Object[][] parameters() {
    return new Object[][] {
      new Object[] {
        "delta",
        DeltaCatalog.class.getName(),
        ImmutableMap.of(
            "type",
            "hive",
            "default-namespace",
            "default",
            "parquet-enabled",
            "true",
            "cache-enabled",
            "false" // Spark will delete tables using v1, leaving the cache out of sync
            )
      }
    };
  }

  @Rule public TemporaryFolder temp = new TemporaryFolder();
  @Rule public TemporaryFolder other = new TemporaryFolder();

  private final String partitionedTableName = "partitioned_table";
  private final String unpartitionedTableName = "unpartitioned_table";

  private final String defaultSparkCatalog = "spark_catalog";
  private String partitionedLocation;
  private String unpartitionedLocation;
  private final String type;
  private TableCatalog catalog;

  private String catalogName;

  public TestMigrateDeltaLakeTable(
      String catalogName, String implementation, Map<String, String> config) {
    super(catalogName, implementation, config);
    spark
        .conf()
        .set("spark.sql.catalog." + defaultSparkCatalog, SparkSessionCatalog.class.getName());
    this.catalog = (TableCatalog) spark.sessionState().catalogManager().catalog(catalogName);
    this.type = config.get("type");
    this.catalogName = catalogName;
  }

  @Before
  public void before() {
    try {
      File partitionedFolder = temp.newFolder();
      File unpartitionedFolder = other.newFolder();
      partitionedLocation = partitionedFolder.toURI().toString();
      unpartitionedLocation = unpartitionedFolder.toURI().toString();
    } catch (IOException e) {
      throw new RuntimeException(e);
    }

    partitionedIdentifier = destName(partitionedTableName);
    unpartitionedIdentifier = destName(unpartitionedTableName);

    CatalogExtension delta =
        (CatalogExtension) spark.sessionState().catalogManager().catalog("delta");
    // This needs to be set, otherwise Delta operations fail as the catalog is designed to
    // override the default catalog (spark_catalog).
    delta.setDelegateCatalog(spark.sessionState().catalogManager().currentCatalog());

    spark.sql(String.format("DROP TABLE IF EXISTS %s", partitionedIdentifier));
    spark.sql(String.format("DROP TAB
```
[GitHub] [iceberg] danielcweeks merged pull request #6540: Python: Fix the mdformat issue
danielcweeks merged PR #6540: URL: https://github.com/apache/iceberg/pull/6540
[GitHub] [iceberg] github-actions[bot] commented on issue #5251: Discrepancy between partition truncation transform in spec vs code (org.apache.iceberg.transforms.Transform)
github-actions[bot] commented on issue #5251: URL: https://github.com/apache/iceberg/issues/5251#issuecomment-1374663986

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in the next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.
[GitHub] [iceberg] github-actions[bot] commented on issue #5141: No way to rollback first commit in table
github-actions[bot] commented on issue #5141: URL: https://github.com/apache/iceberg/issues/5141#issuecomment-1374664011

This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale'.
[GitHub] [iceberg] github-actions[bot] closed issue #5141: No way to rollback first commit in table
github-actions[bot] closed issue #5141: No way to rollback first commit in table URL: https://github.com/apache/iceberg/issues/5141
[GitHub] [iceberg] github-actions[bot] closed issue #5139: Historical time travel imports
github-actions[bot] closed issue #5139: Historical time travel imports URL: https://github.com/apache/iceberg/issues/5139
[GitHub] [iceberg] github-actions[bot] commented on issue #5139: Historical time travel imports
github-actions[bot] commented on issue #5139: URL: https://github.com/apache/iceberg/issues/5139#issuecomment-1374664024

This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale'.
[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6382: Implement ShuffleOperator to collect data statistics
stevenzwu commented on code in PR #6382: URL: https://github.com/apache/iceberg/pull/6382#discussion_r1064063897

## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatistics.java:

```
@@ -0,0 +1,33 @@
/* Apache License 2.0 header */
package org.apache.iceberg.flink.sink.shuffle;

/**
 * DataStatistics defines the interface to collect data statistics.
 *
 * <p>{@link ShuffleOperator} will store local data statistics and later distribute the global
 * statistics (received from ShuffleCoordinator) to the Partitioner.
 */
interface DataStatistics<K> {
```

Review Comment: let's annotate this as `@Internal`
[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6382: Implement ShuffleOperator to collect data statistics
stevenzwu commented on code in PR #6382: URL: https://github.com/apache/iceberg/pull/6382#discussion_r1064064168

## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsFactory.java:

```
@@ -0,0 +1,31 @@
/* Apache License 2.0 header */
package org.apache.iceberg.flink.sink.shuffle;

/**
 * DataStatisticsFactory provides the DataStatistics definition for different modes like HASH, RANGE
 */
class DataStatisticsFactory<K> {

  DataStatistics<K> createDataStatistics() {
```

Review Comment: I am not sure there is much value in this factory class. Even if we want to keep it, this method should be called `createMapStatistics`.
[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6382: Implement ShuffleOperator to collect data statistics
stevenzwu commented on code in PR #6382: URL: https://github.com/apache/iceberg/pull/6382#discussion_r1064064723

## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/MapDataStatistics.java:

```
@@ -0,0 +1,58 @@
/* Apache License 2.0 header */
package org.apache.iceberg.flink.sink.shuffle;

import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

/** MapDataStatistics uses a map to count key frequency */
class MapDataStatistics<K> implements DataStatistics<K> {
  private final Map<K, Long> dataStatistics = Maps.newHashMap();

  @Override
  public long size() {
    return dataStatistics.size();
  }

  @Override
  public void put(K key) {
    dataStatistics.put(key, dataStatistics.getOrDefault(key, 0L) + 1L);
  }

  @Override
  public void merge(DataStatistics<K> other) {
    Preconditions.checkArgument(
        other instanceof MapDataStatistics, "Can not merge this type of statistics: " + other);
    MapDataStatistics<K> mapDataStatistic = (MapDataStatistics<K>) other;
    mapDataStatistic.dataStatistics.forEach(
        (key, count) -> dataStatistics.put(key, dataStatistics.getOrDefault(key, 0L) + count));
  }

  @VisibleForTesting
```

Review Comment: I feel this is not just for testing purposes. In the end, the partitioner will need to retrieve the actual map statistics.
[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6382: Implement ShuffleOperator to collect data statistics
stevenzwu commented on code in PR #6382: URL: https://github.com/apache/iceberg/pull/6382#discussion_r1064064970

## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatistics.java:

```
@@ -0,0 +1,33 @@
/* Apache License 2.0 header */
package org.apache.iceberg.flink.sink.shuffle;

/**
 * DataStatistics defines the interface to collect data statistics.
 *
 * <p>{@link ShuffleOperator} will store local data statistics and later distribute the global
```

Review Comment: I feel we don't need to refer to ShuffleOperator here. But we do need to expand a little more on what `data statistics` means. E.g.:

```
Data statistics tracks traffic volume distribution across some key or keys.
For a low-cardinality key, a simple map of (key, count) can be used.
For a high-cardinality key, probabilistic data structures (sketching) can be used.
```
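(To make the high-cardinality case concrete: a count-min-style structure is one common "sketching" choice, trading exact counts for fixed memory. Below is a self-contained toy sketch with deliberately simple per-row hashing; it is an illustration of the idea, not a data structure proposed in the PR.)

```
import java.util.Random;

// Toy count-min sketch: fixed memory regardless of key cardinality.
// Estimates can over-count but never under-count.
public class CountMinSketch {
  private final long[][] table;
  private final int[] seeds;

  public CountMinSketch(int depth, int width) {
    this.table = new long[depth][width];
    this.seeds = new Random(42).ints(depth).toArray();
  }

  public void add(String key) {
    for (int d = 0; d < table.length; d++) {
      table[d][index(key, d)]++;
    }
  }

  public long estimate(String key) {
    long min = Long.MAX_VALUE;
    for (int d = 0; d < table.length; d++) {
      min = Math.min(min, table[d][index(key, d)]);
    }
    return min;
  }

  private int index(String key, int d) {
    // simple per-row hash: mix the key's hash with a per-row random seed
    return Math.floorMod(key.hashCode() ^ seeds[d], table[0].length);
  }
}
```

Because estimates only ever over-count, the structure is usually acceptable for traffic-balancing decisions, where a slight overestimate of a hot key is harmless.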
[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6382: Implement ShuffleOperator to collect data statistics
stevenzwu commented on code in PR #6382: URL: https://github.com/apache/iceberg/pull/6382#discussion_r1064065085

## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/MapDataStatistics.java:

```
@@ -0,0 +1,58 @@
/* Apache License 2.0 header */
package org.apache.iceberg.flink.sink.shuffle;

import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

/** MapDataStatistics uses a map to count key frequency */
class MapDataStatistics<K> implements DataStatistics<K> {
  private final Map<K, Long> dataStatistics = Maps.newHashMap();

  @Override
  public long size() {
    return dataStatistics.size();
  }

  @Override
  public void put(K key) {
    dataStatistics.put(key, dataStatistics.getOrDefault(key, 0L) + 1L);
  }

  @Override
  public void merge(DataStatistics<K> other) {
    Preconditions.checkArgument(
        other instanceof MapDataStatistics, "Can not merge this type of statistics: " + other);
    MapDataStatistics<K> mapDataStatistic = (MapDataStatistics<K>) other;
```

Review Comment: nit: I would call this `otherStatistics`
[GitHub] [iceberg] aokolnychyi commented on pull request #6534: Spark 3.3: Use regular planning for applicable row-level operations
aokolnychyi commented on PR #6534: URL: https://github.com/apache/iceberg/pull/6534#issuecomment-1374674560

I'll have to revisit the approach in this PR. It is actually not safe and may lead to data correctness bugs. I'll follow up with a fix next week.
[GitHub] [iceberg] dependabot[bot] opened a new pull request, #6542: Build: Bump spotless-plugin-gradle from 6.12.0 to 6.12.1
dependabot[bot] opened a new pull request, #6542: URL: https://github.com/apache/iceberg/pull/6542

Bumps [spotless-plugin-gradle](https://github.com/diffplug/spotless) from 6.12.0 to 6.12.1.

Commits:
- 718a504 Published gradle/6.12.1
- c13acee Published lib/2.31.1
- 552aabf fix(deps): update dependency com.facebook:ktfmt to v0.42 (#1421)
- 4063e9f Add support for KtLint 0.48.0 (#1432, fixes #1430)
- 062e835 Bump changelogs.
- 8f7e005 spotlessApply
- 9a8ccae Bump default ktfmt 0.41 -> 0.42
- fb4277d Merge branch 'main-ktlint-0.48.0' into renovate/ver_ktfmt
- b44d70d Move changelog entries to the correct release.
- b3d8e89 spotlessApply for 2023

Additional commits viewable in the [compare view](https://github.com/diffplug/spotless/compare/gradle/6.12.0...gradle/6.12.1).

Dependabot will resolve any conflicts with this PR as long as you don't alter it yourself. You can also trigger a rebase manually by commenting `@dependabot rebase`.
[GitHub] [iceberg] dependabot[bot] opened a new pull request, #6543: Build: Bump moto from 4.0.12 to 4.0.13 in /python
dependabot[bot] opened a new pull request, #6543: URL: https://github.com/apache/iceberg/pull/6543

Bumps [moto](https://github.com/spulec/moto) from 4.0.12 to 4.0.13.

Changelog (from moto's CHANGELOG.md):

4.0.13 (Docker digest: sha256:703a9d464c11e1f4cacff66acdc9b46f9fa8fb0b969ca9f1e79fa4eb41678565)

New methods:
- EC2: get_password_data()
- Sagemaker: update_pipeline()
- SecretsManager: cancel_rotate_secret()

Miscellaneous:
- CloudWatch: put_metric_data() now supports the StatisticValues parameter
- CognitoIDP: sign_out() now also invalidates the AccessToken
- IAM: get_account_authorization_details() now returns the Tags attribute
- IOT: create_keys_and_certificate() now creates valid certificates, instead of random data

Commits:
- 2e08c32 Prep release 4.0.13 (#5813)
- d68fb04 IOTData: Fix bug where publish() could only be called once (#5812)
- 031f89d Implement secretsmanager CancelRotateSecret (#5809)
- d89e4d2 docs: fix patching external client/resource docs (#5804)
- 555928a IAM: add tags for users to get-account-authorization-details response (#5803)
- 446930f CloudFormation: changing error message to match aws api (stack update attempt...
- a53f620 IoT: Generate valid keys and certificates (#5801)
- f5fbdde chore(readme): Align code snippets formatting with project's rules. (#5802)
- 5920d1a Techdebt: MyPy DynamoDB v20111205 (#5799)
- fb0a4d6 EC2: Implement GetPasswordData (#5798)

Additional commits viewable in the [compare view](https://github.com/spulec/moto/compare/4.0.12...4.0.13).

Dependabot will resolve any conflicts with this PR as long as you don't alter it yourself. You can also trigger a rebase manually by commenting `@dependabot rebase`.
[GitHub] [iceberg] dependabot[bot] opened a new pull request, #6544: Build: Bump rich from 13.0.0 to 13.0.1 in /python
dependabot[bot] opened a new pull request, #6544: URL: https://github.com/apache/iceberg/pull/6544

Bumps [rich](https://github.com/Textualize/rich) from 13.0.0 to 13.0.1.

Release notes (from rich's releases): Fix for splitting segments. Fix for an issue where Segment.split_cells produced the wrong result. Mostly a hotfix for the benefit of Textual.

[13.0.1] - 2023-01-06

Fixed:
- Fixed issue with Segment.split_cells for mixed single and double cell widths

Commits:
- 87529ad version bump
- 4bc4437 Merge pull request #2736 from Textualize/split-fix
- aa09292 fix issue splitting segments

See the full diff in the [compare view](https://github.com/Textualize/rich/compare/v13.0.0...v13.0.1).

Dependabot will resolve any conflicts with this PR as long as you don't alter it yourself. You can also trigger a rebase manually by commenting `@dependabot rebase`.
[GitHub] [iceberg] dependabot[bot] opened a new pull request, #6545: Build: Bump coverage from 7.0.1 to 7.0.4 in /python
dependabot[bot] opened a new pull request, #6545: URL: https://github.com/apache/iceberg/pull/6545

Bumps [coverage](https://github.com/nedbat/coveragepy) from 7.0.1 to 7.0.4.

Changelog (from coverage's CHANGES.rst):

Version 7.0.4 (2023-01-07)
- Performance: an internal cache of file names was accidentally disabled, resulting in sometimes drastic reductions in performance. This is now fixed, closing issue [1527](https://github.com/nedbat/coveragepy/issues/1527). Thanks to Ivan Ciuvalschii for the reproducible test case.

Version 7.0.3 (2023-01-03)
- Fix: when using pytest-cov or pytest-xdist, or perhaps both, the combining step could fail with `assert row is not None` using 7.0.2. This was due to a race condition that has always been possible and is still possible. In 7.0.1 and before, the error was silently swallowed by the combining code. Now it will produce a message "Couldn't combine data file" and ignore the data file as it used to do before 7.0.2. Closes issue [1522](https://github.com/nedbat/coveragepy/issues/1522).

Version 7.0.2 (2023-01-02)
- Fix: when using the `[run] relative_files = True` setting, a relative `[paths]` pattern was still being made absolute. This is now fixed, closing issue 1519.
- Fix: if Python doesn't provide tomllib, then TOML configuration files can only be read if coverage.py is installed with the `[toml]` extra. Coverage.py will raise an error if TOML support is not installed when it sees your settings are in a .toml file. But it didn't understand that `[tools.coverage]` was a valid section header, so the error wasn't reported if you used that header, and settings were silently ignored. This is now fixed, closing issue 1516.
- Fix: adjusted how decorators are traced on PyPy 7.3.10, fixing issue 1515.
- Fix: the `coverage lcov` report did not properly implement the `--fail-under=MIN` option. This has been fixed.
- Refactor: added many type annotations, including a number of refactorings. This should not affect outward behavior, but they were a bit invasive in some ... (truncated)

Commits:
- f4c27c7 docs: sample html report for 7.0.4
- 2b95dba docs: prep for 7.0.4
- 61ccfb8 test(benchmark): more reasonable numeric displays
- 2fa45d6 refactor(benchmark): move benchmark.py to its own directory
- 4d6ac8b test(perf): randomize the order of benchmark runs
- 405fae8 build: add .git-blame-ignore-revs
- 9554e50 style(perf): blacken lab/benchmark.py
- dc4f0c1 test(perf): more experiments for #1527
- b3b05bd perf: the file mapping cache was off by mistake. #1527
- d00f1dd mypy: debug.py

Additional commits viewable in the [compare view](https://github.com/nedbat/coveragepy/compare/7.0.1...7.0.4).

Dependabot will resolve any conflicts with this PR as long as you don't alter it yourself. You can also trigger a rebase manually by commenting `@dependabot rebase`.
[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6382: Implement ShuffleOperator to collect data statistics
stevenzwu commented on code in PR #6382: URL: https://github.com/apache/iceberg/pull/6382#discussion_r1064064551

## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/MapDataStatistics.java:

```
@@ -0,0 +1,58 @@
/* Apache License 2.0 header */
package org.apache.iceberg.flink.sink.shuffle;

import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

/** MapDataStatistics uses a map to count key frequency */
class MapDataStatistics<K> implements DataStatistics<K> {
  private final Map<K, Long> dataStatistics = Maps.newHashMap();

  @Override
  public long size() {
    return dataStatistics.size();
  }

  @Override
  public void put(K key) {
```

Review Comment: From the implementation, I see `put` is probably not the most intuitive method name. For a map count, this operation is more like `increment(K key)`. Not sure if `increment` would be meaningful for digest/probabilistic data structures. An alternative would be `add(K key)` or `add(K key, int count)`. cc @pvary
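(Whatever the method ends up being named, the counting itself is the standard map-merge idiom in Java. A small self-contained illustration, not the PR's code:)

```
import java.util.HashMap;
import java.util.Map;

public class KeyCountExample {
  private final Map<String, Long> counts = new HashMap<>();

  // add(key, n): bump the traffic count for a key. Map.merge sums the
  // existing count with n, or inserts n if the key is new.
  public void add(String key, long n) {
    counts.merge(key, n, Long::sum);
  }

  public static void main(String[] args) {
    KeyCountExample stats = new KeyCountExample();
    stats.add("us-east", 1);
    stats.add("us-east", 1);
    stats.add("eu-west", 1);
    System.out.println(stats.counts); // prints counts, e.g. {us-east=2, eu-west=1}
  }
}
```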
[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6382: Implement ShuffleOperator to collect data statistics
stevenzwu commented on code in PR #6382: URL: https://github.com/apache/iceberg/pull/6382#discussion_r1064080560 ## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleOperator.java: ## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.shuffle; + +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; +import org.apache.flink.runtime.operators.coordination.OperatorEventHandler; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; + +/** + * Shuffle operator can help to improve data clustering based on the key. Review Comment: nit: we can make the description a little more accurate. e.g. ``` Shuffle operator collects traffic distribution statistics. A custom partitioner shall be attached to the shuffle operator output. The custom partitioner leverages the statistics to shuffle record to improve data clustering while maintaining relative balanced traffic distribution to downstream subtasks. ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
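[Editorial note] The interplay described in this suggested javadoc (statistics embedded in the stream, a custom partitioner, then a filter and map to unwrap the records) can be sketched as a pipeline fragment. This is hypothetical wiring under assumed names (`StatisticsAwarePartitioner`, `wrapperTypeInfo`, `input`, `shuffleOperator`); the PR itself only introduces the operator:

```
// Hypothetical wiring of the operator described above, assuming a
// StatisticsAwarePartitioner that caches any statistics-bearing wrapper
// it sees and routes record-bearing wrappers accordingly. Fragment only;
// names other than ShuffleRecordWrapper/ShuffleOperator are invented.
DataStream<ShuffleRecordWrapper<RowData>> withStats =
    input.transform("shuffle", wrapperTypeInfo, shuffleOperator);

DataStream<RowData> clustered =
    withStats
        // route records using the latest statistics seen by the partitioner
        .partitionCustom(new StatisticsAwarePartitioner<>(), wrapper -> wrapper)
        // drop the statistics-only wrappers and unwrap the original records
        .filter(ShuffleRecordWrapper::hasRecord)
        .map(ShuffleRecordWrapper::record);
```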
[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6382: Implement ShuffleOperator to collect data statistics
stevenzwu commented on code in PR #6382: URL: https://github.com/apache/iceberg/pull/6382#discussion_r1064080676 ## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleOperator.java: ## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.shuffle; + +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; +import org.apache.flink.runtime.operators.coordination.OperatorEventHandler; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; + +/** + * Shuffle operator can help to improve data clustering based on the key. + * + * It collects the data statistics information, sends to coordinator and gets the global data + * distribution weight from coordinator. Then it will ingest the weight into data stream(wrap by a + * class{@link ShuffleRecordWrapper}) and send to partitioner. + */ +class ShuffleOperator extends AbstractStreamOperator> +implements OneInputStreamOperator>, OperatorEventHandler { + + private static final long serialVersionUID = 1L; + + private final KeySelector keySelector; + private final OperatorEventGateway operatorEventGateway; + // key is generated by applying KeySelector to record + // value is the times key occurs + // TODO: support to store statistics for high cardinality cases + private transient DataStatistics localDataStatistics; + private transient DataStatistics globalDataStatistics; + private transient ListState> globalDataStatisticsState; + private transient DataStatisticsFactory dataStatisticsFactory; + + public ShuffleOperator( + KeySelector keySelector, + OperatorEventGateway operatorEventGateway, + DataStatisticsFactory dataStatisticsFactory) { +this.keySelector = keySelector; +this.operatorEventGateway = operatorEventGateway; +this.dataStatisticsFactory = dataStatisticsFactory; + } + + @VisibleForTesting + ListStateDescriptor> generateGlobalDataDistributionWeightDescriptor() { Review Comment: not sure if this method is necessary for code reuse. 
Also not sure why it is necessary to verify it in tests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6382: Implement ShuffleOperator to collect data statistics
stevenzwu commented on code in PR #6382: URL: https://github.com/apache/iceberg/pull/6382#discussion_r1064080711 ## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleOperator.java: ## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.shuffle; + +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; +import org.apache.flink.runtime.operators.coordination.OperatorEventHandler; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; + +/** + * Shuffle operator can help to improve data clustering based on the key. + * + * It collects the data statistics information, sends to coordinator and gets the global data + * distribution weight from coordinator. Then it will ingest the weight into data stream(wrap by a + * class{@link ShuffleRecordWrapper}) and send to partitioner. + */ +class ShuffleOperator extends AbstractStreamOperator> +implements OneInputStreamOperator>, OperatorEventHandler { + + private static final long serialVersionUID = 1L; + + private final KeySelector keySelector; + private final OperatorEventGateway operatorEventGateway; + // key is generated by applying KeySelector to record Review Comment: nit: these comments seem not relevant anymore -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6382: Implement ShuffleOperator to collect data statistics
stevenzwu commented on code in PR #6382: URL: https://github.com/apache/iceberg/pull/6382#discussion_r1064080833 ## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleOperator.java: ## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.shuffle; + +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; +import org.apache.flink.runtime.operators.coordination.OperatorEventHandler; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; + +/** + * Shuffle operator can help to improve data clustering based on the key. + * + * It collects the data statistics information, sends to coordinator and gets the global data + * distribution weight from coordinator. Then it will ingest the weight into data stream(wrap by a + * class{@link ShuffleRecordWrapper}) and send to partitioner. + */ +class ShuffleOperator extends AbstractStreamOperator> +implements OneInputStreamOperator>, OperatorEventHandler { + + private static final long serialVersionUID = 1L; + + private final KeySelector keySelector; + private final OperatorEventGateway operatorEventGateway; + // key is generated by applying KeySelector to record + // value is the times key occurs + // TODO: support to store statistics for high cardinality cases + private transient DataStatistics localDataStatistics; Review Comment: nit: simplify the names a little by removing `Data` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6382: Implement ShuffleOperator to collect data statistics
stevenzwu commented on code in PR #6382: URL: https://github.com/apache/iceberg/pull/6382#discussion_r1064081012 ## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleOperator.java: ## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.shuffle; + +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; +import org.apache.flink.runtime.operators.coordination.OperatorEventHandler; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; + +/** + * Shuffle operator can help to improve data clustering based on the key. + * + * It collects the data statistics information, sends to coordinator and gets the global data + * distribution weight from coordinator. Then it will ingest the weight into data stream(wrap by a + * class{@link ShuffleRecordWrapper}) and send to partitioner. 
+ */ +class ShuffleOperator extends AbstractStreamOperator> +implements OneInputStreamOperator>, OperatorEventHandler { + + private static final long serialVersionUID = 1L; + + private final KeySelector keySelector; + private final OperatorEventGateway operatorEventGateway; + // key is generated by applying KeySelector to record + // value is the times key occurs + // TODO: support to store statistics for high cardinality cases + private transient DataStatistics localDataStatistics; + private transient DataStatistics globalDataStatistics; + private transient ListState> globalDataStatisticsState; + private transient DataStatisticsFactory dataStatisticsFactory; + + public ShuffleOperator( + KeySelector keySelector, + OperatorEventGateway operatorEventGateway, + DataStatisticsFactory dataStatisticsFactory) { +this.keySelector = keySelector; +this.operatorEventGateway = operatorEventGateway; +this.dataStatisticsFactory = dataStatisticsFactory; + } + + @VisibleForTesting + ListStateDescriptor> generateGlobalDataDistributionWeightDescriptor() { +return new ListStateDescriptor<>( +"globalDataStatisticsState", TypeInformation.of(new TypeHint>() {})); + } + + @Override + public void initializeState(StateInitializationContext context) throws Exception { +localDataStatistics = dataStatisticsFactory.createDataStatistics(); +globalDataStatisticsState = Review Comment: we need to check if context is restored ``` if (context.isRestored()) { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
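[Editorial note] A method-level sketch of the restore-aware initialization requested here, with the state descriptor inlined at its single call site (which also addresses the earlier comment questioning the descriptor-generating method). Field names follow the shorter naming suggested above (`localStatistics`, `globalStatistics`); this presumes the surrounding operator class from the diff and is not the merged code:

```
@Override
public void initializeState(StateInitializationContext context) throws Exception {
  localStatistics = statisticsFactory.createDataStatistics();
  globalStatisticsState =
      context
          .getOperatorStateStore()
          .getListState(
              new ListStateDescriptor<>(
                  "globalDataStatisticsState",
                  TypeInformation.of(new TypeHint<DataStatistics<K>>() {})));

  // Only consult previous state on an actual restore; a fresh start leaves
  // globalStatistics null until the coordinator broadcasts an aggregate.
  if (context.isRestored() && globalStatisticsState.get().iterator().hasNext()) {
    globalStatistics = globalStatisticsState.get().iterator().next();
  }
}
```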
[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6382: Implement ShuffleOperator to collect data statistics
stevenzwu commented on code in PR #6382: URL: https://github.com/apache/iceberg/pull/6382#discussion_r106408 ## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleOperator.java: ## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.shuffle; + +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; +import org.apache.flink.runtime.operators.coordination.OperatorEventHandler; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; + +/** + * Shuffle operator can help to improve data clustering based on the key. + * + * It collects the data statistics information, sends to coordinator and gets the global data + * distribution weight from coordinator. Then it will ingest the weight into data stream(wrap by a + * class{@link ShuffleRecordWrapper}) and send to partitioner. 
+ */ +class ShuffleOperator extends AbstractStreamOperator> +implements OneInputStreamOperator>, OperatorEventHandler { + + private static final long serialVersionUID = 1L; + + private final KeySelector keySelector; + private final OperatorEventGateway operatorEventGateway; + // key is generated by applying KeySelector to record + // value is the times key occurs + // TODO: support to store statistics for high cardinality cases + private transient DataStatistics localDataStatistics; + private transient DataStatistics globalDataStatistics; + private transient ListState> globalDataStatisticsState; + private transient DataStatisticsFactory dataStatisticsFactory; + + public ShuffleOperator( + KeySelector keySelector, + OperatorEventGateway operatorEventGateway, + DataStatisticsFactory dataStatisticsFactory) { +this.keySelector = keySelector; +this.operatorEventGateway = operatorEventGateway; +this.dataStatisticsFactory = dataStatisticsFactory; + } + + @VisibleForTesting + ListStateDescriptor> generateGlobalDataDistributionWeightDescriptor() { +return new ListStateDescriptor<>( +"globalDataStatisticsState", TypeInformation.of(new TypeHint>() {})); + } + + @Override + public void initializeState(StateInitializationContext context) throws Exception { +localDataStatistics = dataStatisticsFactory.createDataStatistics(); +globalDataStatisticsState = +context +.getOperatorStateStore() +.getListState(generateGlobalDataDistributionWeightDescriptor()); + +if (globalDataStatisticsState.get() != null +&& globalDataStatisticsState.get().iterator().hasNext()) { + globalDataStatistics = globalDataStatisticsState.get().iterator().next(); +} else { + globalDataStatistics = dataStatisticsFactory.createDataStatistics(); Review Comment: this is a cold start problem (no state, rescale, etc.). does it make sense to create an empty Statistics object or we should keep it null to indicate cold start problem? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
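[Editorial note] If `null` is kept as the cold-start marker rather than creating an empty statistics object, `open()` could look like the sketch below. This is one possible answer to the question above, not the PR's resolution; field names follow the rename suggested earlier:

```
@Override
public void open() throws Exception {
  // globalStatistics == null marks a cold start (no snapshot to restore,
  // or state lost on rescale): emit nothing, and let the downstream
  // partitioner fall back to its default routing until the coordinator
  // sends the first aggregated statistics.
  if (globalStatistics != null && globalStatistics.size() > 0) {
    output.collect(new StreamRecord<>(ShuffleRecordWrapper.fromStatistics(globalStatistics)));
  }
}
```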
[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6382: Implement ShuffleOperator to collect data statistics
stevenzwu commented on code in PR #6382: URL: https://github.com/apache/iceberg/pull/6382#discussion_r1064083596 ## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleOperator.java: ## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.shuffle; + +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; +import org.apache.flink.runtime.operators.coordination.OperatorEventHandler; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; + +/** + * Shuffle operator can help to improve data clustering based on the key. + * + * It collects the data statistics information, sends to coordinator and gets the global data + * distribution weight from coordinator. Then it will ingest the weight into data stream(wrap by a + * class{@link ShuffleRecordWrapper}) and send to partitioner. 
+ */ +class ShuffleOperator extends AbstractStreamOperator> +implements OneInputStreamOperator>, OperatorEventHandler { + + private static final long serialVersionUID = 1L; + + private final KeySelector keySelector; + private final OperatorEventGateway operatorEventGateway; + // key is generated by applying KeySelector to record + // value is the times key occurs + // TODO: support to store statistics for high cardinality cases + private transient DataStatistics localDataStatistics; + private transient DataStatistics globalDataStatistics; + private transient ListState> globalDataStatisticsState; + private transient DataStatisticsFactory dataStatisticsFactory; + + public ShuffleOperator( + KeySelector keySelector, + OperatorEventGateway operatorEventGateway, + DataStatisticsFactory dataStatisticsFactory) { +this.keySelector = keySelector; +this.operatorEventGateway = operatorEventGateway; +this.dataStatisticsFactory = dataStatisticsFactory; + } + + @VisibleForTesting + ListStateDescriptor> generateGlobalDataDistributionWeightDescriptor() { +return new ListStateDescriptor<>( +"globalDataStatisticsState", TypeInformation.of(new TypeHint>() {})); + } + + @Override + public void initializeState(StateInitializationContext context) throws Exception { +localDataStatistics = dataStatisticsFactory.createDataStatistics(); +globalDataStatisticsState = +context +.getOperatorStateStore() +.getListState(generateGlobalDataDistributionWeightDescriptor()); + +if (globalDataStatisticsState.get() != null +&& globalDataStatisticsState.get().iterator().hasNext()) { + globalDataStatistics = globalDataStatisticsState.get().iterator().next(); +} else { + globalDataStatistics = dataStatisticsFactory.createDataStatistics(); +} + } + + @Override + public void open() throws Exception { +// TODO: handle scaling up +if (globalDataStatistics != null && globalDataStatistics.size() > 0) { + output.collect(new StreamRecord<>(ShuffleRecordWrapper.fromStatistics(globalDataStatistics))); +} + } + + @Override + public void handleOperatorEvent(OperatorEvent evt) { +// TODO: receive event with globalDataDistributionWeight from coordinator and update +// globalDataStatistics + } + + @Override + public void processElement(StreamRecord streamRecord) throws Exception { +final K key = keySelector.getKey(streamRecord.getValue()); +localDataStatistics.put(key); +output.collect(new StreamRecord<>(Shuf
[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6382: Implement ShuffleOperator to collect data statistics
stevenzwu commented on code in PR #6382: URL: https://github.com/apache/iceberg/pull/6382#discussion_r1064083656 ## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleRecordWrapper.java: ## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.shuffle; + +import java.io.Serializable; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +/** + * The wrapper class for record and data distribution weight Review Comment: we should explain why this wrapper class it needed. This is the only way for the shuffle operator to pass the statistics to the custom partitioner. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6382: Implement ShuffleOperator to collect data statistics
stevenzwu commented on code in PR #6382: URL: https://github.com/apache/iceberg/pull/6382#discussion_r1064083753 ## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleRecordWrapper.java: ## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.shuffle; + +import java.io.Serializable; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +/** + * The wrapper class for record and data distribution weight + * + * ShuffleRecordWrapper is sent from ShuffleOperator to partitioner. It may contain a record or Review Comment: > It may contain a record or data distribution weight. we can be more precise here. `It contains either a record or the data statistics (globally aggregated).` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6382: Implement ShuffleOperator to collect data statistics
stevenzwu commented on code in PR #6382: URL: https://github.com/apache/iceberg/pull/6382#discussion_r1064083871 ## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleRecordWrapper.java: ## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.shuffle; + +import java.io.Serializable; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +/** + * The wrapper class for record and data distribution weight + * + * ShuffleRecordWrapper is sent from ShuffleOperator to partitioner. It may contain a record or + * data distribution weight. Once partitioner receives the weight, it will use that to decide the + * coming record should send to which writer subtask. After shuffling, a filter and mapper are + * required to filter out the data distribution weight, unwrap the object and extract the original + * record type T. + */ +public class ShuffleRecordWrapper implements Serializable { + + private static final long serialVersionUID = 1L; + + private final DataStatistics globalDataStatistics; + private final T record; + + private ShuffleRecordWrapper(T record, DataStatistics globalDataStatistics) { +Preconditions.checkArgument( +record != null ^ globalDataStatistics != null, +"A ShuffleRecordWrapper contain either record and stats, not neither or both"); +this.globalDataStatistics = globalDataStatistics; +this.record = record; + } + + static ShuffleRecordWrapper fromRecord(T record) { +return new ShuffleRecordWrapper<>(record, null); + } + + static ShuffleRecordWrapper fromStatistics(DataStatistics globalDataStatistics) { +return new ShuffleRecordWrapper<>(null, globalDataStatistics); + } + + boolean hasGlobalDataDistributionWeight() { Review Comment: let's be consistent here. by the variable name, the method name could be `hasDataStatistics` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6382: Implement ShuffleOperator to collect data statistics
stevenzwu commented on code in PR #6382: URL: https://github.com/apache/iceberg/pull/6382#discussion_r1064083893 ## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleRecordWrapper.java: ## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.shuffle; + +import java.io.Serializable; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +/** + * The wrapper class for record and data distribution weight + * + * ShuffleRecordWrapper is sent from ShuffleOperator to partitioner. It may contain a record or + * data distribution weight. Once partitioner receives the weight, it will use that to decide the + * coming record should send to which writer subtask. After shuffling, a filter and mapper are + * required to filter out the data distribution weight, unwrap the object and extract the original + * record type T. + */ +public class ShuffleRecordWrapper implements Serializable { + + private static final long serialVersionUID = 1L; + + private final DataStatistics globalDataStatistics; + private final T record; + + private ShuffleRecordWrapper(T record, DataStatistics globalDataStatistics) { +Preconditions.checkArgument( +record != null ^ globalDataStatistics != null, +"A ShuffleRecordWrapper contain either record and stats, not neither or both"); +this.globalDataStatistics = globalDataStatistics; +this.record = record; + } + + static ShuffleRecordWrapper fromRecord(T record) { +return new ShuffleRecordWrapper<>(record, null); + } + + static ShuffleRecordWrapper fromStatistics(DataStatistics globalDataStatistics) { +return new ShuffleRecordWrapper<>(null, globalDataStatistics); + } + + boolean hasGlobalDataDistributionWeight() { +return globalDataStatistics != null; + } + + boolean hasRecord() { +return record != null; + } + + DataStatistics globalDataDistributionWeight() { Review Comment: nit: maybe just `dataStatistics`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
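[Editorial note] A compact, self-contained sketch of the wrapper with the names suggested in the two comments above (`hasDataStatistics` / `dataStatistics`), assuming the same either-or invariant as the diff. The `DataStatisticsStub` interface is declared here only so the sketch compiles on its own:

```
// Sketch of the renamed wrapper API; illustrative only.
final class ShuffleRecordWrapperSketch<T, K> implements java.io.Serializable {
  private final DataStatisticsStub<K> statistics;
  private final T record;

  private ShuffleRecordWrapperSketch(T record, DataStatisticsStub<K> statistics) {
    // Enforce exactly one of record/statistics, as in the diff's checkArgument.
    if ((record == null) == (statistics == null)) {
      throw new IllegalArgumentException(
          "Wrapper must carry exactly one of record or statistics");
    }
    this.record = record;
    this.statistics = statistics;
  }

  static <T, K> ShuffleRecordWrapperSketch<T, K> fromRecord(T record) {
    return new ShuffleRecordWrapperSketch<>(record, null);
  }

  static <T, K> ShuffleRecordWrapperSketch<T, K> fromStatistics(DataStatisticsStub<K> statistics) {
    return new ShuffleRecordWrapperSketch<>(null, statistics);
  }

  boolean hasDataStatistics() {
    return statistics != null;
  }

  boolean hasRecord() {
    return record != null;
  }

  DataStatisticsStub<K> dataStatistics() {
    return statistics;
  }

  T record() {
    return record;
  }
}

// Stand-in for the PR's DataStatistics interface, declared here only so
// the sketch is self-contained.
interface DataStatisticsStub<K> {
  long size();

  void add(K key);
}
```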
[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6382: Implement ShuffleOperator to collect data statistics
stevenzwu commented on code in PR #6382: URL: https://github.com/apache/iceberg/pull/6382#discussion_r1064084354 ## flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestShuffleOperator.java: ## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.shuffle; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.util.Collections; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.OperatorStateStore; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway; +import org.apache.flink.runtime.operators.testutils.MockEnvironment; +import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; +import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateInitializationContextImpl; +import org.apache.flink.runtime.state.TestTaskStateManager; +import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask; +import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment; +import org.apache.flink.streaming.util.MockOutput; +import org.apache.flink.streaming.util.MockStreamConfig; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TestShuffleOperator { + private ShuffleOperator operator; + + private Environment getTestingEnvironment() { +return new StreamMockEnvironment( +new Configuration(), +new Configuration(), +new ExecutionConfig(), +1L, +new MockInputSplitProvider(), +1, +new TestTaskStateManager()); + } + + @Before + public void before() throws Exception { +MockOperatorEventGateway mockGateway = new MockOperatorEventGateway(); +KeySelector keySelector = +new KeySelector() { + private static final long serialVersionUID = 7662520075515707428L; + + @Override + public String getKey(String value) { +return value; + } +}; + +this.operator = +new ShuffleOperator<>(keySelector, TypeInformation.of(String.class), mockGateway); +Environment env = getTestingEnvironment(); 
+this.operator.setup( +new OneInputStreamTask(env), +new MockStreamConfig(new Configuration(), 1), +new MockOutput<>(Lists.newArrayList())); + } + + @After + public void clean() throws Exception { +operator.close(); + } + + @Test + public void testInitializeState() throws Exception { Review Comment: not sure if this test is necessary. seems too trivial to test. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
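[Editorial note] If the wiring-level test is dropped as too trivial, behavior can still be covered through Flink's operator test harness (which this test class already imports in a later revision). A hedged fragment, assuming the String-keyed operator built in `before()`; the assertion is illustrative:

```
// Sketch: drive the operator through the official harness instead of
// asserting on state-descriptor plumbing.
OneInputStreamOperatorTestHarness<String, ShuffleRecordWrapper<String>> harness =
    new OneInputStreamOperatorTestHarness<>(operator);
harness.open();
harness.processElement(new StreamRecord<>("a"));
harness.processElement(new StreamRecord<>("a"));
harness.processElement(new StreamRecord<>("b"));
// e.g. expect three record-bearing wrappers to be forwarded downstream
org.junit.Assert.assertEquals(3, harness.extractOutputValues().size());
harness.close();
```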
[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6382: Implement ShuffleOperator to collect data statistics
stevenzwu commented on code in PR #6382: URL: https://github.com/apache/iceberg/pull/6382#discussion_r1064084432 ## flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestShuffleOperator.java: ## @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.shuffle; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.OperatorStateStore; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway; +import org.apache.flink.runtime.operators.testutils.MockEnvironment; +import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; +import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateInitializationContextImpl; +import org.apache.flink.runtime.state.TestTaskStateManager; +import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask; +import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment; +import org.apache.flink.streaming.util.MockOutput; +import org.apache.flink.streaming.util.MockStreamConfig; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TestShuffleOperator { + private ShuffleOperator operator; + + private Environment getTestingEnvironment() { +return new StreamMockEnvironment( +new Configuration(), +new Configuration(), +new ExecutionConfig(), +1L, +new MockInputSplitProvider(), +1, +new TestTaskStateManager()); + } + + @Before + public void before() throws Exception { +MockOperatorEventGateway mockGateway = new MockOperatorEventGateway(); +KeySelector keySelector = +new KeySelector() { + private static final long serialVersionUID = 7662520075515707428L; + + @Override + public String 
getKey(String value) { +return value; + } +}; +DataStatisticsFactory dataStatisticsFactory = new DataStatisticsFactory<>(); + +this.operator = new ShuffleOperator<>(keySelector, mockGateway, dataStatisticsFactory); +Environment env = getTestingEnvironment(); +this.operator.setup( +new OneInputStreamTask(env), +new MockStreamConfig(new Configuration(), 1), +new MockOutput<>(Lists.newArrayList())); + } + + @After + public void clean() throws Exception { +operator.close(); + } + + @Test + public void testInitializeState() throws Exception { +StateInitializationContext stateContext = getStateContext(); +operator.initializeState(stateContext); + +assertNotNull( +stateContext +.getOperatorStateStore() + .getListState(operator.generateGlobalDataDistributionWeightDescriptor())); + } + + @Test + public void testProcessElement() throws Exception { +StateInitializationContext stateContext = getStateContext(); +operator.initializeState(stateContext); +operator.processElement(new StreamRecord<>("a")); +operator.processElement(new StreamRecord<>("a")); +operator.processElement(new StreamRecord<>("b")); +assertTrue(operator.localDataStatistics() instanceo
[GitHub] [iceberg] jackye1995 commented on a diff in pull request #6449: WIP: Delta: Adding support for Snapshot Delta Lake Table to Iceberg Table
jackye1995 commented on code in PR #6449: URL: https://github.com/apache/iceberg/pull/6449#discussion_r1064097728 ## build.gradle: ## @@ -438,6 +442,70 @@ project(':iceberg-aws') { } } +project(':iceberg-delta-lake') { + configurations { +integrationImplementation.extendsFrom testImplementation Review Comment: Probably should add a comment explaining why a separate integration test configuration is used here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org