[GitHub] [iceberg] ConeyLiu commented on a diff in pull request #6335: Core: Avoid generating a large ManifestFile when committing

2023-01-07 Thread GitBox


ConeyLiu commented on code in PR #6335:
URL: https://github.com/apache/iceberg/pull/6335#discussion_r1063976959


##
core/src/main/java/org/apache/iceberg/FastAppend.java:
##
@@ -49,8 +51,9 @@ class FastAppend extends SnapshotProducer<AppendFiles> implements AppendFiles {
   private final List<DataFile> newFiles = Lists.newArrayList();
   private final List<ManifestFile> appendManifests = Lists.newArrayList();
   private final List<ManifestFile> rewrittenAppendManifests = Lists.newArrayList();
-  private ManifestFile newManifest = null;
+  private List<ManifestFile> cachedNewManifests = null;

Review Comment:
   @RussellSpitzer Do you mean the `caching` part here? Here we just rename the variable because it is plural now.
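   
   For context on the change itself: the PR replaces the single cached manifest with a list so a commit can roll over to a fresh manifest once the current one grows past a target size, instead of producing one large ManifestFile. A minimal, self-contained sketch of that rollover idea (the `DataFile`/`ManifestFile` records and the size heuristic below are illustrative stand-ins, not the actual Iceberg core types or its manifest writer API):
   
   ```
   import java.util.ArrayList;
   import java.util.List;
   
   class RollingManifestSketch {
     // Hypothetical stand-ins for the real Iceberg types.
     record DataFile(String path, long sizeBytes) {}
     record ManifestFile(List<DataFile> files) {}
   
     // Start a new manifest whenever the current one crosses a target size,
     // so a commit writes several small manifests rather than one large one.
     static List<ManifestFile> writeManifests(List<DataFile> newFiles, long targetSizeBytes) {
       List<ManifestFile> manifests = new ArrayList<>();
       List<DataFile> current = new ArrayList<>();
       long currentSize = 0;
       for (DataFile file : newFiles) {
         if (!current.isEmpty() && currentSize >= targetSizeBytes) {
           manifests.add(new ManifestFile(current)); // roll over to a new manifest
           current = new ArrayList<>();
           currentSize = 0;
         }
         current.add(file);
         currentSize += file.sizeBytes();
       }
       if (!current.isEmpty()) {
         manifests.add(new ManifestFile(current));
       }
       return manifests;
     }
   
     public static void main(String[] args) {
       List<DataFile> files =
           List.of(new DataFile("a.parquet", 600), new DataFile("b.parquet", 600), new DataFile("c.parquet", 100));
       System.out.println(writeManifests(files, 1000).size()); // 2 manifests instead of 1
     }
   }
   ```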



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] szehon-ho commented on a diff in pull request #6451: Hive: Lock hardening

2023-01-07 Thread GitBox


szehon-ho commented on code in PR #6451:
URL: https://github.com/apache/iceberg/pull/6451#discussion_r1063985982


##
docs/configuration.md:
##
@@ -160,14 +160,20 @@ Here are the catalog properties related to locking. They are used by some catalo
 
 The following properties from the Hadoop configuration are used by the Hive Metastore connector.
 
-| Property                            | Default        | Description                                                                   |
-| ----------------------------------- | -------------- | ----------------------------------------------------------------------------- |
-| iceberg.hive.client-pool-size       | 5              | The size of the Hive client pool when tracking tables in HMS                 |
-| iceberg.hive.lock-timeout-ms        | 180000 (3 min) | Maximum time in milliseconds to acquire a lock                               |
-| iceberg.hive.lock-check-min-wait-ms | 50             | Minimum time in milliseconds to check back on the status of lock acquisition |
-| iceberg.hive.lock-check-max-wait-ms | 5000           | Maximum time in milliseconds to check back on the status of lock acquisition |
-
-Note: `iceberg.hive.lock-check-max-wait-ms` should be less than the [transaction timeout](https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties#ConfigurationProperties-hive.txn.timeout)
 
+| Property                            | Default        | Description                                                                   |
+|-------------------------------------|----------------|-------------------------------------------------------------------------------|
+| iceberg.hive.client-pool-size       | 5              | The size of the Hive client pool when tracking tables in HMS                 |
+| iceberg.hive.lock-timeout-ms        | 180000 (3 min) | Maximum time in milliseconds to acquire a lock                               |

Review Comment:
   I see, I just saw it's from an unrelated changelist. But it still doesn't fit very well with the two phases (creation/acquisition) you put in the first part of the doc; should we just say simply:
   
   
   The HMS table locking is a 2-step process:
   1. Lock create: Create lock in HMS and queue for acquisition
   2. Lock check: Check if lock successfully acquired
   
   This way, they are more relevant to the property name. Also, I wasn't sure about putting in the details of 'should wait until every previously created Lock is released'; is this the Hive internals? As I feel it will just confuse more.



##
docs/configuration.md:
##
@@ -159,15 +159,24 @@ Here are the catalog properties related to locking. They are used by some catalo
 ## Hadoop configuration
 
 The following properties from the Hadoop configuration are used by the Hive Metastore connector.
-
-| Property                            | Default        | Description                                                                   |
-| ----------------------------------- | -------------- | ----------------------------------------------------------------------------- |
-| iceberg.hive.client-pool-size       | 5              | The size of the Hive client pool when tracking tables in HMS                 |
-| iceberg.hive.lock-timeout-ms        | 180000 (3 min) | Maximum time in milliseconds to acquire a lock                               |
-| iceberg.hive.lock-check-min-wait-ms | 50             | Minimum time in milliseconds to check back on the status of lock acquisition |
-| iceberg.hive.lock-check-max-wait-ms | 5000           | Maximum time in milliseconds to check back on the status of lock acquisition |
-
-Note: `iceberg.hive.lock-check-max-wait-ms` should be less than the [transaction timeout](https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties#ConfigurationProperties-hive.txn.timeout)
 
+The HMS table locking is a 2-step process:
+- Lock creation - the Lock itself is created and queued
+- Lock acquisition - the queued Lock should wait until every previously created Lock is released
+
+| Property                                | Default        | Description                                                         |
+|-----------------------------------------|----------------|---------------------------------------------------------------------|
+| iceberg.hive.client-pool-size           | 5              | The size of the Hive client pool when tracking tables in HMS       |
+| iceberg.hive.lock-creation-timeout-ms   | 180000 (3 min) | Maximum time in milliseconds to create a lock in the HMS           |
+| iceberg.hive.lock-creation-min-wait-ms  | 50             | Minimum time in milliseconds to check the lock creation in the HMS |

Review Comment:

[GitHub] [iceberg] szehon-ho commented on a diff in pull request #6451: Hive: Lock hardening

2023-01-07 Thread GitBox


szehon-ho commented on code in PR #6451:
URL: https://github.com/apache/iceberg/pull/6451#discussion_r1063985982


##
docs/configuration.md:
##
@@ -160,14 +160,20 @@ Here are the catalog properties related to locking. They are used by some catalo
 
 The following properties from the Hadoop configuration are used by the Hive Metastore connector.
 
-| Property                            | Default        | Description                                                                   |
-| ----------------------------------- | -------------- | ----------------------------------------------------------------------------- |
-| iceberg.hive.client-pool-size       | 5              | The size of the Hive client pool when tracking tables in HMS                 |
-| iceberg.hive.lock-timeout-ms        | 180000 (3 min) | Maximum time in milliseconds to acquire a lock                               |
-| iceberg.hive.lock-check-min-wait-ms | 50             | Minimum time in milliseconds to check back on the status of lock acquisition |
-| iceberg.hive.lock-check-max-wait-ms | 5000           | Maximum time in milliseconds to check back on the status of lock acquisition |
-
-Note: `iceberg.hive.lock-check-max-wait-ms` should be less than the [transaction timeout](https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties#ConfigurationProperties-hive.txn.timeout)
 
+| Property                            | Default        | Description                                                                   |
+|-------------------------------------|----------------|-------------------------------------------------------------------------------|
+| iceberg.hive.client-pool-size       | 5              | The size of the Hive client pool when tracking tables in HMS                 |
+| iceberg.hive.lock-timeout-ms        | 180000 (3 min) | Maximum time in milliseconds to acquire a lock                               |

Review Comment:
   I see, I just saw it's from another PR and unrelated to this one. But it still doesn't fit very well with the two phases (creation/acquisition) you put in the first part of the doc; should we just say simply:
   
   ```
   The HMS table locking is a 2-step process:
   1. Lock Creation: Create lock in HMS and queue for acquisition
   2. Lock Check: Check if lock successfully acquired
   ```
   
   This way, they are more relevant to the property names (lock-create, lock-check). Also, I wasn't sure about putting in the details of 'should wait until every previously created Lock is released'; is this the Hive internals? As I feel it will just confuse more.
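   
   For readers following along, a hedged sketch of the two steps these properties configure: create the lock request, then poll its state until it is acquired or the timeout elapses. `createLock` and `checkLockState` are hypothetical placeholders rather than the real Hive Metastore client API; the timeout and min/max wait parameters mirror `iceberg.hive.lock-timeout-ms`, `iceberg.hive.lock-check-min-wait-ms`, and `iceberg.hive.lock-check-max-wait-ms`.
   
   ```
   import java.util.concurrent.ThreadLocalRandom;
   
   class HmsLockSketch {
     enum LockState { WAITING, ACQUIRED }
   
     static void acquire(long timeoutMs, long minWaitMs, long maxWaitMs) throws InterruptedException {
       long lockId = createLock(); // step 1: create the lock in HMS and queue for acquisition
       long deadline = System.currentTimeMillis() + timeoutMs;
       long wait = minWaitMs;
       while (checkLockState(lockId) != LockState.ACQUIRED) { // step 2: check acquisition status
         if (System.currentTimeMillis() > deadline) {
           throw new IllegalStateException("Timed out after " + timeoutMs + " ms waiting for lock " + lockId);
         }
         Thread.sleep(wait);
         // back off between checks, but never wait longer than the configured maximum
         wait = Math.min(maxWaitMs, wait * 2 + ThreadLocalRandom.current().nextLong(minWaitMs));
       }
     }
   
     private static long createLock() { return 42L; } // hypothetical placeholder
   
     private static LockState checkLockState(long id) { return LockState.ACQUIRED; } // hypothetical placeholder
   
     public static void main(String[] args) throws InterruptedException {
       acquire(180000, 50, 5000); // values mirroring the documented defaults
       System.out.println("lock acquired");
     }
   }
   ```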



[GitHub] [iceberg] marsupialtail opened a new issue, #6541: Does PyIceberg support DynamoDB catalog?

2023-01-07 Thread GitBox


marsupialtail opened a new issue, #6541:
URL: https://github.com/apache/iceberg/issues/6541

   ### Query engine
   
   PyIceberg -- I just want to read the metadata
   
   ### Question
   
   Say I have a DynamoDB table `prodiceberg_metastore` as my catalog.
   
   How do I read it in PyIceberg? Can someone give some instructions, please?


[GitHub] [iceberg] jackye1995 commented on a diff in pull request #6449: WIP: Delta: Adding support for Migrating Delta Lake Table to Iceberg Table

2023-01-07 Thread GitBox


jackye1995 commented on code in PR #6449:
URL: https://github.com/apache/iceberg/pull/6449#discussion_r1064046246


##
delta-lake/src/integration/java/org/apache/iceberg/delta/TestMigrateDeltaLakeTable.java:
##
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.delta;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.IntStream;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkSessionCatalog;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.catalog.CatalogExtension;
+import org.apache.spark.sql.connector.catalog.CatalogPlugin;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.delta.catalog.DeltaCatalog;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestMigrateDeltaLakeTable extends SparkDeltaLakeMigrationTestBase {
+  private static final String NAMESPACE = "default";
+
+  private static final String ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
+  private String partitionedIdentifier;
+  private String unpartitionedIdentifier;
+
+  @Parameterized.Parameters(name = "Catalog Name {0} - Options {2}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+      new Object[] {
+        "delta",
+        DeltaCatalog.class.getName(),
+        ImmutableMap.of(
+            "type",
+            "hive",
+            "default-namespace",
+            "default",
+            "parquet-enabled",
+            "true",
+            "cache-enabled",
+            "false" // Spark will delete tables using v1, leaving the cache out of sync
+            )
+      }
+    };
+  }
+
+  @Rule public TemporaryFolder temp = new TemporaryFolder();
+  @Rule public TemporaryFolder other = new TemporaryFolder();
+
+  private final String partitionedTableName = "partitioned_table";
+  private final String unpartitionedTableName = "unpartitioned_table";
+
+  private final String defaultSparkCatalog = "spark_catalog";
+  private String partitionedLocation;
+  private String unpartitionedLocation;
+  private final String type;
+  private TableCatalog catalog;
+
+  private String catalogName;
+
+  public TestMigrateDeltaLakeTable(
+      String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+    spark
+        .conf()
+        .set("spark.sql.catalog." + defaultSparkCatalog, SparkSessionCatalog.class.getName());
+    this.catalog = (TableCatalog) spark.sessionState().catalogManager().catalog(catalogName);
+    this.type = config.get("type");
+    this.catalogName = catalogName;
+  }
+
+  @Before
+  public void before() {
+    try {
+      File partitionedFolder = temp.newFolder();
+      File unpartitionedFolder = other.newFolder();
+      partitionedLocation = partitionedFolder.toURI().toString();
+      unpartitionedLocation = unpartitionedFolder.toURI().toString();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    partitionedIdentifier = destName(partitionedTableName);
+    unpartitionedIdentifier = destName(unpartitionedTableName);
+
+    CatalogExtension delta =
+        (CatalogExtension) spark.sessionState().catalogManager().catalog("delta");
+    // This needs to be set, otherwise Delta operations fail as the catalog is designed to override
+    // the default catalog (spark_catalog).
+    delta.setDelegateCatalog(spark.sessionState().catalogManager().currentCatalog());
+
+    spark.sql(String.format("DROP TABLE IF EXISTS %s", partitionedIdentifier));
+    spark.sql(String.format("DROP TAB

[GitHub] [iceberg] danielcweeks merged pull request #6540: Python: Fix the mdformat issue

2023-01-07 Thread GitBox


danielcweeks merged PR #6540:
URL: https://github.com/apache/iceberg/pull/6540


[GitHub] [iceberg] github-actions[bot] commented on issue #5251: Discrepancy between partition truncation transform in spec vs code (org.apache.iceberg.transforms.Transform)

2023-01-07 Thread GitBox


github-actions[bot] commented on issue #5251:
URL: https://github.com/apache/iceberg/issues/5251#issuecomment-1374663986

   This issue has been automatically marked as stale because it has been open 
for 180 days with no activity. It will be closed in next 14 days if no further 
activity occurs. To permanently prevent this issue from being considered stale, 
add the label 'not-stale', but commenting on the issue is preferred when 
possible.


[GitHub] [iceberg] github-actions[bot] commented on issue #5141: No way to rollback first commit in table

2023-01-07 Thread GitBox


github-actions[bot] commented on issue #5141:
URL: https://github.com/apache/iceberg/issues/5141#issuecomment-1374664011

   This issue has been closed because it has not received any activity in the 
last 14 days since being marked as 'stale'


[GitHub] [iceberg] github-actions[bot] closed issue #5141: No way to rollback first commit in table

2023-01-07 Thread GitBox


github-actions[bot] closed issue #5141: No way to rollback first commit in table
URL: https://github.com/apache/iceberg/issues/5141


[GitHub] [iceberg] github-actions[bot] closed issue #5139: Historical time travel imports

2023-01-07 Thread GitBox


github-actions[bot] closed issue #5139: Historical time travel imports
URL: https://github.com/apache/iceberg/issues/5139


[GitHub] [iceberg] github-actions[bot] commented on issue #5139: Historical time travel imports

2023-01-07 Thread GitBox


github-actions[bot] commented on issue #5139:
URL: https://github.com/apache/iceberg/issues/5139#issuecomment-1374664024

   This issue has been closed because it has not received any activity in the 
last 14 days since being marked as 'stale'


[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6382: Implement ShuffleOperator to collect data statistics

2023-01-07 Thread GitBox


stevenzwu commented on code in PR #6382:
URL: https://github.com/apache/iceberg/pull/6382#discussion_r1064063897


##
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatistics.java:
##
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+/**
+ * DataStatistics defines the interface to collect data statistics.
+ *
+ * <p>{@link ShuffleOperator} will store local data statistics and later distribute the global
+ * statistics (received from ShuffleCoordinator) to Partitioner.
+ */
+interface DataStatistics<K> {

Review Comment:
   let's annotate this as `@Internal`



[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6382: Implement ShuffleOperator to collect data statistics

2023-01-07 Thread GitBox


stevenzwu commented on code in PR #6382:
URL: https://github.com/apache/iceberg/pull/6382#discussion_r1064064168


##
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsFactory.java:
##
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+/**
+ * DataStatisticsFactory provides the DataStatistics definition for different mode like HASH, RANGE
+ */
+class DataStatisticsFactory {
+
+  DataStatistics createDataStatistics() {

Review Comment:
   I am not sure if there is much value in this factory class. Even if we want to keep it, this method should be called `createMapStatistics`.



[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6382: Implement ShuffleOperator to collect data statistics

2023-01-07 Thread GitBox


stevenzwu commented on code in PR #6382:
URL: https://github.com/apache/iceberg/pull/6382#discussion_r1064064551


##
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/MapDataStatistics.java:
##
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+/** MapDataStatistics uses map to count key frequency */
+class MapDataStatistics<K> implements DataStatistics<K> {
+  private final Map<K, Long> dataStatistics = Maps.newHashMap();
+
+  @Override
+  public long size() {
+    return dataStatistics.size();
+  }
+
+  @Override
+  public void put(K key) {

Review Comment:
   From the implementation, I see `put` is probably not the most intuitive method name. For map count, this operation is more like `increment(K key)`. Not sure if `increment` will be meaningful to digest/probabilistic data structures. An alternative would be `add(K key)` or `add(K key, int count)`.
   
   cc @pvary 
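   
   A small sketch of the renaming being discussed, assuming the same `Map<K, Long>` backing field as `MapDataStatistics`; `Map.merge` expresses the getOrDefault-plus-one update in a single call (illustrative only, not the PR's code):
   
   ```
   import java.util.HashMap;
   import java.util.Map;
   
   class MapCountSketch<K> {
     private final Map<K, Long> dataStatistics = new HashMap<>();
   
     void add(K key) {
       add(key, 1L); // single-key variant, matching the suggested add(K key)
     }
   
     void add(K key, long count) {
       dataStatistics.merge(key, count, Long::sum); // create the key or accumulate its count
     }
   
     public static void main(String[] args) {
       MapCountSketch<String> stats = new MapCountSketch<>();
       stats.add("a");
       stats.add("a");
       stats.add("b", 3L);
       System.out.println(stats.dataStatistics); // counts: a=2, b=3
     }
   }
   ```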



[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6382: Implement ShuffleOperator to collect data statistics

2023-01-07 Thread GitBox


stevenzwu commented on code in PR #6382:
URL: https://github.com/apache/iceberg/pull/6382#discussion_r1064064723


##
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/MapDataStatistics.java:
##
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+/** MapDataStatistics uses map to count key frequency */
+class MapDataStatistics<K> implements DataStatistics<K> {
+  private final Map<K, Long> dataStatistics = Maps.newHashMap();
+
+  @Override
+  public long size() {
+    return dataStatistics.size();
+  }
+
+  @Override
+  public void put(K key) {
+    dataStatistics.put(key, dataStatistics.getOrDefault(key, 0L) + 1L);
+  }
+
+  @Override
+  public void merge(DataStatistics<K> other) {
+    Preconditions.checkArgument(
+        other instanceof MapDataStatistics, "Can not merge this type of statistics: " + other);
+    MapDataStatistics<K> mapDataStatistic = (MapDataStatistics<K>) other;
+    mapDataStatistic.dataStatistics.forEach(
+        (key, count) -> dataStatistics.put(key, dataStatistics.getOrDefault(key, 0L) + count));
+  }
+
+  @VisibleForTesting

Review Comment:
   I feel this is not just for testing purposes. In the end, the partitioner would need to retrieve the actual map statistics.
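   
   To illustrate why the getter matters beyond tests, a toy sketch of a partitioner consuming the retrieved map statistics; the frequency-based spreading heuristic here is purely hypothetical, not the PR's partitioner:
   
   ```
   import java.util.Map;
   
   class FrequencyPartitionerSketch {
     // Spread hot keys across proportionally more subtasks by salting their hash.
     static int partition(String key, long keyCount, long totalCount, int numPartitions) {
       double share = totalCount == 0 ? 0.0 : (double) keyCount / totalCount;
       int spread = Math.max(1, (int) Math.ceil(share * numPartitions)); // hotter key -> wider spread
       int salt = (int) (System.nanoTime() % spread);
       return Math.floorMod(key.hashCode() + salt, numPartitions);
     }
   
     public static void main(String[] args) {
       Map<String, Long> stats = Map.of("a", 90L, "b", 10L); // the retrieved map statistics
       System.out.println(partition("a", stats.get("a"), 100L, 8));
     }
   }
   ```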



[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6382: Implement ShuffleOperator to collect data statistics

2023-01-07 Thread GitBox


stevenzwu commented on code in PR #6382:
URL: https://github.com/apache/iceberg/pull/6382#discussion_r1064064970


##
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatistics.java:
##
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+/**
+ * DataStatistics defines the interface to collect data statistics.
+ *
+ * <p>{@link ShuffleOperator} will store local data statistics and later distribute the global

Review Comment:
   I feel here we don't need to refer to ShuffleOperator. But we do need to expand a little more on what `data statistics` means. E.g.,
   
   ```
   Data statistics tracks traffic volume distribution across some key or keys. For low-cardinality keys, a simple map of (key, count) can be used. For high-cardinality keys, probabilistic data structures (sketching) can be used.
   ```
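   
   To make the suggested wording concrete, a toy sketch of the two strategies; the single-array approximate counter below stands in for a real probabilistic structure such as Count-Min and is not part of the PR:
   
   ```
   import java.util.HashMap;
   import java.util.Map;
   
   interface KeyStatisticsSketch<K> {
     void add(K key);
   
     long estimate(K key);
   }
   
   // Exact (key, count) map: fine for low-cardinality keys.
   class ExactMapStatistics<K> implements KeyStatisticsSketch<K> {
     private final Map<K, Long> counts = new HashMap<>();
   
     @Override
     public void add(K key) {
       counts.merge(key, 1L, Long::sum);
     }
   
     @Override
     public long estimate(K key) {
       return counts.getOrDefault(key, 0L);
     }
   }
   
   // Fixed-memory approximate counter: hash collisions may overestimate, which
   // is the usual trade-off accepted for high-cardinality keys.
   class HashBucketStatistics<K> implements KeyStatisticsSketch<K> {
     private final long[] buckets = new long[1024];
   
     @Override
     public void add(K key) {
       buckets[Math.floorMod(key.hashCode(), buckets.length)]++;
     }
   
     @Override
     public long estimate(K key) {
       return buckets[Math.floorMod(key.hashCode(), buckets.length)];
     }
   }
   
   class StatisticsSketchDemo {
     public static void main(String[] args) {
       KeyStatisticsSketch<String> exact = new ExactMapStatistics<>();
       exact.add("k1");
       exact.add("k1");
       System.out.println(exact.estimate("k1")); // 2
     }
   }
   ```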
   
   



[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6382: Implement ShuffleOperator to collect data statistics

2023-01-07 Thread GitBox


stevenzwu commented on code in PR #6382:
URL: https://github.com/apache/iceberg/pull/6382#discussion_r1064065085


##
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/MapDataStatistics.java:
##
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+/** MapDataStatistics uses map to count key frequency */
+class MapDataStatistics<K> implements DataStatistics<K> {
+  private final Map<K, Long> dataStatistics = Maps.newHashMap();
+
+  @Override
+  public long size() {
+    return dataStatistics.size();
+  }
+
+  @Override
+  public void put(K key) {
+    dataStatistics.put(key, dataStatistics.getOrDefault(key, 0L) + 1L);
+  }
+
+  @Override
+  public void merge(DataStatistics<K> other) {
+    Preconditions.checkArgument(
+        other instanceof MapDataStatistics, "Can not merge this type of statistics: " + other);
+    MapDataStatistics<K> mapDataStatistic = (MapDataStatistics<K>) other;

Review Comment:
   nit: I would call this `otherStatistics`



[GitHub] [iceberg] aokolnychyi commented on pull request #6534: Spark 3.3: Use regular planning for applicable row-level operations

2023-01-07 Thread GitBox


aokolnychyi commented on PR #6534:
URL: https://github.com/apache/iceberg/pull/6534#issuecomment-1374674560

   I'll have to revisit the approach in this PR. It is actually not safe and 
may lead to data correctness bugs.
   I'll follow up with a fix next week.


[GitHub] [iceberg] dependabot[bot] opened a new pull request, #6542: Build: Bump spotless-plugin-gradle from 6.12.0 to 6.12.1

2023-01-07 Thread GitBox


dependabot[bot] opened a new pull request, #6542:
URL: https://github.com/apache/iceberg/pull/6542

   Bumps [spotless-plugin-gradle](https://github.com/diffplug/spotless) from 
6.12.0 to 6.12.1.
   
   Commits
   
   - [718a504](https://github.com/diffplug/spotless/commit/718a504c123de899e75300e7f2f6c55d7a40da42) Published gradle/6.12.1
   - [c13acee](https://github.com/diffplug/spotless/commit/c13acee2130be0b730a14fbadade6e7f597001b7) Published lib/2.31.1
   - [552aabf](https://github.com/diffplug/spotless/commit/552aabf876469227d4bdc0be0da65b6d746fbc19) fix(deps): update dependency com.facebook:ktfmt to v0.42 ([#1421](https://github-redirect.dependabot.com/diffplug/spotless/issues/1421))
   - [4063e9f](https://github.com/diffplug/spotless/commit/4063e9f6d134134e5e1fb706e70a7f4cd6e5b403) Add support for KtLint 0.48.0 ([#1432](https://github-redirect.dependabot.com/diffplug/spotless/issues/1432), fixes [#1430](https://github-redirect.dependabot.com/diffplug/spotless/issues/1430))
   - [062e835](https://github.com/diffplug/spotless/commit/062e83584650be1dd47f5c8485426e8438b05600) Bump changelogs.
   - [8f7e005](https://github.com/diffplug/spotless/commit/8f7e00594de49856e2c14edf09d89352e0eddd60) spotlessApply
   - [9a8ccae](https://github.com/diffplug/spotless/commit/9a8ccae9ec04ff5e6f37da07cc445242ef69c516) Bump default ktfmt 0.41 -> 0.42
   - [fb4277d](https://github.com/diffplug/spotless/commit/fb4277d2b1ad172ce17fd4bb398756b527379752) Merge branch 'main-ktlint-0.48.0' into renovate/ver_ktfmt
   - [b44d70d](https://github.com/diffplug/spotless/commit/b44d70d00add006427f3cb8ef2387da543addfa3) Move changelog entries to the correct release.
   - [b3d8e89](https://github.com/diffplug/spotless/commit/b3d8e89002c21324f03e896c0d786df3be09839d) spotlessApply for 2023
   - Additional commits viewable in [compare view](https://github.com/diffplug/spotless/compare/gradle/6.12.0...gradle/6.12.1)
   
   
   
   
   
   [![Dependabot compatibility 
score](https://dependabot-badges.githubapp.com/badges/compatibility_score?dependency-name=com.diffplug.spotless:spotless-plugin-gradle&package-manager=gradle&previous-version=6.12.0&new-version=6.12.1)](https://docs.github.com/en/github/managing-security-vulnerabilities/about-dependabot-security-updates#about-compatibility-scores)
   
   Dependabot will resolve any conflicts with this PR as long as you don't 
alter it yourself. You can also trigger a rebase manually by commenting 
`@dependabot rebase`.
   
   [//]: # (dependabot-automerge-start)
   [//]: # (dependabot-automerge-end)
   
   ---
   
   
   Dependabot commands and options
   
   
   You can trigger Dependabot actions by commenting on this PR:
   - `@dependabot rebase` will rebase this PR
   - `@dependabot recreate` will recreate this PR, overwriting any edits that 
have been made to it
   - `@dependabot merge` will merge this PR after your CI passes on it
   - `@dependabot squash and merge` will squash and merge this PR after your CI 
passes on it
   - `@dependabot cancel merge` will cancel a previously requested merge and 
block automerging
   - `@dependabot reopen` will reopen this PR if it is closed
   - `@dependabot close` will close this PR and stop Dependabot recreating it. 
You can achieve the same result by closing it manually
   - `@dependabot ignore this major version` will close this PR and stop 
Dependabot creating any more for this major version (unless you reopen the PR 
or upgrade to it yourself)
   - `@dependabot ignore this minor version` will close this PR and stop 
Dependabot creating any more for this minor version (unless you reopen the PR 
or upgrade to it yourself)
   - `@dependabot ignore this dependency` will close this PR and stop 
Dependabot creating any more for this dependency (unless you reopen the PR or 
upgrade to it yourself)
   
   
   


[GitHub] [iceberg] dependabot[bot] opened a new pull request, #6543: Build: Bump moto from 4.0.12 to 4.0.13 in /python

2023-01-07 Thread GitBox


dependabot[bot] opened a new pull request, #6543:
URL: https://github.com/apache/iceberg/pull/6543

   Bumps [moto](https://github.com/spulec/moto) from 4.0.12 to 4.0.13.
   
   Changelog
   Sourced from [moto's changelog](https://github.com/getmoto/moto/blob/master/CHANGELOG.md).
   
   4.0.13
   Docker Digest for 4.0.13: 
sha256:703a9d464c11e1f4cacff66acdc9b46f9fa8fb0b969ca9f1e79fa4eb41678565
   New Methods:
   * EC2:
   * get_password_data()
   * Sagemaker:
   * update_pipeline()
   * SecretsManager:
   * cancel_rotate_secret()
   Miscellaneous:
   * CloudWatch: put_metric_data() now supports the StatisticValues-parameter
   * CognitoIDP: sign_out() now also invalidates the AccessToken
   * IAM: get_account_authorization_details() now returns the Tags-attribute
   * IOT: create_keys_and_certificate() now creates valid certificates, instead 
of random data
   
   
   
   
   Commits
   
   - [2e08c32](https://github.com/getmoto/moto/commit/2e08c321d9052fea0714008d843115a38bf762d0) Prep release 4.0.13 ([#5813](https://github-redirect.dependabot.com/spulec/moto/issues/5813))
   - [d68fb04](https://github.com/getmoto/moto/commit/d68fb04ee1fdf33c485b00fb40500406ac32d857) IOTData: Fix bug where publish() could only be called once ([#5812](https://github-redirect.dependabot.com/spulec/moto/issues/5812))
   - [031f89d](https://github.com/getmoto/moto/commit/031f89dee00c6a5e01ac41c430d701508988f146) Implement secretsmanager CancelRotateSecret ([#5809](https://github-redirect.dependabot.com/spulec/moto/issues/5809))
   - [d89e4d2](https://github.com/getmoto/moto/commit/d89e4d236ceaa5eda2c915096c0ae5d989f4b1b2) docs: fix patching external client/resource docs ([#5804](https://github-redirect.dependabot.com/spulec/moto/issues/5804))
   - [555928a](https://github.com/getmoto/moto/commit/555928af5d56b33326d0b4309c175065e3f422ae) IAM: add tags for users to get-account-authorization-details response ([#5803](https://github-redirect.dependabot.com/spulec/moto/issues/5803))
   - [446930f](https://github.com/getmoto/moto/commit/446930fcc0f3f3d5146e1ea970bf501cc263a069) CloudFormation: changing error message to match aws api (stack update attempt...
   - [a53f620](https://github.com/getmoto/moto/commit/a53f620846b36316b832ba32924324c6182a3e9f) IoT: Generate valid keys and certificates ([#5801](https://github-redirect.dependabot.com/spulec/moto/issues/5801))
   - [f5fbdde](https://github.com/getmoto/moto/commit/f5fbddec86ecd78fe5c1945c1c9d2d6bf5de44b8) chore(readme): Align code snippets formatting with project's rules. ([#5802](https://github-redirect.dependabot.com/spulec/moto/issues/5802))
   - [5920d1a](https://github.com/getmoto/moto/commit/5920d1a8ce7c17fc4190c1cd32670e89776107fe) Techdebt: MyPy DynamoDB v20111205 ([#5799](https://github-redirect.dependabot.com/spulec/moto/issues/5799))
   - [fb0a4d6](https://github.com/getmoto/moto/commit/fb0a4d64c87a2a4e6e7008701d79f7db55e060cc) EC2: Implement GetPasswordData ([#5798](https://github-redirect.dependabot.com/spulec/moto/issues/5798))
   - Additional commits viewable in [compare view](https://github.com/spulec/moto/compare/4.0.12...4.0.13)
   
   
   
   
   
   [![Dependabot compatibility 
score](https://dependabot-badges.githubapp.com/badges/compatibility_score?dependency-name=moto&package-manager=pip&previous-version=4.0.12&new-version=4.0.13)](https://docs.github.com/en/github/managing-security-vulnerabilities/about-dependabot-security-updates#about-compatibility-scores)
   
   Dependabot will resolve any conflicts with this PR as long as you don't 
alter it yourself. You can also trigger a rebase manually by commenting 
`@dependabot rebase`.
   
   [//]: # (dependabot-automerge-start)
   [//]: # (dependabot-automerge-end)
   
   ---
   
   
   Dependabot commands and options
   
   
   You can trigger Dependabot actions by commenting on this PR:
   - `@dependabot rebase` will rebase this PR
   - `@dependabot recreate` will recreate this PR, overwriting any edits that 
have been made to it
   - `@dependabot merge` will merge this PR after your CI passes on it
   - `@dependabot squash and merge` will squash and merge this PR after your CI 
passes on it
   - `@dependabot cancel merge` will cancel a previously requested merge and 
block automerging
   - `@dependabot reopen` will reopen this PR if it is closed
   - `@dependabot close` will close this PR and stop Dependabot recreating it. 
You can achieve the same result by closing it manually
   - `@dependabot ignore this major version` will close this PR and stop 
Dependabot creating any more for this major version (unless you reopen the PR 
or upgrade to it yourself)
   - `@dependabot ignore this minor version` will close this PR and stop 
Dependabot creating any more for this minor version (unless you reopen the PR 
or upgrade to it yourself)
   - `@dependabot ignore this dependency` will close this PR and stop 
Dependabot creating any more for this dependency (unless you reopen the PR or 
upgrade to it yours

[GitHub] [iceberg] dependabot[bot] opened a new pull request, #6544: Build: Bump rich from 13.0.0 to 13.0.1 in /python

2023-01-07 Thread GitBox


dependabot[bot] opened a new pull request, #6544:
URL: https://github.com/apache/iceberg/pull/6544

   Bumps [rich](https://github.com/Textualize/rich) from 13.0.0 to 13.0.1.
   
   Release notes
   Sourced from [rich's releases](https://github.com/Textualize/rich/releases).
   
   Fix for splitting segments
   Fix for an issue where Segment.split_cells produced the wrong result.
   Mostly a hotfix for the benefit of Textual.
   [13.0.1] - 2023-01-06
   Fixed
   
   Fixed issue with Segment.split_cells for mixed single and double cell 
widths
   
   
   
   
   Changelog
   Sourced from [rich's changelog](https://github.com/Textualize/rich/blob/master/CHANGELOG.md).
   
   [13.0.1] - 2023-01-06
   Fixed
   
   Fixed issue with Segment.split_cells for mixed single and double cell 
widths
   
   
   
   
   Commits
   
   - [87529ad](https://github.com/Textualize/rich/commit/87529ad403630d6b7d1047179a5e62341fd75b3c) version bump
   - [4bc4437](https://github.com/Textualize/rich/commit/4bc4437d1b929ee2a0f663be236940435eee31c9) Merge pull request [#2736](https://github-redirect.dependabot.com/Textualize/rich/issues/2736) from Textualize/split-fix
   - [aa09292](https://github.com/Textualize/rich/commit/aa0929298bf85b9357edf8af239d715a18ab5ce8) fix issue splitting segments
   - See full diff in [compare view](https://github.com/Textualize/rich/compare/v13.0.0...v13.0.1)
   
   
   
   
   
   [![Dependabot compatibility 
score](https://dependabot-badges.githubapp.com/badges/compatibility_score?dependency-name=rich&package-manager=pip&previous-version=13.0.0&new-version=13.0.1)](https://docs.github.com/en/github/managing-security-vulnerabilities/about-dependabot-security-updates#about-compatibility-scores)
   
   Dependabot will resolve any conflicts with this PR as long as you don't 
alter it yourself. You can also trigger a rebase manually by commenting 
`@dependabot rebase`.
   
   [//]: # (dependabot-automerge-start)
   [//]: # (dependabot-automerge-end)
   
   ---
   
   
   Dependabot commands and options
   
   
   You can trigger Dependabot actions by commenting on this PR:
   - `@dependabot rebase` will rebase this PR
   - `@dependabot recreate` will recreate this PR, overwriting any edits that 
have been made to it
   - `@dependabot merge` will merge this PR after your CI passes on it
   - `@dependabot squash and merge` will squash and merge this PR after your CI 
passes on it
   - `@dependabot cancel merge` will cancel a previously requested merge and 
block automerging
   - `@dependabot reopen` will reopen this PR if it is closed
   - `@dependabot close` will close this PR and stop Dependabot recreating it. 
You can achieve the same result by closing it manually
   - `@dependabot ignore this major version` will close this PR and stop 
Dependabot creating any more for this major version (unless you reopen the PR 
or upgrade to it yourself)
   - `@dependabot ignore this minor version` will close this PR and stop 
Dependabot creating any more for this minor version (unless you reopen the PR 
or upgrade to it yourself)
   - `@dependabot ignore this dependency` will close this PR and stop 
Dependabot creating any more for this dependency (unless you reopen the PR or 
upgrade to it yourself)
   
   
   


[GitHub] [iceberg] dependabot[bot] opened a new pull request, #6545: Build: Bump coverage from 7.0.1 to 7.0.4 in /python

2023-01-07 Thread GitBox


dependabot[bot] opened a new pull request, #6545:
URL: https://github.com/apache/iceberg/pull/6545

   Bumps [coverage](https://github.com/nedbat/coveragepy) from 7.0.1 to 7.0.4.
   
   Changelog
   Sourced from [coverage's changelog](https://github.com/nedbat/coveragepy/blob/master/CHANGES.rst).
   
   Version 7.0.4 — 2023-01-07
   
   Performance: an internal cache of file names was accidentally disabled,
   resulting in sometimes drastic reductions in performance.  This is now fixed,
   closing issue 1527_.   Thanks to Ivan Ciuvalschii for the 
reproducible test
   case.
   
   .. _issue 1527: https://github-redirect.dependabot.com/nedbat/coveragepy/issues/1527
   .. _changes_7-0-3:
   Version 7.0.3 — 2023-01-03
   
   Fix: when using pytest-cov or pytest-xdist, or perhaps both, the 
combining
   step could fail with assert row is not None using 7.0.2.  This 
was due to
   a race condition that has always been possible and is still possible. In
   7.0.1 and before, the error was silently swallowed by the combining code.
   Now it will produce a message "Couldn't combine data file" and 
ignore the
   data file as it used to do before 7.0.2.  Closes issue 
1522_.
   
   .. _issue 1522: https://github-redirect.dependabot.com/nedbat/coveragepy/issues/1522
   .. _changes_7-0-2:
   Version 7.0.2 — 2023-01-02
   
   
   Fix: when using the [run] relative_files = True setting, a 
relative
   [paths] pattern was still being made absolute.  This is now 
fixed,
   closing issue 1519_.
   
   
   Fix: if Python doesn't provide tomllib, then TOML configuration files can
   only be read if coverage.py is installed with the [toml] extra.
   Coverage.py will raise an error if TOML support is not installed when it sees
   your settings are in a .toml file. But it didn't understand that
   [tools.coverage] was a valid section header, so the error 
wasn't reported
   if you used that header, and settings were silently ignored.  This is now
   fixed, closing issue 1516_.
   
   
   Fix: adjusted how decorators are traced on PyPy 7.3.10, fixing 
issue 1515_.
   
   
   Fix: the coverage lcov report did not properly implement the
   --fail-under=MIN option.  This has been fixed.
   
   
   Refactor: added many type annotations, including a number of refactorings.
   This should not affect outward behavior, but they were a bit invasive in 
some
   
   
   
   
   ... (truncated)
   
   
   Commits
   
   https://github.com/nedbat/coveragepy/commit/f4c27c7888b17e73c0d52ff7bfb8da505f532819";>f4c27c7
 docs: sample html report for 7.0.4
   https://github.com/nedbat/coveragepy/commit/2b95dba9bfdf2223b42d07536591c38833a37137";>2b95dba
 docs: prep for 7.0.4
   https://github.com/nedbat/coveragepy/commit/61ccfb8d314f0e11a3a4ee927f2024ae9cc309e0";>61ccfb8
 test(benchmark): more reasonable numeric displays
   https://github.com/nedbat/coveragepy/commit/2fa45d693de8861fcf84ee9cec85d6ab46203ac6";>2fa45d6
 refactor(benchmark): move benchmark.py to its own directory
   https://github.com/nedbat/coveragepy/commit/4d6ac8bb1714f9f7a655f64ed31e33e134fc961f";>4d6ac8b
 test(perf): randomize the order of benchmark runs
   https://github.com/nedbat/coveragepy/commit/405fae882f51faa356aaf8b5b3d838ee67db432d";>405fae8
 build: add .git-blame-ignore-revs
   https://github.com/nedbat/coveragepy/commit/9554e50a79dd05385178aa9f781c6ecb5b6ad45d";>9554e50
 style(perf): blacken lab/benchmark.py
   https://github.com/nedbat/coveragepy/commit/dc4f0c1f82731250d0eb2a463b7867457e97c675";>dc4f0c1
 test(perf): more experiments for https://github-redirect.dependabot.com/nedbat/coveragepy/issues/1527";>#1527
   https://github.com/nedbat/coveragepy/commit/b3b05bdfdd20ab14004396cfcceeabe23e61cb41";>b3b05bd
 perf: the file mapping cache was off by mistake. https://github-redirect.dependabot.com/nedbat/coveragepy/issues/1527";>#1527
   https://github.com/nedbat/coveragepy/commit/d00f1dd24960db7a4bbcb9b1cfd10646dfb2c850";>d00f1dd
 mypy: debug.py
   Additional commits viewable in https://github.com/nedbat/coveragepy/compare/7.0.1...7.0.4";>compare 
view
   
   
   
   
   
   [![Dependabot compatibility 
score](https://dependabot-badges.githubapp.com/badges/compatibility_score?dependency-name=coverage&package-manager=pip&previous-version=7.0.1&new-version=7.0.4)](https://docs.github.com/en/github/managing-security-vulnerabilities/about-dependabot-security-updates#about-compatibility-scores)
   
   Dependabot will resolve any conflicts with this PR as long as you don't 
alter it yourself. You can also trigger a rebase manually by commenting 
`@dependabot rebase`.
   
   [//]: # (dependabot-automerge-start)
   [//]: # (dependabot-automerge-end)
   
   ---
   
   
   Dependabot commands and options
   
   
   You can trigger Dependabot actions by commenting on this PR:
   - `@dependabot rebase` will rebase this PR
   - `@dependabot recreate` will recreate this PR, overwriting any edits that 
have been made to it
   - `@depe

[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6382: Implement ShuffleOperator to collect data statistics

2023-01-07 Thread GitBox


stevenzwu commented on code in PR #6382:
URL: https://github.com/apache/iceberg/pull/6382#discussion_r1064064551


##
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/MapDataStatistics.java:
##
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.Map;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+/** MapDataStatistics uses map to count key frequency */
+class MapDataStatistics<K> implements DataStatistics<K> {
+  private final Map<K, Long> dataStatistics = Maps.newHashMap();
+
+  @Override
+  public long size() {
+    return dataStatistics.size();
+  }
+
+  @Override
+  public void put(K key) {

Review Comment:
   From the implementation, `put` is probably not the most intuitive method 
name. For a map-based count, this operation is more like `increment(K key)`. 
Not sure whether `increment` would be meaningful for digest/probabilistic data 
structures; an alternative would be `add(K key)` or `add(K key, int count)`. 
   
   cc @pvary 
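   
   For illustration, a rough sketch of the suggested `add`-style naming on a 
map-backed implementation (the class and field names here are hypothetical, 
not the PR's `MapDataStatistics`):
   
   ```
   import java.util.HashMap;
   import java.util.Map;
   
   // Hypothetical sketch of the suggested naming; not the PR code.
   class MapStatisticsSketch<K> {
     // key -> number of occurrences observed so far
     private final Map<K, Long> counts = new HashMap<>();
   
     // The add(K) flavor: record a single occurrence of the key.
     void add(K key) {
       add(key, 1L);
     }
   
     // The add(K, count) flavor: record several occurrences at once, which
     // also maps naturally onto merging partial statistics.
     void add(K key, long count) {
       counts.merge(key, count, Long::sum);
     }
   
     // Number of distinct keys seen, matching size() in the quoted diff.
     long size() {
       return counts.size();
     }
   }
   ```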






[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6382: Implement ShuffleOperator to collect data statistics

2023-01-07 Thread GitBox


stevenzwu commented on code in PR #6382:
URL: https://github.com/apache/iceberg/pull/6382#discussion_r1064080560


##
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleOperator.java:
##
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Shuffle operator can help to improve data clustering based on the key.

Review Comment:
   nit: we can make the description a little more accurate. e.g.
   
   ```
   Shuffle operator collects traffic distribution statistics. A custom partitioner shall be
   attached to the shuffle operator output. The custom partitioner leverages the statistics
   to shuffle records to improve data clustering while maintaining relatively balanced
   traffic distribution to downstream subtasks.
   ```
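   
   To make the relationship concrete, a minimal sketch of a statistics-aware 
partitioner in the spirit of that description (illustrative only: the class 
name, the Map<K, Long> statistics shape, and the hot-key threshold are all 
assumptions; the PR does not include the partitioner yet):
   
   ```
   import java.util.Map;
   import org.apache.flink.api.common.functions.Partitioner;
   
   // Illustrative sketch: spread hot keys round-robin, hash everything else.
   class WeightAwarePartitioner<K> implements Partitioner<K> {
     private static final long serialVersionUID = 1L;
   
     private transient Map<K, Long> keyFrequencies; // aggregated statistics
     private transient int roundRobin;
   
     // Called when a new globally aggregated distribution arrives.
     void updateStatistics(Map<K, Long> latest) {
       this.keyFrequencies = latest;
     }
   
     @Override
     public int partition(K key, int numPartitions) {
       if (keyFrequencies == null) {
         // Cold start: no statistics yet, fall back to hash partitioning.
         return Math.floorMod(key.hashCode(), numPartitions);
       }
       long total = keyFrequencies.values().stream().mapToLong(Long::longValue).sum();
       long freq = keyFrequencies.getOrDefault(key, 0L);
       // Arbitrary threshold: a key hotter than an even share gets spread out.
       if (total > 0 && freq * numPartitions > total) {
         roundRobin = (roundRobin + 1) % numPartitions;
         return roundRobin;
       }
       return Math.floorMod(key.hashCode(), numPartitions);
     }
   }
   ```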






[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6382: Implement ShuffleOperator to collect data statistics

2023-01-07 Thread GitBox


stevenzwu commented on code in PR #6382:
URL: https://github.com/apache/iceberg/pull/6382#discussion_r1064080676


##
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleOperator.java:
##
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Shuffle operator can help to improve data clustering based on the key.
+ *
+ * It collects the data statistics information, sends to coordinator and 
gets the global data
+ * distribution weight from coordinator. Then it will ingest the weight into 
data stream(wrap by a
+ * class{@link ShuffleRecordWrapper}) and send to partitioner.
+ */
+class ShuffleOperator<T, K> extends AbstractStreamOperator<ShuffleRecordWrapper<T, K>>
+    implements OneInputStreamOperator<T, ShuffleRecordWrapper<T, K>>, OperatorEventHandler {
+
+  private static final long serialVersionUID = 1L;
+
+  private final KeySelector<T, K> keySelector;
+  private final OperatorEventGateway operatorEventGateway;
+  // key is generated by applying KeySelector to record
+  // value is the times key occurs
+  // TODO: support to store statistics for high cardinality cases
+  private transient DataStatistics<K> localDataStatistics;
+  private transient DataStatistics<K> globalDataStatistics;
+  private transient ListState<DataStatistics<K>> globalDataStatisticsState;
+  private transient DataStatisticsFactory<K> dataStatisticsFactory;
+
+  public ShuffleOperator(
+      KeySelector<T, K> keySelector,
+      OperatorEventGateway operatorEventGateway,
+      DataStatisticsFactory<K> dataStatisticsFactory) {
+    this.keySelector = keySelector;
+    this.operatorEventGateway = operatorEventGateway;
+    this.dataStatisticsFactory = dataStatisticsFactory;
+  }
+
+  @VisibleForTesting
+  ListStateDescriptor<DataStatistics<K>> generateGlobalDataDistributionWeightDescriptor() {

Review Comment:
   Not sure this method is needed for code reuse, and not sure why it needs to 
be verified in testing.






[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6382: Implement ShuffleOperator to collect data statistics

2023-01-07 Thread GitBox


stevenzwu commented on code in PR #6382:
URL: https://github.com/apache/iceberg/pull/6382#discussion_r1064080711


##
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleOperator.java:
##
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Shuffle operator can help to improve data clustering based on the key.
+ *
+ * It collects the data statistics information, sends to coordinator and 
gets the global data
+ * distribution weight from coordinator. Then it will ingest the weight into 
data stream(wrap by a
+ * class{@link ShuffleRecordWrapper}) and send to partitioner.
+ */
+class ShuffleOperator<T, K> extends AbstractStreamOperator<ShuffleRecordWrapper<T, K>>
+    implements OneInputStreamOperator<T, ShuffleRecordWrapper<T, K>>, OperatorEventHandler {
+
+  private static final long serialVersionUID = 1L;
+
+  private final KeySelector<T, K> keySelector;
+  private final OperatorEventGateway operatorEventGateway;
+  // key is generated by applying KeySelector to record

Review Comment:
   nit: these comments seem not relevant anymore






[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6382: Implement ShuffleOperator to collect data statistics

2023-01-07 Thread GitBox


stevenzwu commented on code in PR #6382:
URL: https://github.com/apache/iceberg/pull/6382#discussion_r1064080833


##
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleOperator.java:
##
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Shuffle operator can help to improve data clustering based on the key.
+ *
+ * It collects the data statistics information, sends to coordinator and 
gets the global data
+ * distribution weight from coordinator. Then it will ingest the weight into 
data stream(wrap by a
+ * class{@link ShuffleRecordWrapper}) and send to partitioner.
+ */
+class ShuffleOperator<T, K> extends AbstractStreamOperator<ShuffleRecordWrapper<T, K>>
+    implements OneInputStreamOperator<T, ShuffleRecordWrapper<T, K>>, OperatorEventHandler {
+
+  private static final long serialVersionUID = 1L;
+
+  private final KeySelector<T, K> keySelector;
+  private final OperatorEventGateway operatorEventGateway;
+  // key is generated by applying KeySelector to record
+  // value is the times key occurs
+  // TODO: support to store statistics for high cardinality cases
+  private transient DataStatistics<K> localDataStatistics;

Review Comment:
   nit: simplify the names a little by removing `Data`






[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6382: Implement ShuffleOperator to collect data statistics

2023-01-07 Thread GitBox


stevenzwu commented on code in PR #6382:
URL: https://github.com/apache/iceberg/pull/6382#discussion_r1064081012


##
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleOperator.java:
##
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Shuffle operator can help to improve data clustering based on the key.
+ *
+ * It collects the data statistics information, sends to coordinator and 
gets the global data
+ * distribution weight from coordinator. Then it will ingest the weight into 
data stream(wrap by a
+ * class{@link ShuffleRecordWrapper}) and send to partitioner.
+ */
+class ShuffleOperator<T, K> extends AbstractStreamOperator<ShuffleRecordWrapper<T, K>>
+    implements OneInputStreamOperator<T, ShuffleRecordWrapper<T, K>>, OperatorEventHandler {
+
+  private static final long serialVersionUID = 1L;
+
+  private final KeySelector<T, K> keySelector;
+  private final OperatorEventGateway operatorEventGateway;
+  // key is generated by applying KeySelector to record
+  // value is the times key occurs
+  // TODO: support to store statistics for high cardinality cases
+  private transient DataStatistics<K> localDataStatistics;
+  private transient DataStatistics<K> globalDataStatistics;
+  private transient ListState<DataStatistics<K>> globalDataStatisticsState;
+  private transient DataStatisticsFactory<K> dataStatisticsFactory;
+
+  public ShuffleOperator(
+      KeySelector<T, K> keySelector,
+      OperatorEventGateway operatorEventGateway,
+      DataStatisticsFactory<K> dataStatisticsFactory) {
+    this.keySelector = keySelector;
+    this.operatorEventGateway = operatorEventGateway;
+    this.dataStatisticsFactory = dataStatisticsFactory;
+  }
+
+  @VisibleForTesting
+  ListStateDescriptor<DataStatistics<K>> generateGlobalDataDistributionWeightDescriptor() {
+    return new ListStateDescriptor<>(
+        "globalDataStatisticsState",
+        TypeInformation.of(new TypeHint<DataStatistics<K>>() {}));
+  }
+
+  @Override
+  public void initializeState(StateInitializationContext context) throws Exception {
+    localDataStatistics = dataStatisticsFactory.createDataStatistics();
+    globalDataStatisticsState =

Review Comment:
   we need to check if context is restored
   ```
   if (context.isRestored()) {
   ```
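   
   For illustration, a self-contained sketch of restore-aware initialization 
(it uses Flink's CheckpointedFunction for brevity rather than the operator 
API in the diff, and Long stands in for the DataStatistics<K> state):
   
   ```
   import java.util.Collections;
   import org.apache.flink.api.common.state.ListState;
   import org.apache.flink.api.common.state.ListStateDescriptor;
   import org.apache.flink.runtime.state.FunctionInitializationContext;
   import org.apache.flink.runtime.state.FunctionSnapshotContext;
   import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
   
   // Sketch only; not the PR's operator code.
   class RestoreAwareInit implements CheckpointedFunction {
     private transient ListState<Long> statsState;
     private transient Long globalStats;
   
     @Override
     public void initializeState(FunctionInitializationContext context) throws Exception {
       statsState = context.getOperatorStateStore()
           .getListState(new ListStateDescriptor<>("globalStats", Long.class));
   
       if (context.isRestored() && statsState.get().iterator().hasNext()) {
         // Restored from a checkpoint/savepoint: reuse the last known statistics.
         globalStats = statsState.get().iterator().next();
       } else {
         // Fresh start, or a rescale where this subtask received no state.
         globalStats = null;
       }
     }
   
     @Override
     public void snapshotState(FunctionSnapshotContext context) throws Exception {
       statsState.update(
           globalStats == null ? Collections.emptyList() : Collections.singletonList(globalStats));
     }
   }
   ```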






[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6382: Implement ShuffleOperator to collect data statistics

2023-01-07 Thread GitBox


stevenzwu commented on code in PR #6382:
URL: https://github.com/apache/iceberg/pull/6382#discussion_r106408


##
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleOperator.java:
##
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Shuffle operator can help to improve data clustering based on the key.
+ *
+ * It collects the data statistics information, sends to coordinator and 
gets the global data
+ * distribution weight from coordinator. Then it will ingest the weight into 
data stream(wrap by a
+ * class{@link ShuffleRecordWrapper}) and send to partitioner.
+ */
+class ShuffleOperator<T, K> extends AbstractStreamOperator<ShuffleRecordWrapper<T, K>>
+    implements OneInputStreamOperator<T, ShuffleRecordWrapper<T, K>>, OperatorEventHandler {
+
+  private static final long serialVersionUID = 1L;
+
+  private final KeySelector<T, K> keySelector;
+  private final OperatorEventGateway operatorEventGateway;
+  // key is generated by applying KeySelector to record
+  // value is the times key occurs
+  // TODO: support to store statistics for high cardinality cases
+  private transient DataStatistics<K> localDataStatistics;
+  private transient DataStatistics<K> globalDataStatistics;
+  private transient ListState<DataStatistics<K>> globalDataStatisticsState;
+  private transient DataStatisticsFactory<K> dataStatisticsFactory;
+
+  public ShuffleOperator(
+      KeySelector<T, K> keySelector,
+      OperatorEventGateway operatorEventGateway,
+      DataStatisticsFactory<K> dataStatisticsFactory) {
+    this.keySelector = keySelector;
+    this.operatorEventGateway = operatorEventGateway;
+    this.dataStatisticsFactory = dataStatisticsFactory;
+  }
+
+  @VisibleForTesting
+  ListStateDescriptor<DataStatistics<K>> generateGlobalDataDistributionWeightDescriptor() {
+    return new ListStateDescriptor<>(
+        "globalDataStatisticsState",
+        TypeInformation.of(new TypeHint<DataStatistics<K>>() {}));
+  }
+
+  @Override
+  public void initializeState(StateInitializationContext context) throws Exception {
+    localDataStatistics = dataStatisticsFactory.createDataStatistics();
+    globalDataStatisticsState =
+        context
+            .getOperatorStateStore()
+            .getListState(generateGlobalDataDistributionWeightDescriptor());
+
+    if (globalDataStatisticsState.get() != null
+        && globalDataStatisticsState.get().iterator().hasNext()) {
+      globalDataStatistics = globalDataStatisticsState.get().iterator().next();
+    } else {
+      globalDataStatistics = dataStatisticsFactory.createDataStatistics();

Review Comment:
   This is a cold-start case (no state, rescale, etc.). Does it make sense to 
create an empty statistics object, or should we keep it null to indicate the 
cold start?
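   
   A toy illustration of the trade-off (names hypothetical, not PR code): with 
the null convention the consumer can distinguish "no statistics yet" from 
"statistics present but empty", at the cost of null checks:
   
   ```
   import java.util.Collections;
   import java.util.Map;
   
   public class ColdStartDemo {
     // null signals "cold start"; an empty map signals "restored, but empty".
     static void react(Map<String, Long> globalStats) {
       if (globalStats == null) {
         System.out.println("cold start: keep default partitioning, emit nothing");
       } else {
         System.out.println("have statistics (" + globalStats.size() + " keys): emit downstream");
       }
     }
   
     public static void main(String[] args) {
       react(null);                   // no state, rescale, etc.
       react(Collections.emptyMap()); // restored but empty statistics
     }
   }
   ```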






[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6382: Implement ShuffleOperator to collect data statistics

2023-01-07 Thread GitBox


stevenzwu commented on code in PR #6382:
URL: https://github.com/apache/iceberg/pull/6382#discussion_r1064083596


##
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleOperator.java:
##
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Shuffle operator can help to improve data clustering based on the key.
+ *
+ * It collects the data statistics information, sends to coordinator and 
gets the global data
+ * distribution weight from coordinator. Then it will ingest the weight into 
data stream(wrap by a
+ * class{@link ShuffleRecordWrapper}) and send to partitioner.
+ */
+class ShuffleOperator<T, K> extends AbstractStreamOperator<ShuffleRecordWrapper<T, K>>
+    implements OneInputStreamOperator<T, ShuffleRecordWrapper<T, K>>, OperatorEventHandler {
+
+  private static final long serialVersionUID = 1L;
+
+  private final KeySelector<T, K> keySelector;
+  private final OperatorEventGateway operatorEventGateway;
+  // key is generated by applying KeySelector to record
+  // value is the times key occurs
+  // TODO: support to store statistics for high cardinality cases
+  private transient DataStatistics<K> localDataStatistics;
+  private transient DataStatistics<K> globalDataStatistics;
+  private transient ListState<DataStatistics<K>> globalDataStatisticsState;
+  private transient DataStatisticsFactory<K> dataStatisticsFactory;
+
+  public ShuffleOperator(
+      KeySelector<T, K> keySelector,
+      OperatorEventGateway operatorEventGateway,
+      DataStatisticsFactory<K> dataStatisticsFactory) {
+    this.keySelector = keySelector;
+    this.operatorEventGateway = operatorEventGateway;
+    this.dataStatisticsFactory = dataStatisticsFactory;
+  }
+
+  @VisibleForTesting
+  ListStateDescriptor<DataStatistics<K>> generateGlobalDataDistributionWeightDescriptor() {
+    return new ListStateDescriptor<>(
+        "globalDataStatisticsState",
+        TypeInformation.of(new TypeHint<DataStatistics<K>>() {}));
+  }
+
+  @Override
+  public void initializeState(StateInitializationContext context) throws Exception {
+    localDataStatistics = dataStatisticsFactory.createDataStatistics();
+    globalDataStatisticsState =
+        context
+            .getOperatorStateStore()
+            .getListState(generateGlobalDataDistributionWeightDescriptor());
+
+    if (globalDataStatisticsState.get() != null
+        && globalDataStatisticsState.get().iterator().hasNext()) {
+      globalDataStatistics = globalDataStatisticsState.get().iterator().next();
+    } else {
+      globalDataStatistics = dataStatisticsFactory.createDataStatistics();
+    }
+  }
+
+  @Override
+  public void open() throws Exception {
+    // TODO: handle scaling up
+    if (globalDataStatistics != null && globalDataStatistics.size() > 0) {
+      output.collect(new StreamRecord<>(ShuffleRecordWrapper.fromStatistics(globalDataStatistics)));
+    }
+  }
+
+  @Override
+  public void handleOperatorEvent(OperatorEvent evt) {
+    // TODO: receive event with globalDataDistributionWeight from coordinator and update
+    // globalDataStatistics
+  }
+
+  @Override
+  public void processElement(StreamRecord<T> streamRecord) throws Exception {
+    final K key = keySelector.getKey(streamRecord.getValue());
+    localDataStatistics.put(key);
+    output.collect(new StreamRecord<>(Shuf

[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6382: Implement ShuffleOperator to collect data statistics

2023-01-07 Thread GitBox


stevenzwu commented on code in PR #6382:
URL: https://github.com/apache/iceberg/pull/6382#discussion_r1064083656


##
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleRecordWrapper.java:
##
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.io.Serializable;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * The wrapper class for record and data distribution weight

Review Comment:
   We should explain why this wrapper class is needed: it is the only way for 
the shuffle operator to pass the statistics to the custom partitioner.
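   
   As a sketch of the downstream side the javadoc mentions (assuming the 
wrapper's hasRecord() from the diff plus a hypothetical record() accessor), 
the post-shuffle unwrapping could look like:
   
   ```
   import org.apache.flink.api.common.functions.FlatMapFunction;
   import org.apache.flink.util.Collector;
   
   // Sketch: after the custom partitioner, drop statistics-only elements and
   // unwrap the original records. record() is assumed; only hasRecord()
   // appears in the quoted diff.
   class UnwrapRecords<T, K> implements FlatMapFunction<ShuffleRecordWrapper<T, K>, T> {
     @Override
     public void flatMap(ShuffleRecordWrapper<T, K> wrapper, Collector<T> out) {
       if (wrapper.hasRecord()) {
         out.collect(wrapper.record()); // forward real records
       }
       // statistics-only wrappers are consumed here, not emitted downstream
     }
   }
   ```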






[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6382: Implement ShuffleOperator to collect data statistics

2023-01-07 Thread GitBox


stevenzwu commented on code in PR #6382:
URL: https://github.com/apache/iceberg/pull/6382#discussion_r1064083753


##
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleRecordWrapper.java:
##
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.io.Serializable;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * The wrapper class for record and data distribution weight
+ *
+ * ShuffleRecordWrapper is sent from ShuffleOperator to partitioner. It may 
contain a record or

Review Comment:
   > It may contain a record or data distribution weight.
   
   we can be more precise here. `It contains either a record or the data 
statistics (globally aggregated).`






[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6382: Implement ShuffleOperator to collect data statistics

2023-01-07 Thread GitBox


stevenzwu commented on code in PR #6382:
URL: https://github.com/apache/iceberg/pull/6382#discussion_r1064083871


##
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleRecordWrapper.java:
##
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.io.Serializable;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * The wrapper class for record and data distribution weight
+ *
+ * ShuffleRecordWrapper is sent from ShuffleOperator to partitioner. It may 
contain a record or
+ * data distribution weight. Once partitioner receives the weight, it will use 
that to decide the
+ * coming record should send to which writer subtask. After shuffling, a 
filter and mapper are
+ * required to filter out the data distribution weight, unwrap the object and 
extract the original
+ * record type T.
+ */
+public class ShuffleRecordWrapper<T, K> implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+
+  private final DataStatistics<K> globalDataStatistics;
+  private final T record;
+
+  private ShuffleRecordWrapper(T record, DataStatistics<K> globalDataStatistics) {
+    Preconditions.checkArgument(
+        record != null ^ globalDataStatistics != null,
+        "A ShuffleRecordWrapper must contain either a record or stats, not neither or both");
+    this.globalDataStatistics = globalDataStatistics;
+    this.record = record;
+  }
+
+  static <T, K> ShuffleRecordWrapper<T, K> fromRecord(T record) {
+    return new ShuffleRecordWrapper<>(record, null);
+  }
+
+  static <T, K> ShuffleRecordWrapper<T, K> fromStatistics(DataStatistics<K> globalDataStatistics) {
+    return new ShuffleRecordWrapper<>(null, globalDataStatistics);
+  }
+
+  boolean hasGlobalDataDistributionWeight() {

Review Comment:
   Let's be consistent here: to match the variable name, the method name could 
be `hasDataStatistics`.






[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6382: Implement ShuffleOperator to collect data statistics

2023-01-07 Thread GitBox


stevenzwu commented on code in PR #6382:
URL: https://github.com/apache/iceberg/pull/6382#discussion_r1064083893


##
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleRecordWrapper.java:
##
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.io.Serializable;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * The wrapper class for record and data distribution weight
+ *
+ * ShuffleRecordWrapper is sent from ShuffleOperator to partitioner. It may 
contain a record or
+ * data distribution weight. Once partitioner receives the weight, it will use 
that to decide the
+ * coming record should send to which writer subtask. After shuffling, a 
filter and mapper are
+ * required to filter out the data distribution weight, unwrap the object and 
extract the original
+ * record type T.
+ */
+public class ShuffleRecordWrapper<T, K> implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+
+  private final DataStatistics<K> globalDataStatistics;
+  private final T record;
+
+  private ShuffleRecordWrapper(T record, DataStatistics<K> globalDataStatistics) {
+    Preconditions.checkArgument(
+        record != null ^ globalDataStatistics != null,
+        "A ShuffleRecordWrapper must contain either a record or stats, not neither or both");
+    this.globalDataStatistics = globalDataStatistics;
+    this.record = record;
+  }
+
+  static <T, K> ShuffleRecordWrapper<T, K> fromRecord(T record) {
+    return new ShuffleRecordWrapper<>(record, null);
+  }
+
+  static <T, K> ShuffleRecordWrapper<T, K> fromStatistics(DataStatistics<K> globalDataStatistics) {
+    return new ShuffleRecordWrapper<>(null, globalDataStatistics);
+  }
+
+  boolean hasGlobalDataDistributionWeight() {
+    return globalDataStatistics != null;
+  }
+
+  boolean hasRecord() {
+    return record != null;
+  }
+
+  DataStatistics<K> globalDataDistributionWeight() {

Review Comment:
   nit: maybe just `dataStatistics`?






[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6382: Implement ShuffleOperator to collect data statistics

2023-01-07 Thread GitBox


stevenzwu commented on code in PR #6382:
URL: https://github.com/apache/iceberg/pull/6382#discussion_r1064084354


##
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestShuffleOperator.java:
##
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collections;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.execution.Environment;
+import 
org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateInitializationContextImpl;
+import org.apache.flink.runtime.state.TestTaskStateManager;
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
+import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
+import org.apache.flink.streaming.util.MockOutput;
+import org.apache.flink.streaming.util.MockStreamConfig;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestShuffleOperator {
+  private ShuffleOperator<String, String> operator;
+
+  private Environment getTestingEnvironment() {
+    return new StreamMockEnvironment(
+        new Configuration(),
+        new Configuration(),
+        new ExecutionConfig(),
+        1L,
+        new MockInputSplitProvider(),
+        1,
+        new TestTaskStateManager());
+  }
+
+  @Before
+  public void before() throws Exception {
+    MockOperatorEventGateway mockGateway = new MockOperatorEventGateway();
+    KeySelector<String, String> keySelector =
+        new KeySelector<String, String>() {
+          private static final long serialVersionUID = 7662520075515707428L;
+
+          @Override
+          public String getKey(String value) {
+            return value;
+          }
+        };
+
+    this.operator =
+        new ShuffleOperator<>(keySelector, TypeInformation.of(String.class), mockGateway);
+    Environment env = getTestingEnvironment();
+    this.operator.setup(
+        new OneInputStreamTask(env),
+        new MockStreamConfig(new Configuration(), 1),
+        new MockOutput<>(Lists.newArrayList()));
+  }
+
+  @After
+  public void clean() throws Exception {
+    operator.close();
+  }
+
+  @Test
+  public void testInitializeState() throws Exception {

Review Comment:
   not sure if this test is necessary. seems too trivial to test.






[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6382: Implement ShuffleOperator to collect data statistics

2023-01-07 Thread GitBox


stevenzwu commented on code in PR #6382:
URL: https://github.com/apache/iceberg/pull/6382#discussion_r1064084432


##
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestShuffleOperator.java:
##
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.execution.Environment;
+import 
org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateInitializationContextImpl;
+import org.apache.flink.runtime.state.TestTaskStateManager;
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
+import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
+import org.apache.flink.streaming.util.MockOutput;
+import org.apache.flink.streaming.util.MockStreamConfig;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestShuffleOperator {
+  private ShuffleOperator<String, String> operator;
+
+  private Environment getTestingEnvironment() {
+    return new StreamMockEnvironment(
+        new Configuration(),
+        new Configuration(),
+        new ExecutionConfig(),
+        1L,
+        new MockInputSplitProvider(),
+        1,
+        new TestTaskStateManager());
+  }
+
+  @Before
+  public void before() throws Exception {
+    MockOperatorEventGateway mockGateway = new MockOperatorEventGateway();
+    KeySelector<String, String> keySelector =
+        new KeySelector<String, String>() {
+          private static final long serialVersionUID = 7662520075515707428L;
+
+          @Override
+          public String getKey(String value) {
+            return value;
+          }
+        };
+    DataStatisticsFactory<String> dataStatisticsFactory = new DataStatisticsFactory<>();
+
+    this.operator = new ShuffleOperator<>(keySelector, mockGateway, dataStatisticsFactory);
+    Environment env = getTestingEnvironment();
+    this.operator.setup(
+        new OneInputStreamTask(env),
+        new MockStreamConfig(new Configuration(), 1),
+        new MockOutput<>(Lists.newArrayList()));
+  }
+
+  @After
+  public void clean() throws Exception {
+    operator.close();
+  }
+
+  @Test
+  public void testInitializeState() throws Exception {
+    StateInitializationContext stateContext = getStateContext();
+    operator.initializeState(stateContext);
+
+    assertNotNull(
+        stateContext
+            .getOperatorStateStore()
+            .getListState(operator.generateGlobalDataDistributionWeightDescriptor()));
+  }
+
+  @Test
+  public void testProcessElement() throws Exception {
+    StateInitializationContext stateContext = getStateContext();
+    operator.initializeState(stateContext);
+    operator.processElement(new StreamRecord<>("a"));
+    operator.processElement(new StreamRecord<>("a"));
+    operator.processElement(new StreamRecord<>("b"));
+    assertTrue(operator.localDataStatistics() instanceo

[GitHub] [iceberg] jackye1995 commented on a diff in pull request #6449: WIP: Delta: Adding support for Snapshot Delta Lake Table to Iceberg Table

2023-01-07 Thread GitBox


jackye1995 commented on code in PR #6449:
URL: https://github.com/apache/iceberg/pull/6449#discussion_r1064097728


##
build.gradle:
##
@@ -438,6 +442,70 @@ project(':iceberg-aws') {
   }
 }
 
+project(':iceberg-delta-lake') {
+  configurations {
+    integrationImplementation.extendsFrom testImplementation

Review Comment:
   Probably should add some comments about why a separate integration test 
configuration is used here. 


