Re: [I] Caused by: java.net.SocketException: Connection reset [iceberg]

2024-01-17 Thread via GitHub


pvary commented on issue #9444:
URL: https://github.com/apache/iceberg/issues/9444#issuecomment-1895290384

   > Because it can also be an intermittent network issue for anyone using 
Iceberg with Flink and failing the entire stream for that sounds a bit harsh.
   
   For the record, the fix you are suggesting is in the Iceberg AWS code, not 
in the Flink Iceberg connector code 😄 
   
   When we do retries in Iceberg, we use `Tasks`, like:
   
https://github.com/apache/iceberg/blob/main/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java#L175
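
   For reference, a minimal sketch of that `Tasks` retry pattern (the 
`openAndRead` helper, the backoff values, and retrying on `SdkClientException` 
are assumptions for illustration, not the actual `S3FileIO` code):

   ```
   import org.apache.iceberg.util.Tasks;
   import software.amazon.awssdk.core.exception.SdkClientException;

   public class RetryExample {

     // Retry the read a few times when the AWS SDK surfaces an intermittent
     // network failure (e.g. "Connection reset") as an SdkClientException.
     public static void readWithRetries(String location) {
       Tasks.foreach(location)
           .retry(3)                                  // up to 3 retries per item
           .exponentialBackoff(100, 5000, 30000, 2.0) // min/max sleep, total timeout (ms), scale
           .onlyRetryOn(SdkClientException.class)     // don't retry unrelated failures
           .run(RetryExample::openAndRead);
     }

     private static void openAndRead(String location) {
       // hypothetical: open the object at 'location' and read it; this is the call
       // that intermittently fails with java.net.SocketException: Connection reset
     }
   }
   ```

   Only the exception types passed to `onlyRetryOn` are retried, so unrelated 
failures still surface immediately.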





Re: [PR] Add formatting for toml files [iceberg-rust]

2024-01-17 Thread via GitHub


liurenjie1024 commented on code in PR #167:
URL: https://github.com/apache/iceberg-rust/pull/167#discussion_r1454897761


##
Makefile:
##
@@ -32,7 +32,11 @@ cargo-sort:
cargo install cargo-sort
cargo sort -c -w
 
-check: check-fmt check-clippy cargo-sort
+fmt-toml:
+   cargo install taplo-cli --locked
+   taplo fmt

Review Comment:
   > I'm guessing not. It will fmt the files directly if it used to be 
incorrectly formatted.
   
   Then I think it's better to use `check` here. The goal of CI is to check 
correctness, not do formatting.






Re: [I] Caused by: java.net.SocketException: Connection reset [iceberg]

2024-01-17 Thread via GitHub


javrasya commented on issue #9444:
URL: https://github.com/apache/iceberg/issues/9444#issuecomment-1895303601

   True, the same FileIO can be used for Spark too, I forgot that :-) 
   Exactly, I saw that retry implementation in the source code. Thanks for 
sharing it, it is a good reference. I just needed a very simple retry logic in 
place to see if it really helps. I think it should be implemented that way 
instead. 
   
   Who do you think could be pulled into the discussion here for that? I know you 
folks are mostly working on the Flink side of Iceberg. I'm not really sure how 
the contributors are grouped. 





Re: [PR] init writer framework [iceberg-rust]

2024-01-17 Thread via GitHub


ZENOTME commented on PR #135:
URL: https://github.com/apache/iceberg-rust/pull/135#issuecomment-1895304460

   I feel that this writer framework may need more discussion, so I can 
split it into `IcebergWriter` and `FileWriter` parts. 
   The `IcebergWriter` part is about how to organize our writers; it's the core 
and more complicated.
   The `FileWriter` part is an abstraction over file formats like 
`parquet` and `orc`. It's simpler. 
   
   We can work on the `FileWriter` part first if it looks good. What do you 
think? @Xuanwo @Fokko 
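
   To visualize the split (illustrative shapes only — the crate is Rust and all 
names here are hypothetical, shown as Java-style interfaces for brevity):

   ```
   import java.io.Closeable;
   import java.io.IOException;
   import java.util.List;

   // FileWriter hides the file format (Parquet, ORC, ...).
   interface FileWriter<T> extends Closeable {
     void write(T record) throws IOException;
   }

   // IcebergWriter organizes writers: it routes records (e.g. by partition)
   // to the right FileWriter instances and tracks the files they produce.
   interface IcebergWriter<T> extends Closeable {
     void write(T record) throws IOException;
     List<String> completedFiles();
   }
   ```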





Re: [PR] Flink: Don't fail to serialize IcebergSourceSplit when there is too many delete files [iceberg]

2024-01-17 Thread via GitHub


pvary commented on PR #9464:
URL: https://github.com/apache/iceberg/pull/9464#issuecomment-1895305741

   > The split/FileScanTask should only contain ASCII.
   
   If I understand correctly, the `FileScanTask` JSON will contain the 
`Schema`, and the `Schema` has a `doc` field for comments. Do we have restrictions 
defined for the `doc` field? When working on Hive, plenty of our Chinese users 
added comments there using non-ASCII characters.
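
   As a small illustration of the concern (a hedged sketch, not code from this 
PR), the `doc` argument of a schema field accepts arbitrary strings, including 
non-ASCII comments:

   ```
   import org.apache.iceberg.Schema;
   import org.apache.iceberg.types.Types;

   public class DocFieldExample {

     // Nothing restricts field docs to ASCII, so serialized metadata can carry
     // comments like these.
     public static Schema schemaWithNonAsciiDoc() {
       return new Schema(
           Types.NestedField.required(1, "id", Types.LongType.get(), "订单编号 (order id)"),
           Types.NestedField.optional(2, "data", Types.StringType.get(), "数据"));
     }
   }
   ```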





Re: [PR] Hive: Refactor hive-table commit operation to be used for other operations like view [iceberg]

2024-01-17 Thread via GitHub


nk1506 commented on code in PR #9461:
URL: https://github.com/apache/iceberg/pull/9461#discussion_r1454918307


##
core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java:
##
@@ -309,65 +300,19 @@ protected enum CommitStatus {
* @return Commit Status of Success, Failure or Unknown
*/
   protected CommitStatus checkCommitStatus(String newMetadataLocation, 
TableMetadata config) {
-int maxAttempts =

Review Comment:
   This needed to be moved to a common place so that other operations like View 
can also use it. 






Re: [PR] init writer framework [iceberg-rust]

2024-01-17 Thread via GitHub


Xuanwo commented on PR #135:
URL: https://github.com/apache/iceberg-rust/pull/135#issuecomment-1895313954

   > We can work on the `FileWriter` part first if it looks good. What do you 
think? @Xuanwo @Fokko
   
   LGTM! It's good to merge things in small chunks and polish them during 
real usage.





Re: [PR] Spark: Support creating views via SQL [iceberg]

2024-01-17 Thread via GitHub


nastra commented on code in PR #9423:
URL: https://github.com/apache/iceberg/pull/9423#discussion_r1454928120


##
spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala:
##
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.iceberg.spark.Spark3Util
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException
+import org.apache.spark.sql.catalyst.expressions.Alias
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.Project
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.connector.catalog.ViewCatalog
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.execution.CommandExecutionMode
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.MetadataBuilder
+import org.apache.spark.sql.util.SchemaUtils
+import scala.collection.JavaConverters._
+
+
+case class CreateV2ViewExec(
+  catalog: ViewCatalog,
+  ident: Identifier,
+  originalText: String,
+  query: LogicalPlan,
+  userSpecifiedColumns: Seq[(String, Option[String])],
+  comment: Option[String],
+  properties: Map[String, String],
+  allowExisting: Boolean,
+  replace: Boolean) extends LeafV2CommandExec {
+
+  override lazy val output: Seq[Attribute] = Nil
+
+  override protected def run(): Seq[InternalRow] = {
+val qe = session.sessionState.executePlan(query, CommandExecutionMode.SKIP)
+qe.assertAnalyzed()
+val analyzedPlan = qe.analyzed
+
+val identifier = Spark3Util.toV1TableIdentifier(ident)
+
+if (userSpecifiedColumns.nonEmpty) {
+  if (userSpecifiedColumns.length > analyzedPlan.output.length) {
+throw QueryCompilationErrors.cannotCreateViewNotEnoughColumnsError(
+  identifier, userSpecifiedColumns.map(_._1), analyzedPlan)
+  } else if (userSpecifiedColumns.length < analyzedPlan.output.length) {
+throw QueryCompilationErrors.cannotCreateViewTooManyColumnsError(
+  identifier, userSpecifiedColumns.map(_._1), analyzedPlan)
+  }
+}
+
+val queryColumnNames = analyzedPlan.schema.fieldNames
+SchemaUtils.checkColumnNameDuplication(queryColumnNames, 
SQLConf.get.resolver)
+
+val viewSchema = aliasPlan(analyzedPlan, userSpecifiedColumns).schema
+val columnAliases = userSpecifiedColumns.map(_._1).toArray
+val columnComments = userSpecifiedColumns.map(_._2.getOrElse("")).toArray

Review Comment:
   I agree that this should be null. The issue here was that Spark was failing 
with an NPE when running `TestViews#createViewWithColumnAliases()` in an earlier 
version of this PR, when that particular code was living in `ResolveViews`.
   I checked again and this doesn't fail anymore, because the logic moved into 
`CreateV2ViewExec`. I've updated this to `null` now.






Re: [PR] Spark: Support creating views via SQL [iceberg]

2024-01-17 Thread via GitHub


nastra commented on code in PR #9423:
URL: https://github.com/apache/iceberg/pull/9423#discussion_r1454951834


##
spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala:
##
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.iceberg.spark.Spark3Util
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException
+import org.apache.spark.sql.catalyst.expressions.Alias
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.Project
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.connector.catalog.ViewCatalog
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.execution.CommandExecutionMode
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.MetadataBuilder
+import org.apache.spark.sql.util.SchemaUtils
+import scala.collection.JavaConverters._
+
+
+case class CreateV2ViewExec(
+  catalog: ViewCatalog,
+  ident: Identifier,
+  originalText: String,
+  query: LogicalPlan,
+  userSpecifiedColumns: Seq[(String, Option[String])],
+  comment: Option[String],
+  properties: Map[String, String],
+  allowExisting: Boolean,
+  replace: Boolean) extends LeafV2CommandExec {
+
+  override lazy val output: Seq[Attribute] = Nil
+
+  override protected def run(): Seq[InternalRow] = {
+val qe = session.sessionState.executePlan(query, CommandExecutionMode.SKIP)
+qe.assertAnalyzed()
+val analyzedPlan = qe.analyzed
+
+val identifier = Spark3Util.toV1TableIdentifier(ident)
+
+if (userSpecifiedColumns.nonEmpty) {
+  if (userSpecifiedColumns.length > analyzedPlan.output.length) {
+throw QueryCompilationErrors.cannotCreateViewNotEnoughColumnsError(
+  identifier, userSpecifiedColumns.map(_._1), analyzedPlan)
+  } else if (userSpecifiedColumns.length < analyzedPlan.output.length) {
+throw QueryCompilationErrors.cannotCreateViewTooManyColumnsError(
+  identifier, userSpecifiedColumns.map(_._1), analyzedPlan)
+  }
+}
+
+val queryColumnNames = analyzedPlan.schema.fieldNames
+SchemaUtils.checkColumnNameDuplication(queryColumnNames, 
SQLConf.get.resolver)
+
+val viewSchema = aliasPlan(analyzedPlan, userSpecifiedColumns).schema
+val columnAliases = userSpecifiedColumns.map(_._1).toArray
+val columnComments = userSpecifiedColumns.map(_._2.getOrElse("")).toArray
+
+val currentCatalog = 
session.sessionState.catalogManager.currentCatalog.name
+val currentNamespace = session.sessionState.catalogManager.currentNamespace
+
+val engineVersion = "Spark " + org.apache.spark.SPARK_VERSION
+val createEngineVersion = Some(engineVersion)
+val newProperties = properties ++
+  comment.map(ViewCatalog.PROP_COMMENT -> _) ++
+  createEngineVersion.map(ViewCatalog.PROP_CREATE_ENGINE_VERSION -> _) +

Review Comment:
   no particular reason, I've updated this






[I] `write.parquet.compression-codec` being set even if file-format is not parquet [iceberg]

2024-01-17 Thread via GitHub


oneonestar opened a new issue, #9490:
URL: https://github.com/apache/iceberg/issues/9490

   ### Apache Iceberg version
   
   1.4.2 (latest release)
   
   ### Query engine
   
   Trino
   
   ### Please describe the bug 🐞
   
   In Trino 436 (Iceberg 1.4.3), the `write.parquet.compression-codec` property is 
being set even if the file format is not Parquet 
(https://github.com/trinodb/trino/issues/20401).
   
   I think the problem could be related to
   https://github.com/apache/iceberg/pull/8593#issuecomment-1740507634
   
   ```
   trino> CREATE TABLE test.property_test (c1 integer) WITH (format = 'ORC');
   CREATE TABLE
   trino> SELECT * FROM test."property_test$properties";
                  key               | value
   ---------------------------------+-------
    write.format.default            | ORC
    write.parquet.compression-codec | zstd
   (2 rows)
   
   trino> CREATE TABLE test.property_test (c1 integer) WITH (format = 'AVRO');
   CREATE TABLE
   trino> SELECT * FROM test."property_test$properties";
                  key               | value
   ---------------------------------+-------
    write.format.default            | AVRO
    write.parquet.compression-codec | zstd
   (2 rows)
   ```
   





Re: [PR] Spark: Support creating views via SQL [iceberg]

2024-01-17 Thread via GitHub


nastra commented on code in PR #9423:
URL: https://github.com/apache/iceberg/pull/9423#discussion_r1454958750


##
spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala:
##
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.iceberg.spark.Spark3Util
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException
+import org.apache.spark.sql.catalyst.expressions.Alias
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.Project
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.connector.catalog.ViewCatalog
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.execution.CommandExecutionMode
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.MetadataBuilder
+import org.apache.spark.sql.util.SchemaUtils
+import scala.collection.JavaConverters._
+
+
+case class CreateV2ViewExec(
+  catalog: ViewCatalog,
+  ident: Identifier,
+  originalText: String,
+  query: LogicalPlan,
+  userSpecifiedColumns: Seq[(String, Option[String])],
+  comment: Option[String],
+  properties: Map[String, String],
+  allowExisting: Boolean,
+  replace: Boolean) extends LeafV2CommandExec {
+
+  override lazy val output: Seq[Attribute] = Nil
+
+  override protected def run(): Seq[InternalRow] = {
+val qe = session.sessionState.executePlan(query, CommandExecutionMode.SKIP)
+qe.assertAnalyzed()

Review Comment:
   This was taken from an early implementation and is probably not needed 
anymore, so I've removed it.






Re: [I] Caused by: java.net.SocketException: Connection reset [iceberg]

2024-01-17 Thread via GitHub


pvary commented on issue #9444:
URL: https://github.com/apache/iceberg/issues/9444#issuecomment-1895346429

   Maybe @jackye1995?





Re: [PR] init writer framework [iceberg-rust]

2024-01-17 Thread via GitHub


liurenjie1024 commented on PR #135:
URL: https://github.com/apache/iceberg-rust/pull/135#issuecomment-1895352135

   > We can work on the FileWriter part first if it looks good. What do you 
think? @Xuanwo @Fokko
   
   +1





Re: [PR] Hive: Unwrap RuntimeException for Hive TException with rename table [iceberg]

2024-01-17 Thread via GitHub


pvary commented on code in PR #9432:
URL: https://github.com/apache/iceberg/pull/9432#discussion_r1454984558


##
hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java:
##
@@ -251,12 +251,14 @@ public void renameTable(TableIdentifier from, 
TableIdentifier originalTo) {
 } catch (NoSuchObjectException e) {
   throw new NoSuchTableException("Table does not exist: %s", from);
 
-} catch (AlreadyExistsException e) {
-  throw new org.apache.iceberg.exceptions.AlreadyExistsException(
-  "Table already exists: %s", to);
-
 } catch (TException e) {
-  throw new RuntimeException("Failed to rename " + from + " to " + to, e);
+  if (e.getMessage() != null
+  && e.getMessage().contains(String.format("new table %s already 
exists", to))) {
+throw new org.apache.iceberg.exceptions.AlreadyExistsException(
+"Table already exists: %s", to);
+  } else {
+throw new RuntimeException("Failed to rename " + from + " to " + to, 
e);
+  }

Review Comment:
   Could we please move this under a `catch (InvalidOperationException)` block, 
but keep the old `TException` handler part?
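
   A minimal sketch of the suggested structure (the `doRename` helper is 
hypothetical and stands in for the metastore call; this reflects the suggestion 
above, not necessarily the code that was finally merged):

   ```
   import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
   import org.apache.thrift.TException;

   class RenameSketch {

     void renameWithTranslation(String from, String to) {
       try {
         doRename(from, to); // hypothetical metastore call
       } catch (InvalidOperationException e) {
         // the "already exists" case surfaces here, so translate only this path
         if (e.getMessage() != null
             && e.getMessage().contains(String.format("new table %s already exists", to))) {
           throw new org.apache.iceberg.exceptions.AlreadyExistsException(
               "Table already exists: %s", to);
         }
         throw new RuntimeException("Failed to rename " + from + " to " + to, e);
       } catch (TException e) {
         // keep the old generic handler for every other Thrift failure
         throw new RuntimeException("Failed to rename " + from + " to " + to, e);
       }
     }

     private void doRename(String from, String to) throws TException {
       // hypothetical; the real code calls the Hive metastore client's alter_table
     }
   }
   ```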






Re: [I] `write.parquet.compression-codec` being set even if file-format is not parquet [iceberg]

2024-01-17 Thread via GitHub


findinpath commented on issue #9490:
URL: https://github.com/apache/iceberg/issues/9490#issuecomment-1895363913

   cc @aokolnychyi, please see 
`org.apache.iceberg.TableMetadata#persistedProperties` in 2e291c2b





Re: [PR] Spark: Support creating views via SQL [iceberg]

2024-01-17 Thread via GitHub


nastra commented on code in PR #9423:
URL: https://github.com/apache/iceberg/pull/9423#discussion_r1455012760


##
spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala:
##
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.iceberg.spark.Spark3Util
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException
+import org.apache.spark.sql.catalyst.expressions.Alias
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.Project
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.connector.catalog.ViewCatalog
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.execution.CommandExecutionMode
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.MetadataBuilder
+import org.apache.spark.sql.util.SchemaUtils
+import scala.collection.JavaConverters._
+
+
+case class CreateV2ViewExec(
+  catalog: ViewCatalog,
+  ident: Identifier,
+  originalText: String,
+  query: LogicalPlan,
+  userSpecifiedColumns: Seq[(String, Option[String])],
+  comment: Option[String],
+  properties: Map[String, String],
+  allowExisting: Boolean,
+  replace: Boolean) extends LeafV2CommandExec {
+
+  override lazy val output: Seq[Attribute] = Nil
+
+  override protected def run(): Seq[InternalRow] = {
+val qe = session.sessionState.executePlan(query, CommandExecutionMode.SKIP)
+qe.assertAnalyzed()
+val analyzedPlan = qe.analyzed
+
+val identifier = Spark3Util.toV1TableIdentifier(ident)
+
+if (userSpecifiedColumns.nonEmpty) {
+  if (userSpecifiedColumns.length > analyzedPlan.output.length) {
+throw QueryCompilationErrors.cannotCreateViewNotEnoughColumnsError(
+  identifier, userSpecifiedColumns.map(_._1), analyzedPlan)
+  } else if (userSpecifiedColumns.length < analyzedPlan.output.length) {
+throw QueryCompilationErrors.cannotCreateViewTooManyColumnsError(
+  identifier, userSpecifiedColumns.map(_._1), analyzedPlan)
+  }
+}
+
+val queryColumnNames = analyzedPlan.schema.fieldNames
+SchemaUtils.checkColumnNameDuplication(queryColumnNames, 
SQLConf.get.resolver)

Review Comment:
   I think we want to perform this check, because Spark does the same and fails 
if you provide duplicate columns in the underlying query:
   
   ```
   spark-sql (default)> create temporary view tempv2 as select id, id from 
iceberg1.foo WHERE id < 12;
   [COLUMN_ALREADY_EXISTS] The column `id` already exists. Consider to choose 
another name or rename the existing column.
   ```
   
   I've left that check and also added a test for this






Re: [PR] Spark: Support creating views via SQL [iceberg]

2024-01-17 Thread via GitHub


nastra commented on code in PR #9423:
URL: https://github.com/apache/iceberg/pull/9423#discussion_r1455041489


##
spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ViewCheck.scala:
##
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.iceberg.spark.Spark3Util
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView
+import org.apache.spark.sql.connector.catalog.ViewCatalog
+import org.apache.spark.sql.execution.command.ViewHelper
+
+object ViewCheck extends (LogicalPlan => Unit) {
+
+  override def apply(plan: LogicalPlan): Unit = {
+plan foreach {
+  case CreateIcebergView(ResolvedIdentifier(_: ViewCatalog, ident), _, _, 
_, _, query, _, _) =>
+val identifier = Spark3Util.toV1TableIdentifier(ident)

Review Comment:
   We specifically test for this case via `createViewReferencingTempView()` and 
`createViewReferencingGlobalTempView()`, and it seems to work. I've also checked 
that `CheckViews` runs right after the logic in `ResolveViews`. I think we could 
also just remove `CheckViews` and perform the check in `ResolveViews`.






Re: [PR] Spark: Support creating views via SQL [iceberg]

2024-01-17 Thread via GitHub


nastra commented on code in PR #9423:
URL: https://github.com/apache/iceberg/pull/9423#discussion_r1455090708


##
spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ViewCheck.scala:
##
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.iceberg.spark.Spark3Util
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView
+import org.apache.spark.sql.connector.catalog.ViewCatalog
+import org.apache.spark.sql.execution.command.ViewHelper
+
+object ViewCheck extends (LogicalPlan => Unit) {
+
+  override def apply(plan: LogicalPlan): Unit = {
+plan foreach {
+  case CreateIcebergView(ResolvedIdentifier(_: ViewCatalog, ident), _, _, 
_, _, query, _, _) =>
+val identifier = Spark3Util.toV1TableIdentifier(ident)
+ViewHelper.verifyTemporaryObjectsNotExists(false, identifier, query, 
Seq.empty)
+ViewHelper.verifyAutoGeneratedAliasesNotExists(query, false, 
identifier)

Review Comment:
   Initially I added this because the same check is being done in Spark's view 
creation code in 
https://github.com/apache/spark/blob/branch-3.5/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala#L115.
   
   Looking at 
https://github.com/apache/spark/commit/3c683434fa3f041000af363fdc6bdaddf4e1fb2a, 
where this feature was introduced, I actually don't think we need this check, 
so I went ahead and removed it.






Re: [PR] Spark: Support creating views via SQL [iceberg]

2024-01-17 Thread via GitHub


nastra commented on code in PR #9423:
URL: https://github.com/apache/iceberg/pull/9423#discussion_r1455100866


##
spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ViewCheck.scala:
##
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.iceberg.spark.Spark3Util
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView
+import org.apache.spark.sql.connector.catalog.ViewCatalog
+import org.apache.spark.sql.execution.command.ViewHelper
+
+object ViewCheck extends (LogicalPlan => Unit) {
+
+  override def apply(plan: LogicalPlan): Unit = {
+plan foreach {
+  case CreateIcebergView(ResolvedIdentifier(_: ViewCatalog, ident), _, _, 
_, _, query, _, _) =>
+val identifier = Spark3Util.toV1TableIdentifier(ident)
+ViewHelper.verifyTemporaryObjectsNotExists(false, identifier, query, 
Seq.empty)

Review Comment:
   What the check does is make sure that a view doesn't reference a 
temporary/global view. We specifically test for these two cases in 
`createViewReferencingGlobalTempView()` / `createViewReferencingTempView()`.






Re: [PR] Check stale issues in ascending order [iceberg]

2024-01-17 Thread via GitHub


ajantha-bhat commented on code in PR #9489:
URL: https://github.com/apache/iceberg/pull/9489#discussion_r1455140931


##
.github/workflows/stale.yml:
##
@@ -47,3 +47,4 @@ jobs:
   close-issue-message: >
 This issue has been closed because it has not received any 
activity in the last 14 days
 since being marked as 'stale'
+  ascending: true

Review Comment:
   The workflow triggers daily at midnight and considers 30 issues a day. 
Shouldn't they have been closed by now? 






Re: [PR] Hive: Unwrap RuntimeException for Hive TException with rename table [iceberg]

2024-01-17 Thread via GitHub


pvary merged PR #9432:
URL: https://github.com/apache/iceberg/pull/9432





Re: [PR] Hive: Unwrap RuntimeException for Hive TException with rename table [iceberg]

2024-01-17 Thread via GitHub


pvary commented on PR #9432:
URL: https://github.com/apache/iceberg/pull/9432#issuecomment-1895498122

   Thanks @nk1506 for the PR!





Re: [PR] Update iceberg_bug_report.yml to 1.4.3 [iceberg]

2024-01-17 Thread via GitHub


nastra merged PR #9491:
URL: https://github.com/apache/iceberg/pull/9491





Re: [PR] Check stale issues in ascending order [iceberg]

2024-01-17 Thread via GitHub


manuzhang commented on code in PR #9489:
URL: https://github.com/apache/iceberg/pull/9489#discussion_r1455190604


##
.github/workflows/stale.yml:
##
@@ -47,3 +47,4 @@ jobs:
   close-issue-message: >
 This issue has been closed because it has not received any 
activity in the last 14 days
 since being marked as 'stale'
+  ascending: true

Review Comment:
   With the default descending order, it always checks the latest issues, and 
only a small number can be closed. Older issues are never checked.






Re: [PR] Check stale issues in ascending order [iceberg]

2024-01-17 Thread via GitHub


nastra merged PR #9489:
URL: https://github.com/apache/iceberg/pull/9489





Re: [PR] Core: Add view support for JDBC catalog [iceberg]

2024-01-17 Thread via GitHub


ajantha-bhat commented on code in PR #9487:
URL: https://github.com/apache/iceberg/pull/9487#discussion_r1455179512


##
core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java:
##
@@ -53,21 +58,19 @@ final class JdbcUtil {
   + " WHERE "
   + CATALOG_NAME
   + " = ? AND "
-  + TABLE_NAMESPACE
-  + " = ? AND "
-  + TABLE_NAME
-  + " = ? AND "
+  + " %s = ? AND " // table or view namespace

Review Comment:
   nit: we can remove the extra space before %s



##
core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java:
##
@@ -303,6 +331,263 @@ public static Properties 
filterAndRemovePrefix(Map properties, S
 return result;
   }
 
+  /**
+   * Create SQL statement to get a table or view.
+   *
+   * @param table true to get the SQL statement for a table, false for a view.
+   * @return the SQL statement.
+   */
+  public static String getTableOrViewSqlStatement(boolean table) {
+String tableOrViewTableName = table ? CATALOG_TABLE_NAME : 
CATALOG_VIEW_NAME;
+String tableOrViewNamespace = table ? TABLE_NAMESPACE : VIEW_NAMESPACE;
+String tableOrViewName = table ? TABLE_NAME : VIEW_NAME;
+return String.format(
+GET_TABLE_OR_VIEW_SQL, tableOrViewTableName, tableOrViewNamespace, 
tableOrViewName);
+  }
+
+  /**
+   * Create SQL statement to update a table or view.
+   *
+   * @param table true to get the SQL statement for a table, false for a view.
+   * @return the SQL statement.
+   */
+  public static String doCommitSqlStatement(boolean table) {
+String tableOrViewTableName = table ? CATALOG_TABLE_NAME : 
CATALOG_VIEW_NAME;
+String tableOrViewNamespace = table ? TABLE_NAMESPACE : VIEW_NAMESPACE;
+String tableOrViewName = table ? TABLE_NAME : VIEW_NAME;
+return String.format(
+DO_COMMIT_TABLE_OR_VIEW_SQL, tableOrViewTableName, 
tableOrViewNamespace, tableOrViewName);
+  }
+
+  /**
+   * Create SQL statement to create a table or view.
+   *
+   * @param table true to get the SQL statement for a table, false for a view.
+   * @return the SQL statement.
+   */
+  public static String createCatalogTableOrViewSqlStatement(boolean table) {
+String tableOrViewTableName = table ? CATALOG_TABLE_NAME : 
CATALOG_VIEW_NAME;
+String tableOrViewNamespace = table ? TABLE_NAMESPACE : VIEW_NAMESPACE;
+String tableOrViewName = table ? TABLE_NAME : VIEW_NAME;
+return String.format(
+CREATE_CATALOG_TABLE_OR_VIEW,
+tableOrViewTableName,
+tableOrViewNamespace,
+tableOrViewName,
+tableOrViewNamespace,
+tableOrViewName);
+  }
+
+  /**
+   * Create SQL statement to list tables or views.
+   *
+   * @param table true to get the SQL statement for a table, false for a view.
+   * @return the SQL statement.
+   */
+  public static String listTablesOrViewsSqlStatement(boolean table) {
+String tableOrViewTableName = table ? CATALOG_TABLE_NAME : 
CATALOG_VIEW_NAME;
+String tableOrViewNamespace = table ? TABLE_NAMESPACE : VIEW_NAMESPACE;
+return String.format(LIST_TABLES_OR_VIEWS_SQL, tableOrViewTableName, 
tableOrViewNamespace);
+  }
+
+  /**
+   * Create SQL statement to rename a table or view.
+   *
+   * @param table true to get the SQL statement for a table, false for a view.
+   * @return the SQL statement.
+   */
+  public static String renameTableOrViewSqlStatement(boolean table) {
+String tableOrViewTableName = table ? CATALOG_TABLE_NAME : 
CATALOG_VIEW_NAME;
+String tableOrViewNamespace = table ? TABLE_NAMESPACE : VIEW_NAMESPACE;
+String tableOrViewName = table ? TABLE_NAME : VIEW_NAME;
+return String.format(
+RENAME_TABLE_OR_VIEW_SQL,
+tableOrViewTableName,
+tableOrViewNamespace,
+tableOrViewName,
+tableOrViewNamespace,
+tableOrViewName);
+  }
+
+  /**
+   * Create SQL statement to delete a table or view.
+   *
+   * @param table true to get the SQL statement for a table, false for a view.
+   * @return the SQL statement.
+   */
+  public static String dropTableOrViewSqlStatement(boolean table) {
+String tableOrViewTableName = table ? CATALOG_TABLE_NAME : 
CATALOG_VIEW_NAME;
+String tableOrViewNamespace = table ? TABLE_NAMESPACE : VIEW_NAMESPACE;
+String tableOrViewName = table ? TABLE_NAME : VIEW_NAME;
+return String.format(
+DROP_TABLE_OR_VIEW_SQL, tableOrViewTableName, tableOrViewNamespace, 
tableOrViewName);
+  }
+
+  /**
+   * Create SQL statement to create a table or view.
+   *
+   * @param table true to get the SQL statement for a table, false for a view.
+   * @return the SQL statement.
+   */
+  public static String doCommitCreateTableOrViewSqlStatement(boolean table) {
+String tableOrViewTableName = table ? CATALOG_TABLE_NAME : 
CATALOG_VIEW_NAME;
+String tableOrViewNamespace = table ? TABLE_NAMESPACE : VIEW_NAMESPACE;
+String tableOrViewName = table ? TABLE_NAME : VIEW_NAME;
+return String.format(

Re: [PR] Spark: Support creating views via SQL [iceberg]

2024-01-17 Thread via GitHub


nastra commented on code in PR #9423:
URL: https://github.com/apache/iceberg/pull/9423#discussion_r1455299682


##
spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala:
##
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.iceberg.spark.Spark3Util
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException
+import org.apache.spark.sql.catalyst.expressions.Alias
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.Project
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.connector.catalog.ViewCatalog
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.execution.CommandExecutionMode
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.MetadataBuilder
+import org.apache.spark.sql.util.SchemaUtils
+import scala.collection.JavaConverters._
+
+
+case class CreateV2ViewExec(
+  catalog: ViewCatalog,
+  ident: Identifier,
+  originalText: String,
+  query: LogicalPlan,
+  userSpecifiedColumns: Seq[(String, Option[String])],
+  comment: Option[String],
+  properties: Map[String, String],
+  allowExisting: Boolean,
+  replace: Boolean) extends LeafV2CommandExec {
+
+  override lazy val output: Seq[Attribute] = Nil
+
+  override protected def run(): Seq[InternalRow] = {
+val qe = session.sessionState.executePlan(query, CommandExecutionMode.SKIP)
+qe.assertAnalyzed()
+val analyzedPlan = qe.analyzed
+
+val identifier = Spark3Util.toV1TableIdentifier(ident)
+
+if (userSpecifiedColumns.nonEmpty) {
+  if (userSpecifiedColumns.length > analyzedPlan.output.length) {
+throw QueryCompilationErrors.cannotCreateViewNotEnoughColumnsError(
+  identifier, userSpecifiedColumns.map(_._1), analyzedPlan)
+  } else if (userSpecifiedColumns.length < analyzedPlan.output.length) {
+throw QueryCompilationErrors.cannotCreateViewTooManyColumnsError(
+  identifier, userSpecifiedColumns.map(_._1), analyzedPlan)
+  }
+}
+
+val queryColumnNames = analyzedPlan.schema.fieldNames
+SchemaUtils.checkColumnNameDuplication(queryColumnNames, 
SQLConf.get.resolver)
+
+val viewSchema = aliasPlan(analyzedPlan, userSpecifiedColumns).schema
+val columnAliases = userSpecifiedColumns.map(_._1).toArray
+val columnComments = userSpecifiedColumns.map(_._2.getOrElse("")).toArray
+
+val currentCatalog = 
session.sessionState.catalogManager.currentCatalog.name

Review Comment:
   done






Re: [PR] Core: Close the MetricsReporter when the Catalog is closed. [iceberg]

2024-01-17 Thread via GitHub


nastra commented on code in PR #9353:
URL: https://github.com/apache/iceberg/pull/9353#discussion_r1455363550


##
core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java:
##
@@ -482,7 +489,13 @@ public boolean removeProperties(Namespace namespace, 
Set properties)
 
   @Override
   public void close() {
-connections.close();
+if (closeableGroup != null) {
+  try {

Review Comment:
   I don't think we need to wrap this in a try-catch block here; none of the 
other catalogs do, so I think we can remove it.






Re: [PR] Core: Close the MetricsReporter when the Catalog is closed. [iceberg]

2024-01-17 Thread via GitHub


nastra commented on code in PR #9353:
URL: https://github.com/apache/iceberg/pull/9353#discussion_r1455378169


##
aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java:
##
@@ -143,6 +142,7 @@ void initialize(
 this.closeableGroup = new CloseableGroup();
 closeableGroup.addCloseable(dynamo);
 closeableGroup.addCloseable(fileIO);
+closeableGroup.addCloseable(metricsReporter());

Review Comment:
   I just wanted to point out that this effectively will initialize the 
reporter much earlier (during catalog instantiation) than previously (during 
table loading). This is probably ok






Re: [PR] Docs: Add distribution mode not respected for CTAS/RTAS before Spark 3.5.0 [iceberg]

2024-01-17 Thread via GitHub


nastra commented on code in PR #9439:
URL: https://github.com/apache/iceberg/pull/9439#discussion_r1455411644


##
docs/spark-writes.md:
##
@@ -343,7 +343,8 @@ 
data.writeTo("prod.db.sample").option("mergeSchema","true").append()
 Iceberg's default Spark writers require that the data in each spark task is 
clustered by partition values. This 
 distribution is required to minimize the number of file handles that are held 
open while writing. By default, starting
 in Iceberg 1.2.0, Iceberg also requests that Spark pre-sort data to be written 
to fit this distribution. The
-request to Spark is done through the table property `write.distribution-mode` 
with the value `hash`.
+request to Spark is done through the table property `write.distribution-mode` 
with the value `hash`. Spark doesn't respect
+distribution mode in CTAS/RTAS before 3.5.0.

Review Comment:
   do we have tests that check the behavior in Spark 3.4 + Spark 3.5 on this?






Re: [PR] Build: Bump Spark 3.3 from 3.3.3 to 3.3.4 [iceberg]

2024-01-17 Thread via GitHub


nastra merged PR #9492:
URL: https://github.com/apache/iceberg/pull/9492





Re: [PR] Add SqlCatalog _commit_table support [iceberg-python]

2024-01-17 Thread via GitHub


Fokko commented on PR #265:
URL: https://github.com/apache/iceberg-python/pull/265#issuecomment-1895723645

   Thanks @syun64 for working on this 👍 





Re: [PR] Add SqlCatalog _commit_table support [iceberg-python]

2024-01-17 Thread via GitHub


Fokko merged PR #265:
URL: https://github.com/apache/iceberg-python/pull/265





Re: [I] SqlCatalog _commit_table() support [iceberg-python]

2024-01-17 Thread via GitHub


Fokko closed issue #262: SqlCatalog _commit_table() support
URL: https://github.com/apache/iceberg-python/issues/262





Re: [I] SqlCatalog _commit_table() support [iceberg-python]

2024-01-17 Thread via GitHub


Fokko commented on issue #262:
URL: https://github.com/apache/iceberg-python/issues/262#issuecomment-1895726712

   Has been fixed in #265 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Core: Add view support for JDBC catalog [iceberg]

2024-01-17 Thread via GitHub


jbonofre commented on code in PR #9487:
URL: https://github.com/apache/iceberg/pull/9487#discussion_r1455472169


##
core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java:
##
@@ -157,8 +159,10 @@ private void initializeCatalogTables() throws 
InterruptedException, SQLException
 return true;
   }
 
-  LOG.debug("Creating table {} to store iceberg catalog", 
JdbcUtil.CATALOG_TABLE_NAME);
-  return 
conn.prepareStatement(JdbcUtil.CREATE_CATALOG_TABLE).execute();
+  LOG.debug(
+  "Creating table {} to store iceberg catalog tables", 
JdbcUtil.CATALOG_TABLE_NAME);
+  return 
conn.prepareStatement(JdbcUtil.createCatalogTableOrViewSqlStatement(true))

Review Comment:
   OK, I will do that (not sure these methods are actually used by end users, though).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Core: Add view support for JDBC catalog [iceberg]

2024-01-17 Thread via GitHub


jbonofre commented on code in PR #9487:
URL: https://github.com/apache/iceberg/pull/9487#discussion_r1455473193


##
core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java:
##
@@ -53,21 +58,19 @@ final class JdbcUtil {
   + " WHERE "
   + CATALOG_NAME
   + " = ? AND "
-  + TABLE_NAMESPACE
-  + " = ? AND "
-  + TABLE_NAME
-  + " = ? AND "
+  + " %s = ? AND " // table or view namespace

Review Comment:
   Indeed, I will also clean the existing SQL statements (which also have extra 
spaces). Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Core: Add view support for JDBC catalog [iceberg]

2024-01-17 Thread via GitHub


jbonofre commented on code in PR #9487:
URL: https://github.com/apache/iceberg/pull/9487#discussion_r1455474276


##
core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java:
##
@@ -139,7 +134,22 @@ final class JdbcUtil {
   + " LIKE ? ESCAPE '\\' "
   + " ) "
   + " LIMIT 1";
-  static final String LIST_NAMESPACES_SQL =
+  static final String GET_VIEW_NAMESPACE_SQL =

Review Comment:
   Good point, I will use the same approach with SQL statement "generated" 
method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[I] Build/Release: Upgrade to Apache RAT 0.16 and scan hidden directories [iceberg]

2024-01-17 Thread via GitHub


jbonofre opened a new issue, #9494:
URL: https://github.com/apache/iceberg/issues/9494

   ### Feature Request / Improvement
   
   As identified during a previous Iceberg release, apache-rat 0.15 doesn't scan hidden directories. This is a problem because hidden directories are part of the released Iceberg source distribution.
   
   I added a `--scan-hidden-directories` option to apache-rat 0.16, allowing us to scan all directories that are part of the source distribution.
   
   ### Query engine
   
   None


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[PR] Build: Upgrade to Apache RAT 0.16, scanning hidden directories and adding missing ASF header [iceberg]

2024-01-17 Thread via GitHub


jbonofre opened a new pull request, #9495:
URL: https://github.com/apache/iceberg/pull/9495

   This PR does the following:
   - upgrade to Apache RAT 0.16
   - add the `--scan-hidden-directories` option
   - add the ASF header where it is missing
   - add a new file to the RAT check exclusions


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Build: Upgrade to Apache RAT 0.16, scanning hidden directories and adding missing ASF header [iceberg]

2024-01-17 Thread via GitHub


jbonofre commented on PR #9495:
URL: https://github.com/apache/iceberg/pull/9495#issuecomment-1895818508

   @Fokko can you please take a look? Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[PR] Docs: Fix typo in tag reading example [iceberg]

2024-01-17 Thread via GitHub


pvary opened a new pull request, #9496:
URL: https://github.com/apache/iceberg/pull/9496

   Small fix in the docs


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[PR] Add small test on duplicate changes [iceberg-python]

2024-01-17 Thread via GitHub


Fokko opened a new pull request, #273:
URL: https://github.com/apache/iceberg-python/pull/273

   (no comment)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Docs: Fix typo in tag reading example [iceberg]

2024-01-17 Thread via GitHub


nastra merged PR #9496:
URL: https://github.com/apache/iceberg/pull/9496


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Set `ghp_path` to `/` [iceberg]

2024-01-17 Thread via GitHub


Fokko merged PR #9493:
URL: https://github.com/apache/iceberg/pull/9493


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Spark: Support creating views via SQL [iceberg]

2024-01-17 Thread via GitHub


nastra commented on code in PR #9423:
URL: https://github.com/apache/iceberg/pull/9423#discussion_r1455627217


##
spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ViewCheck.scala:
##
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.iceberg.spark.Spark3Util
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView
+import org.apache.spark.sql.connector.catalog.ViewCatalog
+import org.apache.spark.sql.execution.command.ViewHelper
+
+object ViewCheck extends (LogicalPlan => Unit) {
+
+  override def apply(plan: LogicalPlan): Unit = {
+plan foreach {
+  case CreateIcebergView(ResolvedIdentifier(_: ViewCatalog, ident), _, _, 
_, _, query, _, _) =>

Review Comment:
   does your comment still apply after 
https://github.com/apache/iceberg/pull/9423#discussion_r1455100866?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Build: Upgrade to Apache RAT 0.16, scanning hidden directories and adding missing ASF header [iceberg]

2024-01-17 Thread via GitHub


ajantha-bhat commented on code in PR #9495:
URL: https://github.com/apache/iceberg/pull/9495#discussion_r1455712834


##
dev/check-license:
##
@@ -68,7 +68,7 @@ mkdir -p "$FWDIR"/lib
 }
 
 mkdir -p build
-$java_cmd -jar "$rat_jar" -E "$FWDIR"/dev/.rat-excludes -d "$FWDIR" > 
build/rat-results.txt
+$java_cmd -jar "$rat_jar" --scan-hidden-directories -E 
"$FWDIR"/dev/.rat-excludes -d "$FWDIR" > build/rat-results.txt

Review Comment:
   Awesome contribution adding the `--scan-hidden-directories` option on the RAT plugin side, JB 👍 
   https://github.com/apache/creadur-rat/pull/166



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Spark 3.5: Spark action to compute the partition stats [iceberg]

2024-01-17 Thread via GitHub


ajantha-bhat commented on code in PR #9437:
URL: https://github.com/apache/iceberg/pull/9437#discussion_r1454359159


##
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##
@@ -150,6 +154,21 @@ protected Dataset contentFileDS(Table table, 
Set snapshotIds) {
 Broadcast tableBroadcast = 
sparkContext.broadcast(serializableTable);
 int numShufflePartitions = 
spark.sessionState().conf().numShufflePartitions();
 
+return manifestBeanDS(table, snapshotIds, numShufflePartitions)
+.flatMap(new ReadManifest(tableBroadcast), FileInfo.ENCODER);
+  }
+
+  protected Dataset partitionEntryDS(Table table) {
+Table serializableTable = SerializableTableWithSize.copyOf(table);
+Broadcast tableBroadcast = 
sparkContext.broadcast(serializableTable);
+int numShufflePartitions = 
spark.sessionState().conf().numShufflePartitions();
+
+return manifestBeanDS(table, null, numShufflePartitions)

Review Comment:
   > Is it actually correct? This code would go via ALL_MANIFESTS table. 
Shouldn't we only look for manifests in a particular snapshot for which we 
compute the stats?
   
   I just followed the same pattern as the partitions metadata table, which goes through all the manifests from all the snapshots.
   
   
https://github.com/apache/iceberg/blob/31d18f51b9e8590f7ca316463b080bd1153e8f9e/core/src/main/java/org/apache/iceberg/PartitionsTable.java#L186-L195
   
   Let me think on this today and get back to you. Also, I need to understand why it goes through all the manifests in the partitions metadata table (cc: @szehon-ho)
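   
   For reference, a minimal sketch (plain Java `Table` API, not the action code) of restricting the scan to the manifests of a single snapshot:
   
   ```java
   import java.util.List;
   import org.apache.iceberg.ManifestFile;
   import org.apache.iceberg.Snapshot;
   import org.apache.iceberg.Table;
   
   // Hypothetical helper: only the manifests reachable from the given snapshot,
   // instead of going through the ALL_MANIFESTS table.
   static List<ManifestFile> manifestsOfSnapshot(Table table, long snapshotId) {
     Snapshot snapshot = table.snapshot(snapshotId);
     return snapshot.allManifests(table.io()); // data + delete manifests of that snapshot
   }
   ```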



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Add Hive integration tests [iceberg-python]

2024-01-17 Thread via GitHub


Fokko commented on code in PR #207:
URL: https://github.com/apache/iceberg-python/pull/207#discussion_r1455755578


##
tests/integration/test_hive.py:
##
@@ -0,0 +1,409 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# pylint:disable=redefined-outer-name
+
+import math
+import uuid
+from typing import Dict
+from urllib.parse import urlparse
+
+import pyarrow.parquet as pq
+import pytest
+from pyarrow.fs import S3FileSystem
+
+from pyiceberg.catalog import Catalog, load_catalog
+from pyiceberg.exceptions import NoSuchTableError
+from pyiceberg.expressions import (
+And,
+EqualTo,
+GreaterThanOrEqual,
+IsNaN,
+LessThan,
+NotEqualTo,
+NotNaN,
+)
+from pyiceberg.io.pyarrow import pyarrow_to_schema
+from pyiceberg.schema import Schema
+from pyiceberg.table import Table
+from pyiceberg.types import (
+BooleanType,
+IntegerType,
+NestedField,
+StringType,
+TimestampType,
+)
+
+DEFAULT_PROPERTIES: Dict[str, str] = {}
+
+
+@pytest.fixture()
+def catalog() -> Catalog:
+return load_catalog(
+"local",
+**{
+"type": "hive",
+"uri": "http://localhost:9083";,
+"s3.endpoint": "http://localhost:9000";,
+"s3.access-key-id": "admin",
+"s3.secret-access-key": "password",
+},
+)
+
+
+@pytest.fixture()
+def table_test_null_nan(catalog: Catalog) -> Table:
+return catalog.load_table("default.test_null_nan")
+
+
+@pytest.fixture()
+def table_test_null_nan_rewritten(catalog: Catalog) -> Table:
+return catalog.load_table("default.test_null_nan_rewritten")
+
+
+@pytest.fixture()
+def table_test_limit(catalog: Catalog) -> Table:

Review Comment:
   I actually like this suggestion a lot, let me add this to the PR



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Spark 3.5: Spark action to compute the partition stats [iceberg]

2024-01-17 Thread via GitHub


ajantha-bhat commented on code in PR #9437:
URL: https://github.com/apache/iceberg/pull/9437#discussion_r1455763573


##
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##
@@ -150,6 +154,21 @@ protected Dataset contentFileDS(Table table, 
Set snapshotIds) {
 Broadcast tableBroadcast = 
sparkContext.broadcast(serializableTable);
 int numShufflePartitions = 
spark.sessionState().conf().numShufflePartitions();
 
+return manifestBeanDS(table, snapshotIds, numShufflePartitions)
+.flatMap(new ReadManifest(tableBroadcast), FileInfo.ENCODER);
+  }
+
+  protected Dataset partitionEntryDS(Table table) {
+Table serializableTable = SerializableTableWithSize.copyOf(table);
+Broadcast tableBroadcast = 
sparkContext.broadcast(serializableTable);
+int numShufflePartitions = 
spark.sessionState().conf().numShufflePartitions();
+
+return manifestBeanDS(table, null, numShufflePartitions)

Review Comment:
   > Is it actually correct? This code would go via ALL_MANIFESTS table. 
Shouldn't we only look for manifests in a particular snapshot for which we 
compute the stats?
   
   True. I confused `snapshot().allManifests()` with the all-manifests table. I need to change this. 
   
   And thanks for the detailed distributed and local algorithms. For my distributed algorithm, I faced a problem with the serialization of `partitionData` (an Avro class issue), which is why I had to keep most of the logic in the driver. 
   
   I am not fully sure how to implement the distributed algorithm that you have suggested. I will explore that.
   
   In the meantime, you can also review https://github.com/apache/iceberg/pull/9170 (which is independent and a prerequisite for this PR)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Spark 3.5: Spark action to compute the partition stats [iceberg]

2024-01-17 Thread via GitHub


ajantha-bhat commented on code in PR #9437:
URL: https://github.com/apache/iceberg/pull/9437#discussion_r1455766754


##
api/src/main/java/org/apache/iceberg/actions/ComputePartitionStats.java:
##
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import org.apache.iceberg.PartitionStatisticsFile;
+
+/**
+ * An action to compute partition stats for the latest snapshot and registers 
it to the
+ * TableMetadata file
+ */
+public interface ComputePartitionStats

Review Comment:
   We should support it. Since stats are mapped to a snapshot ID, it should be easy to make this work with branches and tags. I will handle this in a follow-up. 
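   
   For reference, a minimal sketch (Java API; the helper itself is hypothetical) of resolving a branch or tag to its snapshot before computing stats:
   
   ```java
   import org.apache.iceberg.Snapshot;
   import org.apache.iceberg.SnapshotRef;
   import org.apache.iceberg.Table;
   
   // Since partition stats are keyed by snapshot id, a branch/tag only needs to be
   // resolved to the snapshot it references.
   static Snapshot snapshotForRef(Table table, String refName) {
     SnapshotRef ref = table.refs().get(refName);
     return table.snapshot(ref.snapshotId());
   }
   ```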



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[I] Hive Catalog: Implement `_commit_table` [iceberg-python]

2024-01-17 Thread via GitHub


Fokko opened a new issue, #275:
URL: https://github.com/apache/iceberg-python/issues/275

   ### Feature Request / Improvement
   
   Probably very similar to the Glue/Sql one :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[I] Two-level parquet read EOF error: org.apache.parquet.io.ParquetDecodingException: Can't read value in column [a, array] repeated int32 array = 2 at value 4 out of 4 in current page. repetition lev

2024-01-17 Thread via GitHub


gaoshihang opened a new issue, #9497:
URL: https://github.com/apache/iceberg/issues/9497

   ### Apache Iceberg version
   
   1.4.3 (latest release)
   
   ### Query engine
   
   Spark
   
   ### Please describe the bug 🐞
   
   We have a two-level Parquet list; the schema is shown below:
   
![image](https://github.com/apache/iceberg/assets/20013931/ceddb1ba-76fc-4b0f-afa3-9fab3f01b583)
   
   Now, if this array is an empty array (`[]`), when we use the `add_files` function to add this Parquet file to a table, the query throws this exception:
   ```
   Caused by: org.apache.parquet.io.ParquetDecodingException: Can't read value 
in column [a, array] repeated int32 array = 2 at value 4 out of 4 in current 
page. repetition level: -1, definition level: -1
at 
org.apache.iceberg.parquet.PageIterator.handleRuntimeException(PageIterator.java:220)
at 
org.apache.iceberg.parquet.PageIterator.nextInteger(PageIterator.java:141)
at 
org.apache.iceberg.parquet.ColumnIterator.nextInteger(ColumnIterator.java:121)
at 
org.apache.iceberg.parquet.ColumnIterator$2.next(ColumnIterator.java:41)
at 
org.apache.iceberg.parquet.ColumnIterator$2.next(ColumnIterator.java:38)
at 
org.apache.iceberg.parquet.ParquetValueReaders$UnboxedReader.read(ParquetValueReaders.java:246)
at 
org.apache.iceberg.parquet.ParquetValueReaders$RepeatedReader.read(ParquetValueReaders.java:467)
at 
org.apache.iceberg.parquet.ParquetValueReaders$OptionReader.read(ParquetValueReaders.java:419)
at 
org.apache.iceberg.parquet.ParquetValueReaders$StructReader.read(ParquetValueReaders.java:745)
at 
org.apache.iceberg.parquet.ParquetReader$FileIterator.next(ParquetReader.java:130)
at org.apache.iceberg.io.FilterIterator.advance(FilterIterator.java:65)
at org.apache.iceberg.io.FilterIterator.hasNext(FilterIterator.java:49)
at org.apache.iceberg.spark.source.BaseReader.next(BaseReader.java:129)
at 
org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:93)
at 
org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:130)
at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source)
at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:349)
at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
   Caused by: org.apache.parquet.io.ParquetDecodingException: could not read int
at 
org.apache.parquet.column.values.plain.PlainValuesReader$IntegerPlainValuesReader.readInteger(PlainValuesReader.java:114)
at 
org.apache.iceberg.parquet.PageIterator.nextInteger(PageIterator.java:139)
... 32 more
   Caused by: java.io.EOFException
at 
org.apache.parquet.bytes.SingleBufferInputStream.read(SingleBufferInputStream.java:52)
at 
org.apache.parquet.bytes.LittleEndianDataInputStream.readInt(LittleEndianDataInputStream.java:347)
at 
org.apache.parquet.column.values.plain.PlainValuesReader$IntegerPlainValuesReader.readInteger(PlainValuesReader.java:112)
... 33 more
   ```
   
   And reading the code in iceberg-parquet, it seems like this do-while will never exit:
   
![image](https://github.com/apache/iceberg/assets/20013931/2e74bbc0-4daa-4570-a91b-717ffdc65324)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Re: [PR] Add Hive integration tests [iceberg-python]

2024-01-17 Thread via GitHub


Fokko commented on PR #207:
URL: https://github.com/apache/iceberg-python/pull/207#issuecomment-1896052914

   Thanks @HonahX for the review 👍 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Add Hive integration tests [iceberg-python]

2024-01-17 Thread via GitHub


Fokko merged PR #207:
URL: https://github.com/apache/iceberg-python/pull/207


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[PR] Core: Hadoop: Fix: HadoopTableOperations renameToFinal [iceberg]

2024-01-17 Thread via GitHub


N-o-Z opened a new pull request, #9498:
URL: https://github.com/apache/iceberg/pull/9498

   Closes #9485


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Core: Hadoop: Fix: HadoopTableOperations renameToFinal [iceberg]

2024-01-17 Thread via GitHub


N-o-Z commented on PR #9498:
URL: https://github.com/apache/iceberg/pull/9498#issuecomment-1896152644

   @amogh-jahagirdar, FYI


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [I] Purge support for Iceberg view [iceberg]

2024-01-17 Thread via GitHub


rdblue commented on issue #9433:
URL: https://github.com/apache/iceberg/issues/9433#issuecomment-1896190645

   What is the proposed behavior for a purge operation? How does this apply to 
views?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Flink: Don't fail to serialize IcebergSourceSplit when there is too many delete files [iceberg]

2024-01-17 Thread via GitHub


stevenzwu commented on PR #9464:
URL: https://github.com/apache/iceberg/pull/9464#issuecomment-1896232286

   > If I understand correctly, the FileScanTask json will contain the Schema. 
The Schema has a doc field for comments. Do we have restrictions defined for 
the doc field? 
   
   @pvary you are right. I agree that the doc field may contain non-ASCII chars.
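   
   To make that concrete, a minimal sketch (field name and doc text are made up) of a schema whose doc contains non-ASCII characters:
   
   ```java
   import org.apache.iceberg.Schema;
   import org.apache.iceberg.types.Types;
   
   // The doc comment is free-form text, so the serialized FileScanTask JSON
   // cannot assume ASCII-only content.
   Schema schema =
       new Schema(Types.NestedField.optional(1, "id", Types.IntegerType.get(), "ユーザー ID (user id)"));
   ```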


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Core: Hadoop: Fix: HadoopTableOperations renameToFinal [iceberg]

2024-01-17 Thread via GitHub


amogh-jahagirdar commented on code in PR #9498:
URL: https://github.com/apache/iceberg/pull/9498#discussion_r1456088754


##
core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java:
##
@@ -360,7 +360,10 @@ int findVersion() {
*/
   private void renameToFinal(FileSystem fs, Path src, Path dst, int 
nextVersion) {
 try {
-  lockManager.acquire(dst.toString(), src.toString());
+  boolean success = lockManager.acquire(dst.toString(), src.toString());
+  if (!success) {
+throw new CommitFailedException("Failed to acquire lock on file: %s", 
dst);
+  }

Review Comment:
   Style nit, if we could add a new line after this `if` block separating it 
from the next `if`. I know in this file it looks like we didn't really follow 
our normal practice already but at least for new changes we could follow it.
   
   Also I think I'd just inline the boolean so
   
   ```
   if (!lockManager.acquire(dst.toString(), src.toString())) {
throw ...
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Core: Hadoop: Fix: HadoopTableOperations renameToFinal [iceberg]

2024-01-17 Thread via GitHub


amogh-jahagirdar commented on code in PR #9498:
URL: https://github.com/apache/iceberg/pull/9498#discussion_r1456088754


##
core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java:
##
@@ -360,7 +360,10 @@ int findVersion() {
*/
   private void renameToFinal(FileSystem fs, Path src, Path dst, int 
nextVersion) {
 try {
-  lockManager.acquire(dst.toString(), src.toString());
+  boolean success = lockManager.acquire(dst.toString(), src.toString());
+  if (!success) {
+throw new CommitFailedException("Failed to acquire lock on file: %s", 
dst);
+  }

Review Comment:
   Style nit, if we could add a new line after this `if` block separating it 
from the next `if`! I know in this file it looks like we didn't really follow 
our normal practice already but at least for new changes we could follow it.
   
   Also I think I'd just inline the boolean so
   
   ```
   if (!lockManager.acquire(dst.toString(), src.toString())) {
throw ...
   }
   ```



##
core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java:
##
@@ -360,7 +360,10 @@ int findVersion() {
*/
   private void renameToFinal(FileSystem fs, Path src, Path dst, int 
nextVersion) {
 try {
-  lockManager.acquire(dst.toString(), src.toString());
+  boolean success = lockManager.acquire(dst.toString(), src.toString());
+  if (!success) {
+throw new CommitFailedException("Failed to acquire lock on file: %s", 
dst);
+  }

Review Comment:
   Also should we include the owner (src) in the exception as well?



##
core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java:
##
@@ -451,4 +455,39 @@ public void testConcurrentFastAppends(@TempDir File dir) 
throws Exception {
 Assertions.assertThat(Lists.newArrayList(tableWithHighRetries.snapshots()))
 .hasSize(threadsCount * numberOfCommitedFilesPerThread);
   }
-}
+
+  @Test
+  public void testCommitFailedToAcquire() {
+table.newFastAppend().appendFile(FILE_A).commit();
+Configuration conf = new Configuration();
+LockManager lockManager = new NoLockManager();
+HadoopTableOperations tops = 

Review Comment:
   Naming nit: I think it's better to be a bit more verbose rather than 
truncate, since it's more readable imo. Instead of `tops` could we call it 
`tableOperations` like we do in other places? Also maybe 
`testCommitFailedToAcquireLock` for the test name? 



##
core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java:
##
@@ -383,7 +386,10 @@ private void renameToFinal(FileSystem fs, Path src, Path 
dst, int nextVersion) {
   }
   throw cfe;
 } finally {
-  lockManager.release(dst.toString(), src.toString());
+  boolean success = lockManager.release(dst.toString(), src.toString());
+  if (!success) {
+LOG.warn("Failed to release lock on file: {} with owner: {}", dst, 
src);
+  }

Review Comment:
   Same nit as above, I think we should just inline the boolean.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Core: Fix lock acquisition logic in HadoopTableOperations rename [iceberg]

2024-01-17 Thread via GitHub


amogh-jahagirdar commented on PR #9498:
URL: https://github.com/apache/iceberg/pull/9498#issuecomment-1896295071

   Looks like the Spotless checks are failing:
   
   if you could run
   
   ```
   ./gradlew spotlessApply
   ```
   
   before pushing your next changes, that would fix it! 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [I] Cannot write nullable values to non-null column in the Iceberg Table [iceberg]

2024-01-17 Thread via GitHub


abharath9 commented on issue #9488:
URL: https://github.com/apache/iceberg/issues/9488#issuecomment-1896331390

   @nastra Yes, I am aware of that. How do I write optional (nullable) field data into mandatory (required) fields? It is mentioned in this issue that it is possible by setting `spark.sql.iceberg.check-nullability` to `false` in the Spark config, but it is not working. I wonder if there is anything I am missing or any other config to add.
   https://github.com/apache/iceberg/pull/514
   
   Just FYI, the above code works fine if I upgrade to Spark 3.5 and Iceberg 1.4.0. I want to get this working with Spark 3.2.0 and Iceberg 0.13.0. 
   
   Thanks, and I appreciate any help.
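   
   For completeness, this is roughly how the property is being set (a minimal sketch; the app name is a placeholder):
   
   ```java
   import org.apache.spark.sql.SparkSession;
   
   // Sets the session property at startup; it can also be set later via spark.conf().set(...).
   // Whether it is honored on Spark 3.2.0 / Iceberg 0.13.0 is exactly the question above.
   SparkSession spark =
       SparkSession.builder()
           .appName("nullability-check-off") // placeholder
           .config("spark.sql.iceberg.check-nullability", "false")
           .getOrCreate();
   ```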


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [I] Two-level parquet read EOF error: org.apache.parquet.io.ParquetDecodingException: Can't read value in column [a, array] repeated int32 array = 2 at value 4 out of 4 in current page. repetition

2024-01-17 Thread via GitHub


gaoshihang commented on issue #9497:
URL: https://github.com/apache/iceberg/issues/9497#issuecomment-1896362482

   And here is the Iceberg schema:
   
[v8.metadata.json](https://github.com/apache/iceberg/files/13967089/v8.metadata.json)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [I] Two-level parquet read EOF error: org.apache.parquet.io.ParquetDecodingException: Can't read value in column [a, array] repeated int32 array = 2 at value 4 out of 4 in current page. repetition

2024-01-17 Thread via GitHub


gaoshihang commented on issue #9497:
URL: https://github.com/apache/iceberg/issues/9497#issuecomment-1896369837

   And here is the Parquet file we used with add_files
   (you need to change the .log extension to .parquet):
   
[user_error_parquet.log](https://github.com/apache/iceberg/files/13967114/user_error_parquet.log)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Core: Fix lock acquisition logic in HadoopTableOperations rename [iceberg]

2024-01-17 Thread via GitHub


N-o-Z commented on PR #9498:
URL: https://github.com/apache/iceberg/pull/9498#issuecomment-1896383730

   @amogh-jahagirdar  Done!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] API, Core, Spark: Add fastForwardOrCreate API and integrate that with Spark fast forward procedure [iceberg]

2024-01-17 Thread via GitHub


rdblue commented on PR #9196:
URL: https://github.com/apache/iceberg/pull/9196#issuecomment-1896389067

   @amogh-jahagirdar, I think I would prefer the second alternative, to change 
the behavior of fast-forward. I doubt that anyone relies on fast-forward _not_ 
creating a branch and failing instead. This would avoid an error condition that most users would likely find surprising.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Write support [iceberg-python]

2024-01-17 Thread via GitHub


rdblue commented on code in PR #41:
URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1456300895


##
mkdocs/docs/api.md:
##
@@ -175,6 +175,104 @@ static_table = StaticTable.from_metadata(
 
 The static-table is considered read-only.
 
+## Write support
+
+With PyIceberg 0.6.0 write support is added through Arrow. Let's consider an 
Arrow Table:
+
+```python
+import pyarrow as pa
+
+df = pa.Table.from_pylist(
+[
+{"city": "Amsterdam", "lat": 52.371807, "long": 4.896029},
+{"city": "San Francisco", "lat": 37.773972, "long": -122.431297},
+{"city": "Drachten", "lat": 53.11254, "long": 6.0989},
+{"city": "Paris", "lat": 48.864716, "long": 2.349014},
+],
+)
+```
+
+Next, create a table based on the schema:
+
+```python
+from pyiceberg.catalog import load_catalog
+
+catalog = load_catalog("default")
+
+from pyiceberg.schema import Schema
+from pyiceberg.types import NestedField, StringType, DoubleType
+
+schema = Schema(
+NestedField(1, "city", StringType(), required=False),
+NestedField(2, "lat", DoubleType(), required=False),
+NestedField(3, "long", DoubleType(), required=False),

Review Comment:
   Isn't `required=False` the default?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Write support [iceberg-python]

2024-01-17 Thread via GitHub


Fokko commented on code in PR #41:
URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1456327592


##
mkdocs/docs/api.md:
##
@@ -175,6 +175,104 @@ static_table = StaticTable.from_metadata(
 
 The static-table is considered read-only.
 
+## Write support
+
+With PyIceberg 0.6.0 write support is added through Arrow. Let's consider an 
Arrow Table:
+
+```python
+import pyarrow as pa
+
+df = pa.Table.from_pylist(
+[
+{"city": "Amsterdam", "lat": 52.371807, "long": 4.896029},
+{"city": "San Francisco", "lat": 37.773972, "long": -122.431297},
+{"city": "Drachten", "lat": 53.11254, "long": 6.0989},
+{"city": "Paris", "lat": 48.864716, "long": 2.349014},
+],
+)
+```
+
+Next, create a table based on the schema:
+
+```python
+from pyiceberg.catalog import load_catalog
+
+catalog = load_catalog("default")
+
+from pyiceberg.schema import Schema
+from pyiceberg.types import NestedField, StringType, DoubleType
+
+schema = Schema(
+NestedField(1, "city", StringType(), required=False),
+NestedField(2, "lat", DoubleType(), required=False),
+NestedField(3, "long", DoubleType(), required=False),

Review Comment:
   No, the default is the more strict `True`. I've set it to `False` because PyArrow produces nullable fields by default.
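   
   A quick sketch of that default for reference (same imports as the docs above):
   
   ```python
   from pyiceberg.types import NestedField, StringType
   
   # required defaults to True, so it has to be relaxed explicitly when the
   # incoming Arrow data only has nullable fields.
   field = NestedField(1, "city", StringType())
   assert field.required is True
   ```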



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [I] Purge support for Iceberg view [iceberg]

2024-01-17 Thread via GitHub


nk1506 commented on issue #9433:
URL: https://github.com/apache/iceberg/issues/9433#issuecomment-1896529369

   With purge enabled, similar to [dropTable](https://github.com/apache/iceberg/blob/66b1aa662761606d4d68d99371c62505e7ac2f1e/api/src/main/java/org/apache/iceberg/catalog/Catalog.java#L307), there should be a way to delete view metadata files. Since a view can go through multiple versions, it can have multiple metadata files. Adding purge support would help to delete all of the related metadata files. 
   
   WDYT? 
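   
   For illustration, a purely hypothetical shape for such an API (this overload does not exist in the current `ViewCatalog` interface):
   
   ```java
   import org.apache.iceberg.catalog.TableIdentifier;
   import org.apache.iceberg.catalog.ViewCatalog;
   
   // Hypothetical extension mirroring Catalog.dropTable(identifier, purge);
   // purge = true would also delete all reachable view metadata files.
   interface PurgingViewCatalog extends ViewCatalog {
     boolean dropView(TableIdentifier identifier, boolean purge);
   }
   ```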


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Apply Name mapping, new_schema_for_table [iceberg-python]

2024-01-17 Thread via GitHub


syun64 commented on code in PR #219:
URL: https://github.com/apache/iceberg-python/pull/219#discussion_r1456437583


##
pyiceberg/io/pyarrow.py:
##
@@ -733,42 +854,178 @@ def _get_field_id(field: pa.Field) -> Optional[int]:
 )
 
 
-class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]]):
-def _convert_fields(self, arrow_fields: Iterable[pa.Field], field_results: 
List[Optional[IcebergType]]) -> List[NestedField]:
-fields = []
-for i, field in enumerate(arrow_fields):
-field_id = _get_field_id(field)
-field_doc = doc_str.decode() if (field.metadata and (doc_str := 
field.metadata.get(PYARROW_FIELD_DOC_KEY))) else None
-field_type = field_results[i]
-if field_type is not None and field_id is not None:
-fields.append(NestedField(field_id, field.name, field_type, 
required=not field.nullable, doc=field_doc))
-return fields
-
-def schema(self, schema: pa.Schema, field_results: 
List[Optional[IcebergType]]) -> Schema:
-return Schema(*self._convert_fields(schema, field_results))
-
-def struct(self, struct: pa.StructType, field_results: 
List[Optional[IcebergType]]) -> IcebergType:
-return StructType(*self._convert_fields(struct, field_results))
-
-def list(self, list_type: pa.ListType, element_result: 
Optional[IcebergType]) -> Optional[IcebergType]:
+class _HasIds(PyArrowSchemaVisitor[bool]):
+def schema(self, schema: pa.Schema, struct_result: bool) -> bool:
+return struct_result
+
+def struct(self, struct: pa.StructType, field_results: List[bool]) -> bool:
+return all(field_results)
+
+def field(self, field: pa.Field, field_result: bool) -> bool:
+return all([_get_field_id(field) is not None, field_result])
+
+def list(self, list_type: pa.ListType, element_result: bool) -> bool:
 element_field = list_type.value_field
 element_id = _get_field_id(element_field)
-if element_result is not None and element_id is not None:
-return ListType(element_id, element_result, element_required=not 
element_field.nullable)
-return None
+return element_result and element_id is not None
 
-def map(
-self, map_type: pa.MapType, key_result: Optional[IcebergType], 
value_result: Optional[IcebergType]
-) -> Optional[IcebergType]:
+def map(self, map_type: pa.MapType, key_result: bool, value_result: bool) 
-> bool:
 key_field = map_type.key_field
 key_id = _get_field_id(key_field)
 value_field = map_type.item_field
 value_id = _get_field_id(value_field)
-if key_result is not None and value_result is not None and key_id is 
not None and value_id is not None:
-return MapType(key_id, key_result, value_id, value_result, 
value_required=not value_field.nullable)
-return None
+return all([key_id is not None, value_id is not None, key_result, 
value_result])
+
+def primitive(self, primitive: pa.DataType) -> bool:
+return True
+
+
+class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]]):
+"""Converts PyArrowSchema to Iceberg Schema. Applies the IDs from 
name_mapping if provided."""
+
+_field_names: List[str]
+_name_mapping: Optional[NameMapping]
+
+def __init__(self, name_mapping: Optional[NameMapping] = None) -> None:
+self._field_names = []
+self._name_mapping = name_mapping
+
+def _current_path(self) -> str:
+return ".".join(self._field_names)
+
+def _field_id(self, field: pa.Field) -> int:
+if self._name_mapping:
+return self._name_mapping.find(self._current_path()).field_id
+elif (field_id := _get_field_id(field)) is not None:
+return field_id
+else:
+raise ValueError(f"Cannot convert {field} to Iceberg Field as 
field_id is empty.")
+
+def schema(self, schema: pa.Schema, struct_result: StructType) -> Schema:
+return Schema(*struct_result.fields)
+
+def struct(self, struct: pa.StructType, field_results: List[NestedField]) 
-> StructType:
+return StructType(*field_results)
+
+def field(self, field: pa.Field, field_result: IcebergType) -> NestedField:
+field_id = self._field_id(field)
+field_doc = doc_str.decode() if (field.metadata and (doc_str := 
field.metadata.get(PYARROW_FIELD_DOC_KEY))) else None
+field_type = field_result
+return NestedField(field_id, field.name, field_type, required=not 
field.nullable, doc=field_doc)
+
+def list(self, list_type: pa.ListType, element_result: IcebergType) -> 
ListType:
+element_field = list_type.value_field
+self._field_names.append(LIST_ELEMENT_NAME)
+element_id = self._field_id(element_field)
+self._field_names.pop()
+return ListType(element_id, element_result, element_required=not 
element_field.nullable)
 
-def primitive

Re: [PR] Write support [iceberg-python]

2024-01-17 Thread via GitHub


rdblue commented on code in PR #41:
URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1456467981


##
pyiceberg/table/__init__.py:
##
@@ -856,6 +909,61 @@ def history(self) -> List[SnapshotLogEntry]:
 def update_schema(self, allow_incompatible_changes: bool = False, 
case_sensitive: bool = True) -> UpdateSchema:
 return UpdateSchema(self, 
allow_incompatible_changes=allow_incompatible_changes, 
case_sensitive=case_sensitive)
 
+def append(self, df: pa.Table) -> None:
+"""
+Append data to the table.
+
+Args:
+df: The Arrow dataframe that will be appended to overwrite the 
table
+"""
+if len(self.spec().fields) > 0:
+raise ValueError("Cannot write to partitioned tables")
+
+if len(self.sort_order().fields) > 0:
+raise ValueError("Cannot write to tables with a sort-order")
+
+snapshot_id = self.new_snapshot_id()

Review Comment:
   Minor: this can be handled inside of `_MergeAppend` since it has the table.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Write support [iceberg-python]

2024-01-17 Thread via GitHub


rdblue commented on code in PR #41:
URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1456469457


##
pyiceberg/table/__init__.py:
##
@@ -856,6 +909,61 @@ def history(self) -> List[SnapshotLogEntry]:
 def update_schema(self, allow_incompatible_changes: bool = False, 
case_sensitive: bool = True) -> UpdateSchema:
 return UpdateSchema(self, 
allow_incompatible_changes=allow_incompatible_changes, 
case_sensitive=case_sensitive)
 
+def append(self, df: pa.Table) -> None:
+"""
+Append data to the table.
+
+Args:
+df: The Arrow dataframe that will be appended to overwrite the 
table
+"""
+if len(self.spec().fields) > 0:
+raise ValueError("Cannot write to partitioned tables")
+
+if len(self.sort_order().fields) > 0:
+raise ValueError("Cannot write to tables with a sort-order")
+
+snapshot_id = self.new_snapshot_id()
+
+data_files = _dataframe_to_data_files(self, df=df)
+merge = _MergeAppend(operation=Operation.APPEND, table=self, 
snapshot_id=snapshot_id)

Review Comment:
   Is this really a "merge append" if the operation may be overwrite? You might 
consider using `_MergingCommit` or `_MergingSnapshotProducer` (if you want to 
follow the Java convention).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Write support [iceberg-python]

2024-01-17 Thread via GitHub


rdblue commented on code in PR #41:
URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1456478299


##
pyiceberg/table/__init__.py:
##
@@ -831,6 +887,46 @@ def history(self) -> List[SnapshotLogEntry]:
 def update_schema(self, allow_incompatible_changes: bool = False, 
case_sensitive: bool = True) -> UpdateSchema:
 return UpdateSchema(self, 
allow_incompatible_changes=allow_incompatible_changes, 
case_sensitive=case_sensitive)
 
+def append(self, df: pa.Table) -> None:
+if len(self.spec().fields) > 0:
+raise ValueError("Cannot write to partitioned tables")
+
+snapshot_id = self.new_snapshot_id()
+
+data_files = _dataframe_to_data_files(self, df=df)
+merge = _MergeAppend(operation=Operation.APPEND, table=self, 
snapshot_id=snapshot_id)
+for data_file in data_files:
+merge.append_datafile(data_file)
+
+if current_snapshot := self.current_snapshot():
+for manifest in current_snapshot.manifests(io=self.io):
+for entry in manifest.fetch_manifest_entry(io=self.io):
+merge.append_datafile(entry.data_file, added=False)
+
+merge.commit()
+
+def overwrite(self, df: pa.Table, overwrite_filter: BooleanExpression = 
ALWAYS_TRUE) -> None:

Review Comment:
   :sweat_smile: You're right, that was more of a statement than a question.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [I] [HadoopCatalog]: [HadoopTableOperations]: Commit flow, renameToFinal does not actually check if lock acquired [iceberg]

2024-01-17 Thread via GitHub


amogh-jahagirdar closed issue #9485: [HadoopCatalog]: [HadoopTableOperations]: 
Commit flow, renameToFinal does not actually check if lock acquired
URL: https://github.com/apache/iceberg/issues/9485


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Core: Fix lock acquisition logic in HadoopTableOperations rename [iceberg]

2024-01-17 Thread via GitHub


amogh-jahagirdar merged PR #9498:
URL: https://github.com/apache/iceberg/pull/9498


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Write support [iceberg-python]

2024-01-17 Thread via GitHub


rdblue commented on code in PR #41:
URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1456486632


##
pyiceberg/table/__init__.py:
##
@@ -1935,3 +2043,184 @@ def _generate_snapshot_id() -> int:
 snapshot_id = snapshot_id if snapshot_id >= 0 else snapshot_id * -1
 
 return snapshot_id
+
+
+@dataclass(frozen=True)
+class WriteTask:
+write_uuid: uuid.UUID
+task_id: int
+df: pa.Table
+sort_order_id: Optional[int] = None
+
+# Later to be extended with partition information
+
+def generate_data_file_filename(self, extension: str) -> str:
+# Mimics the behavior in the Java API:
+# 
https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101
+return f"0-{self.task_id}-{self.write_uuid}.{extension}"
+
+
+def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str:
+return f'{location}/metadata/{commit_uuid}-m{num}.avro'
+
+
+def _generate_manifest_list_path(location: str, snapshot_id: int, attempt: 
int, commit_uuid: uuid.UUID) -> str:
+# Mimics the behavior in Java:
+# 
https://github.com/apache/iceberg/blob/c862b9177af8e2d8310764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491
+return 
f'{location}/metadata/snap-{snapshot_id}-{attempt}-{commit_uuid}.avro'
+
+
+def _dataframe_to_data_files(table: Table, df: pa.Table) -> Iterable[DataFile]:
+from pyiceberg.io.pyarrow import write_file
+
+if len(table.spec().fields) > 0:
+raise ValueError("Cannot write to partitioned tables")
+
+if len(table.sort_order().fields) > 0:
+raise ValueError("Cannot write to tables with a sort-order")
+
+write_uuid = uuid.uuid4()
+counter = itertools.count(0)
+
+# This is an iter, so we don't have to materialize everything every time
+# This will be more relevant when we start doing partitioned writes
+yield from write_file(table, iter([WriteTask(write_uuid, next(counter), 
df)]))
+
+
+class _MergeAppend:
+_operation: Operation
+_table: Table
+_snapshot_id: int
+_parent_snapshot_id: Optional[int]
+_added_data_files: List[DataFile]
+_commit_uuid: uuid.UUID
+
+def __init__(self, operation: Operation, table: Table, snapshot_id: int) 
-> None:
+self._operation = operation
+self._table = table
+self._snapshot_id = snapshot_id
+# Since we only support the main branch for now
+self._parent_snapshot_id = snapshot.snapshot_id if (snapshot := 
self._table.current_snapshot()) else None
+self._added_data_files = []
+self._commit_uuid = uuid.uuid4()
+
+def append_data_file(self, data_file: DataFile) -> _MergeAppend:
+self._added_data_files.append(data_file)
+return self
+
+def _deleted_entries(self) -> List[ManifestEntry]:
+"""To determine if we need to record any deleted entries.
+
+With partial overwrites we have to use the predicate to evaluate
+which entries are affected.
+"""
+if self._operation == Operation.OVERWRITE:
+if self._parent_snapshot_id is not None:
+previous_snapshot = 
self._table.snapshot_by_id(self._parent_snapshot_id)
+if previous_snapshot is None:
+# This should never happen since you cannot overwrite an 
empty table
+raise ValueError(f"Could not find the previous snapshot: 
{self._parent_snapshot_id}")
+
+executor = ExecutorFactory.get_or_create()
+
+def _get_entries(manifest: ManifestFile) -> 
List[ManifestEntry]:

Review Comment:
   Should this return an iterator instead of a list? Looks like 
`fetch_manifest_entry` returns a list, but we may not need so much copying. Not 
sure if that's a concern in Python.
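   
   For illustration, a minimal sketch of what a generator-based variant could 
look like (hypothetical helper name; it just reuses the entry fields from the 
diff above, so it is not the PR's actual code):

```python
from typing import Iterator

from pyiceberg.io import FileIO
from pyiceberg.manifest import ManifestEntry, ManifestEntryStatus, ManifestFile


def _iter_deleted_entries(manifest: ManifestFile, io: FileIO) -> Iterator[ManifestEntry]:
    # Yield DELETED entries one at a time instead of materializing a list up front
    for entry in manifest.fetch_manifest_entry(io, discard_deleted=True):
        yield ManifestEntry(
            status=ManifestEntryStatus.DELETED,
            snapshot_id=entry.snapshot_id,
            data_sequence_number=entry.data_sequence_number,
            file_sequence_number=entry.file_sequence_number,
            data_file=entry.data_file,
        )
```

   A generator like this defers the work to whoever consumes it, though, so the 
list form may be intentional if the construction is supposed to happen inside 
the executor.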



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Write support [iceberg-python]

2024-01-17 Thread via GitHub


rdblue commented on code in PR #41:
URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1456487418


##
pyiceberg/table/__init__.py:
##
@@ -1935,3 +2043,184 @@ def _generate_snapshot_id() -> int:
 snapshot_id = snapshot_id if snapshot_id >= 0 else snapshot_id * -1
 
 return snapshot_id
+
+
+@dataclass(frozen=True)
+class WriteTask:
+write_uuid: uuid.UUID
+task_id: int
+df: pa.Table
+sort_order_id: Optional[int] = None
+
+# Later to be extended with partition information
+
+def generate_data_file_filename(self, extension: str) -> str:
+# Mimics the behavior in the Java API:
+# 
https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101
+return f"0-{self.task_id}-{self.write_uuid}.{extension}"
+
+
+def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str:
+return f'{location}/metadata/{commit_uuid}-m{num}.avro'
+
+
+def _generate_manifest_list_path(location: str, snapshot_id: int, attempt: 
int, commit_uuid: uuid.UUID) -> str:
+# Mimics the behavior in Java:
+# 
https://github.com/apache/iceberg/blob/c862b9177af8e2d8310764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491
+return 
f'{location}/metadata/snap-{snapshot_id}-{attempt}-{commit_uuid}.avro'
+
+
+def _dataframe_to_data_files(table: Table, df: pa.Table) -> Iterable[DataFile]:
+from pyiceberg.io.pyarrow import write_file
+
+if len(table.spec().fields) > 0:
+raise ValueError("Cannot write to partitioned tables")
+
+if len(table.sort_order().fields) > 0:
+raise ValueError("Cannot write to tables with a sort-order")
+
+write_uuid = uuid.uuid4()
+counter = itertools.count(0)
+
+# This is an iter, so we don't have to materialize everything every time
+# This will be more relevant when we start doing partitioned writes
+yield from write_file(table, iter([WriteTask(write_uuid, next(counter), 
df)]))
+
+
+class _MergeAppend:
+_operation: Operation
+_table: Table
+_snapshot_id: int
+_parent_snapshot_id: Optional[int]
+_added_data_files: List[DataFile]
+_commit_uuid: uuid.UUID
+
+def __init__(self, operation: Operation, table: Table, snapshot_id: int) 
-> None:
+self._operation = operation
+self._table = table
+self._snapshot_id = snapshot_id
+# Since we only support the main branch for now
+self._parent_snapshot_id = snapshot.snapshot_id if (snapshot := 
self._table.current_snapshot()) else None
+self._added_data_files = []
+self._commit_uuid = uuid.uuid4()
+
+def append_data_file(self, data_file: DataFile) -> _MergeAppend:
+self._added_data_files.append(data_file)
+return self
+
+def _deleted_entries(self) -> List[ManifestEntry]:
+"""To determine if we need to record any deleted entries.
+
+With partial overwrites we have to use the predicate to evaluate
+which entries are affected.
+"""
+if self._operation == Operation.OVERWRITE:
+if self._parent_snapshot_id is not None:
+previous_snapshot = 
self._table.snapshot_by_id(self._parent_snapshot_id)
+if previous_snapshot is None:
+# This should never happen since you cannot overwrite an 
empty table
+raise ValueError(f"Could not find the previous snapshot: 
{self._parent_snapshot_id}")
+
+executor = ExecutorFactory.get_or_create()
+
+def _get_entries(manifest: ManifestFile) -> 
List[ManifestEntry]:

Review Comment:
   Wait, I see that this is run in the executor. It's probably creating a list 
so that the construction happens in parallel. Nevermind!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Write support [iceberg-python]

2024-01-17 Thread via GitHub


rdblue commented on code in PR #41:
URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1456489014


##
pyiceberg/table/__init__.py:
##
@@ -1935,3 +2043,184 @@ def _generate_snapshot_id() -> int:
 snapshot_id = snapshot_id if snapshot_id >= 0 else snapshot_id * -1
 
 return snapshot_id
+
+
+@dataclass(frozen=True)
+class WriteTask:
+write_uuid: uuid.UUID
+task_id: int
+df: pa.Table
+sort_order_id: Optional[int] = None
+
+# Later to be extended with partition information
+
+def generate_data_file_filename(self, extension: str) -> str:
+# Mimics the behavior in the Java API:
+# 
https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101
+return f"0-{self.task_id}-{self.write_uuid}.{extension}"
+
+
+def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str:
+return f'{location}/metadata/{commit_uuid}-m{num}.avro'
+
+
+def _generate_manifest_list_path(location: str, snapshot_id: int, attempt: 
int, commit_uuid: uuid.UUID) -> str:
+# Mimics the behavior in Java:
+# 
https://github.com/apache/iceberg/blob/c862b9177af8e2d8310764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491
+return 
f'{location}/metadata/snap-{snapshot_id}-{attempt}-{commit_uuid}.avro'
+
+
+def _dataframe_to_data_files(table: Table, df: pa.Table) -> Iterable[DataFile]:
+from pyiceberg.io.pyarrow import write_file
+
+if len(table.spec().fields) > 0:
+raise ValueError("Cannot write to partitioned tables")
+
+if len(table.sort_order().fields) > 0:
+raise ValueError("Cannot write to tables with a sort-order")
+
+write_uuid = uuid.uuid4()
+counter = itertools.count(0)
+
+# This is an iter, so we don't have to materialize everything every time
+# This will be more relevant when we start doing partitioned writes
+yield from write_file(table, iter([WriteTask(write_uuid, next(counter), 
df)]))
+
+
+class _MergeAppend:
+_operation: Operation
+_table: Table
+_snapshot_id: int
+_parent_snapshot_id: Optional[int]
+_added_data_files: List[DataFile]
+_commit_uuid: uuid.UUID
+
+def __init__(self, operation: Operation, table: Table, snapshot_id: int) 
-> None:
+self._operation = operation
+self._table = table
+self._snapshot_id = snapshot_id
+# Since we only support the main branch for now
+self._parent_snapshot_id = snapshot.snapshot_id if (snapshot := 
self._table.current_snapshot()) else None
+self._added_data_files = []
+self._commit_uuid = uuid.uuid4()
+
+def append_data_file(self, data_file: DataFile) -> _MergeAppend:
+self._added_data_files.append(data_file)
+return self
+
+def _deleted_entries(self) -> List[ManifestEntry]:
+"""To determine if we need to record any deleted entries.
+
+With partial overwrites we have to use the predicate to evaluate
+which entries are affected.
+"""
+if self._operation == Operation.OVERWRITE:
+if self._parent_snapshot_id is not None:
+previous_snapshot = 
self._table.snapshot_by_id(self._parent_snapshot_id)
+if previous_snapshot is None:
+# This should never happen since you cannot overwrite an 
empty table
+raise ValueError(f"Could not find the previous snapshot: 
{self._parent_snapshot_id}")
+
+executor = ExecutorFactory.get_or_create()
+
+def _get_entries(manifest: ManifestFile) -> 
List[ManifestEntry]:
+return [
+ManifestEntry(
+status=ManifestEntryStatus.DELETED,
+snapshot_id=entry.snapshot_id,
+data_sequence_number=entry.data_sequence_number,
+file_sequence_number=entry.file_sequence_number,
+data_file=entry.data_file,
+)
+for entry in 
manifest.fetch_manifest_entry(self._table.io, discard_deleted=True)
+]
+
+list_of_entries = executor.map(_get_entries, 
previous_snapshot.manifests(self._table.io))

Review Comment:
   It may be a good idea to defensively use only data manifests here instead of 
all manifests.
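   
   A hedged sketch of that defensive filter (hypothetical helper; it assumes the 
`ManifestContent` enum exposed by `pyiceberg.manifest`):

```python
from typing import List

from pyiceberg.manifest import ManifestContent, ManifestFile


def _data_manifests_only(manifests: List[ManifestFile]) -> List[ManifestFile]:
    # Keep only manifests that track data files; skip delete manifests defensively
    return [m for m in manifests if m.content == ManifestContent.DATA]
```

   The executor would then map `_get_entries` over 
`_data_manifests_only(previous_snapshot.manifests(self._table.io))`.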



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Re: [PR] Write support [iceberg-python]

2024-01-17 Thread via GitHub


rdblue commented on code in PR #41:
URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1456491428


##
pyiceberg/table/__init__.py:
##
@@ -1935,3 +2043,184 @@ def _generate_snapshot_id() -> int:
 snapshot_id = snapshot_id if snapshot_id >= 0 else snapshot_id * -1
 
 return snapshot_id
+
+
+@dataclass(frozen=True)
+class WriteTask:
+write_uuid: uuid.UUID
+task_id: int
+df: pa.Table
+sort_order_id: Optional[int] = None
+
+# Later to be extended with partition information
+
+def generate_data_file_filename(self, extension: str) -> str:
+# Mimics the behavior in the Java API:
+# 
https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101
+return f"0-{self.task_id}-{self.write_uuid}.{extension}"
+
+
+def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str:
+return f'{location}/metadata/{commit_uuid}-m{num}.avro'
+
+
+def _generate_manifest_list_path(location: str, snapshot_id: int, attempt: 
int, commit_uuid: uuid.UUID) -> str:
+# Mimics the behavior in Java:
+# 
https://github.com/apache/iceberg/blob/c862b9177af8e2d8310764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491
+return 
f'{location}/metadata/snap-{snapshot_id}-{attempt}-{commit_uuid}.avro'
+
+
+def _dataframe_to_data_files(table: Table, df: pa.Table) -> Iterable[DataFile]:
+from pyiceberg.io.pyarrow import write_file
+
+if len(table.spec().fields) > 0:
+raise ValueError("Cannot write to partitioned tables")
+
+if len(table.sort_order().fields) > 0:
+raise ValueError("Cannot write to tables with a sort-order")
+
+write_uuid = uuid.uuid4()
+counter = itertools.count(0)
+
+# This is an iter, so we don't have to materialize everything every time
+# This will be more relevant when we start doing partitioned writes
+yield from write_file(table, iter([WriteTask(write_uuid, next(counter), 
df)]))
+
+
+class _MergeAppend:
+_operation: Operation
+_table: Table
+_snapshot_id: int
+_parent_snapshot_id: Optional[int]
+_added_data_files: List[DataFile]
+_commit_uuid: uuid.UUID
+
+def __init__(self, operation: Operation, table: Table, snapshot_id: int) 
-> None:
+self._operation = operation
+self._table = table
+self._snapshot_id = snapshot_id
+# Since we only support the main branch for now
+self._parent_snapshot_id = snapshot.snapshot_id if (snapshot := 
self._table.current_snapshot()) else None
+self._added_data_files = []
+self._commit_uuid = uuid.uuid4()
+
+def append_data_file(self, data_file: DataFile) -> _MergeAppend:
+self._added_data_files.append(data_file)
+return self
+
+def _deleted_entries(self) -> List[ManifestEntry]:
+"""To determine if we need to record any deleted entries.
+
+With partial overwrites we have to use the predicate to evaluate
+which entries are affected.
+"""
+if self._operation == Operation.OVERWRITE:
+if self._parent_snapshot_id is not None:
+previous_snapshot = 
self._table.snapshot_by_id(self._parent_snapshot_id)
+if previous_snapshot is None:
+# This should never happen since you cannot overwrite an 
empty table
+raise ValueError(f"Could not find the previous snapshot: 
{self._parent_snapshot_id}")
+
+executor = ExecutorFactory.get_or_create()
+
+def _get_entries(manifest: ManifestFile) -> 
List[ManifestEntry]:
+return [
+ManifestEntry(
+status=ManifestEntryStatus.DELETED,
+snapshot_id=entry.snapshot_id,
+data_sequence_number=entry.data_sequence_number,
+file_sequence_number=entry.file_sequence_number,

Review Comment:
   Looks good.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Write support [iceberg-python]

2024-01-17 Thread via GitHub


rdblue commented on code in PR #41:
URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1456493214


##
pyiceberg/table/__init__.py:
##
@@ -1935,3 +2043,184 @@ def _generate_snapshot_id() -> int:
 snapshot_id = snapshot_id if snapshot_id >= 0 else snapshot_id * -1
 
 return snapshot_id
+
+
+@dataclass(frozen=True)
+class WriteTask:
+write_uuid: uuid.UUID
+task_id: int
+df: pa.Table
+sort_order_id: Optional[int] = None
+
+# Later to be extended with partition information
+
+def generate_data_file_filename(self, extension: str) -> str:
+# Mimics the behavior in the Java API:
+# 
https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101
+return f"0-{self.task_id}-{self.write_uuid}.{extension}"
+
+
+def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str:
+return f'{location}/metadata/{commit_uuid}-m{num}.avro'
+
+
+def _generate_manifest_list_path(location: str, snapshot_id: int, attempt: 
int, commit_uuid: uuid.UUID) -> str:
+# Mimics the behavior in Java:
+# 
https://github.com/apache/iceberg/blob/c862b9177af8e2d8310764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491
+return 
f'{location}/metadata/snap-{snapshot_id}-{attempt}-{commit_uuid}.avro'
+
+
+def _dataframe_to_data_files(table: Table, df: pa.Table) -> Iterable[DataFile]:
+from pyiceberg.io.pyarrow import write_file
+
+if len(table.spec().fields) > 0:
+raise ValueError("Cannot write to partitioned tables")
+
+if len(table.sort_order().fields) > 0:
+raise ValueError("Cannot write to tables with a sort-order")
+
+write_uuid = uuid.uuid4()
+counter = itertools.count(0)
+
+# This is an iter, so we don't have to materialize everything every time
+# This will be more relevant when we start doing partitioned writes
+yield from write_file(table, iter([WriteTask(write_uuid, next(counter), 
df)]))
+
+
+class _MergeAppend:
+_operation: Operation
+_table: Table
+_snapshot_id: int
+_parent_snapshot_id: Optional[int]
+_added_data_files: List[DataFile]
+_commit_uuid: uuid.UUID
+
+def __init__(self, operation: Operation, table: Table, snapshot_id: int) 
-> None:
+self._operation = operation
+self._table = table
+self._snapshot_id = snapshot_id
+# Since we only support the main branch for now
+self._parent_snapshot_id = snapshot.snapshot_id if (snapshot := 
self._table.current_snapshot()) else None
+self._added_data_files = []
+self._commit_uuid = uuid.uuid4()
+
+def append_data_file(self, data_file: DataFile) -> _MergeAppend:
+self._added_data_files.append(data_file)
+return self
+
+def _deleted_entries(self) -> List[ManifestEntry]:
+"""To determine if we need to record any deleted entries.
+
+With partial overwrites we have to use the predicate to evaluate
+which entries are affected.
+"""
+if self._operation == Operation.OVERWRITE:
+if self._parent_snapshot_id is not None:
+previous_snapshot = 
self._table.snapshot_by_id(self._parent_snapshot_id)
+if previous_snapshot is None:
+# This should never happen since you cannot overwrite an 
empty table
+raise ValueError(f"Could not find the previous snapshot: 
{self._parent_snapshot_id}")
+
+executor = ExecutorFactory.get_or_create()
+
+def _get_entries(manifest: ManifestFile) -> 
List[ManifestEntry]:
+return [
+ManifestEntry(
+status=ManifestEntryStatus.DELETED,
+snapshot_id=entry.snapshot_id,
+data_sequence_number=entry.data_sequence_number,
+file_sequence_number=entry.file_sequence_number,
+data_file=entry.data_file,
+)
+for entry in 
manifest.fetch_manifest_entry(self._table.io, discard_deleted=True)
+]
+
+list_of_entries = executor.map(_get_entries, 
previous_snapshot.manifests(self._table.io))
+return list(chain(*list_of_entries))
+return []
+elif self._operation == Operation.APPEND:
+return []
+else:
+raise ValueError(f"Not implemented for: {self._operation}")
+
+def _manifests(self) -> List[ManifestFile]:
+manifests = []

Review Comment:
   Minor: since this list starts out empty, it looks like it only ever holds 
the newly created manifests. It may be a good idea to name it `new_manifests`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL abov

Re: [PR] Write support [iceberg-python]

2024-01-17 Thread via GitHub


rdblue commented on code in PR #41:
URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1456495179


##
pyiceberg/table/__init__.py:
##
@@ -1935,3 +2043,184 @@ def _generate_snapshot_id() -> int:
 snapshot_id = snapshot_id if snapshot_id >= 0 else snapshot_id * -1
 
 return snapshot_id
+
+
+@dataclass(frozen=True)
+class WriteTask:
+write_uuid: uuid.UUID
+task_id: int
+df: pa.Table
+sort_order_id: Optional[int] = None
+
+# Later to be extended with partition information
+
+def generate_data_file_filename(self, extension: str) -> str:
+# Mimics the behavior in the Java API:
+# 
https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101
+return f"0-{self.task_id}-{self.write_uuid}.{extension}"
+
+
+def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str:
+return f'{location}/metadata/{commit_uuid}-m{num}.avro'
+
+
+def _generate_manifest_list_path(location: str, snapshot_id: int, attempt: 
int, commit_uuid: uuid.UUID) -> str:
+# Mimics the behavior in Java:
+# 
https://github.com/apache/iceberg/blob/c862b9177af8e2d8310764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491
+return 
f'{location}/metadata/snap-{snapshot_id}-{attempt}-{commit_uuid}.avro'
+
+
+def _dataframe_to_data_files(table: Table, df: pa.Table) -> Iterable[DataFile]:
+from pyiceberg.io.pyarrow import write_file
+
+if len(table.spec().fields) > 0:
+raise ValueError("Cannot write to partitioned tables")
+
+if len(table.sort_order().fields) > 0:
+raise ValueError("Cannot write to tables with a sort-order")
+
+write_uuid = uuid.uuid4()
+counter = itertools.count(0)
+
+# This is an iter, so we don't have to materialize everything every time
+# This will be more relevant when we start doing partitioned writes
+yield from write_file(table, iter([WriteTask(write_uuid, next(counter), 
df)]))
+
+
+class _MergeAppend:
+_operation: Operation
+_table: Table
+_snapshot_id: int
+_parent_snapshot_id: Optional[int]
+_added_data_files: List[DataFile]
+_commit_uuid: uuid.UUID
+
+def __init__(self, operation: Operation, table: Table, snapshot_id: int) 
-> None:
+self._operation = operation
+self._table = table
+self._snapshot_id = snapshot_id
+# Since we only support the main branch for now
+self._parent_snapshot_id = snapshot.snapshot_id if (snapshot := 
self._table.current_snapshot()) else None
+self._added_data_files = []
+self._commit_uuid = uuid.uuid4()
+
+def append_data_file(self, data_file: DataFile) -> _MergeAppend:
+self._added_data_files.append(data_file)
+return self
+
+def _deleted_entries(self) -> List[ManifestEntry]:
+"""To determine if we need to record any deleted entries.
+
+With partial overwrites we have to use the predicate to evaluate
+which entries are affected.
+"""
+if self._operation == Operation.OVERWRITE:
+if self._parent_snapshot_id is not None:
+previous_snapshot = 
self._table.snapshot_by_id(self._parent_snapshot_id)
+if previous_snapshot is None:
+# This should never happen since you cannot overwrite an 
empty table
+raise ValueError(f"Could not find the previous snapshot: 
{self._parent_snapshot_id}")
+
+executor = ExecutorFactory.get_or_create()
+
+def _get_entries(manifest: ManifestFile) -> 
List[ManifestEntry]:
+return [
+ManifestEntry(
+status=ManifestEntryStatus.DELETED,
+snapshot_id=entry.snapshot_id,
+data_sequence_number=entry.data_sequence_number,
+file_sequence_number=entry.file_sequence_number,
+data_file=entry.data_file,
+)
+for entry in 
manifest.fetch_manifest_entry(self._table.io, discard_deleted=True)
+]
+
+list_of_entries = executor.map(_get_entries, 
previous_snapshot.manifests(self._table.io))
+return list(chain(*list_of_entries))
+return []
+elif self._operation == Operation.APPEND:
+return []
+else:
+raise ValueError(f"Not implemented for: {self._operation}")
+
+def _manifests(self) -> List[ManifestFile]:
+manifests = []
+deleted_entries = self._deleted_entries()
+
+if self._added_data_files:
+output_file_location = 
_new_manifest_path(location=self._table.location(), num=0, 
commit_uuid=self._commit_uuid)
+with write_manifest(
+format_version=self._tabl

Re: [PR] Write support [iceberg-python]

2024-01-17 Thread via GitHub


rdblue commented on code in PR #41:
URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1456498171


##
pyiceberg/table/__init__.py:
##
@@ -1935,3 +2043,184 @@ def _generate_snapshot_id() -> int:
 snapshot_id = snapshot_id if snapshot_id >= 0 else snapshot_id * -1
 
 return snapshot_id
+
+
+@dataclass(frozen=True)
+class WriteTask:
+write_uuid: uuid.UUID
+task_id: int
+df: pa.Table
+sort_order_id: Optional[int] = None
+
+# Later to be extended with partition information
+
+def generate_data_file_filename(self, extension: str) -> str:
+# Mimics the behavior in the Java API:
+# 
https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101
+return f"0-{self.task_id}-{self.write_uuid}.{extension}"
+
+
+def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str:
+return f'{location}/metadata/{commit_uuid}-m{num}.avro'
+
+
+def _generate_manifest_list_path(location: str, snapshot_id: int, attempt: 
int, commit_uuid: uuid.UUID) -> str:
+# Mimics the behavior in Java:
+# 
https://github.com/apache/iceberg/blob/c862b9177af8e2d8310764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491
+return 
f'{location}/metadata/snap-{snapshot_id}-{attempt}-{commit_uuid}.avro'
+
+
+def _dataframe_to_data_files(table: Table, df: pa.Table) -> Iterable[DataFile]:
+from pyiceberg.io.pyarrow import write_file
+
+if len(table.spec().fields) > 0:
+raise ValueError("Cannot write to partitioned tables")
+
+if len(table.sort_order().fields) > 0:
+raise ValueError("Cannot write to tables with a sort-order")
+
+write_uuid = uuid.uuid4()
+counter = itertools.count(0)
+
+# This is an iter, so we don't have to materialize everything every time
+# This will be more relevant when we start doing partitioned writes
+yield from write_file(table, iter([WriteTask(write_uuid, next(counter), 
df)]))
+
+
+class _MergeAppend:
+_operation: Operation
+_table: Table
+_snapshot_id: int
+_parent_snapshot_id: Optional[int]
+_added_data_files: List[DataFile]
+_commit_uuid: uuid.UUID
+
+def __init__(self, operation: Operation, table: Table, snapshot_id: int) 
-> None:
+self._operation = operation
+self._table = table
+self._snapshot_id = snapshot_id
+# Since we only support the main branch for now
+self._parent_snapshot_id = snapshot.snapshot_id if (snapshot := 
self._table.current_snapshot()) else None
+self._added_data_files = []
+self._commit_uuid = uuid.uuid4()
+
+def append_data_file(self, data_file: DataFile) -> _MergeAppend:
+self._added_data_files.append(data_file)
+return self
+
+def _deleted_entries(self) -> List[ManifestEntry]:
+"""To determine if we need to record any deleted entries.
+
+With partial overwrites we have to use the predicate to evaluate
+which entries are affected.
+"""
+if self._operation == Operation.OVERWRITE:
+if self._parent_snapshot_id is not None:
+previous_snapshot = 
self._table.snapshot_by_id(self._parent_snapshot_id)
+if previous_snapshot is None:
+# This should never happen since you cannot overwrite an 
empty table
+raise ValueError(f"Could not find the previous snapshot: 
{self._parent_snapshot_id}")
+
+executor = ExecutorFactory.get_or_create()
+
+def _get_entries(manifest: ManifestFile) -> 
List[ManifestEntry]:
+return [
+ManifestEntry(
+status=ManifestEntryStatus.DELETED,
+snapshot_id=entry.snapshot_id,
+data_sequence_number=entry.data_sequence_number,
+file_sequence_number=entry.file_sequence_number,
+data_file=entry.data_file,
+)
+for entry in 
manifest.fetch_manifest_entry(self._table.io, discard_deleted=True)
+]
+
+list_of_entries = executor.map(_get_entries, 
previous_snapshot.manifests(self._table.io))
+return list(chain(*list_of_entries))
+return []
+elif self._operation == Operation.APPEND:
+return []
+else:
+raise ValueError(f"Not implemented for: {self._operation}")
+
+def _manifests(self) -> List[ManifestFile]:
+manifests = []
+deleted_entries = self._deleted_entries()
+
+if self._added_data_files:
+output_file_location = 
_new_manifest_path(location=self._table.location(), num=0, 
commit_uuid=self._commit_uuid)
+with write_manifest(
+format_version=self._tabl

[PR] Add 1.4.3 docs [iceberg]

2024-01-17 Thread via GitHub


bitsondatadev opened a new pull request, #9499:
URL: https://github.com/apache/iceberg/pull/9499

   Add 1.4.3 docs


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Write support [iceberg-python]

2024-01-17 Thread via GitHub


rdblue commented on code in PR #41:
URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1456502771


##
pyiceberg/table/__init__.py:
##
@@ -1935,3 +2043,184 @@ def _generate_snapshot_id() -> int:
 snapshot_id = snapshot_id if snapshot_id >= 0 else snapshot_id * -1
 
 return snapshot_id
+
+
+@dataclass(frozen=True)
+class WriteTask:
+write_uuid: uuid.UUID
+task_id: int
+df: pa.Table
+sort_order_id: Optional[int] = None
+
+# Later to be extended with partition information
+
+def generate_data_file_filename(self, extension: str) -> str:
+# Mimics the behavior in the Java API:
+# 
https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101
+return f"0-{self.task_id}-{self.write_uuid}.{extension}"
+
+
+def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str:
+return f'{location}/metadata/{commit_uuid}-m{num}.avro'
+
+
+def _generate_manifest_list_path(location: str, snapshot_id: int, attempt: 
int, commit_uuid: uuid.UUID) -> str:
+# Mimics the behavior in Java:
+# 
https://github.com/apache/iceberg/blob/c862b9177af8e2d8310764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491
+return 
f'{location}/metadata/snap-{snapshot_id}-{attempt}-{commit_uuid}.avro'
+
+
+def _dataframe_to_data_files(table: Table, df: pa.Table) -> Iterable[DataFile]:
+from pyiceberg.io.pyarrow import write_file
+
+if len(table.spec().fields) > 0:
+raise ValueError("Cannot write to partitioned tables")
+
+if len(table.sort_order().fields) > 0:
+raise ValueError("Cannot write to tables with a sort-order")
+
+write_uuid = uuid.uuid4()
+counter = itertools.count(0)
+
+# This is an iter, so we don't have to materialize everything every time
+# This will be more relevant when we start doing partitioned writes
+yield from write_file(table, iter([WriteTask(write_uuid, next(counter), 
df)]))
+
+
+class _MergeAppend:
+_operation: Operation
+_table: Table
+_snapshot_id: int
+_parent_snapshot_id: Optional[int]
+_added_data_files: List[DataFile]
+_commit_uuid: uuid.UUID
+
+def __init__(self, operation: Operation, table: Table, snapshot_id: int) 
-> None:
+self._operation = operation
+self._table = table
+self._snapshot_id = snapshot_id
+# Since we only support the main branch for now
+self._parent_snapshot_id = snapshot.snapshot_id if (snapshot := 
self._table.current_snapshot()) else None
+self._added_data_files = []
+self._commit_uuid = uuid.uuid4()
+
+def append_data_file(self, data_file: DataFile) -> _MergeAppend:
+self._added_data_files.append(data_file)
+return self
+
+def _deleted_entries(self) -> List[ManifestEntry]:
+"""To determine if we need to record any deleted entries.
+
+With partial overwrites we have to use the predicate to evaluate
+which entries are affected.
+"""
+if self._operation == Operation.OVERWRITE:
+if self._parent_snapshot_id is not None:
+previous_snapshot = 
self._table.snapshot_by_id(self._parent_snapshot_id)
+if previous_snapshot is None:
+# This should never happen since you cannot overwrite an 
empty table
+raise ValueError(f"Could not find the previous snapshot: 
{self._parent_snapshot_id}")
+
+executor = ExecutorFactory.get_or_create()
+
+def _get_entries(manifest: ManifestFile) -> 
List[ManifestEntry]:
+return [
+ManifestEntry(
+status=ManifestEntryStatus.DELETED,
+snapshot_id=entry.snapshot_id,
+data_sequence_number=entry.data_sequence_number,
+file_sequence_number=entry.file_sequence_number,
+data_file=entry.data_file,
+)
+for entry in 
manifest.fetch_manifest_entry(self._table.io, discard_deleted=True)
+]
+
+list_of_entries = executor.map(_get_entries, 
previous_snapshot.manifests(self._table.io))
+return list(chain(*list_of_entries))
+return []
+elif self._operation == Operation.APPEND:
+return []
+else:
+raise ValueError(f"Not implemented for: {self._operation}")
+
+def _manifests(self) -> List[ManifestFile]:
+manifests = []
+deleted_entries = self._deleted_entries()
+
+if self._added_data_files:
+output_file_location = 
_new_manifest_path(location=self._table.location(), num=0, 
commit_uuid=self._commit_uuid)
+with write_manifest(
+format_version=self._tabl

Re: [PR] Write support [iceberg-python]

2024-01-17 Thread via GitHub


rdblue commented on code in PR #41:
URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1456523045


##
mkdocs/docs/api.md:
##
@@ -175,6 +175,104 @@ static_table = StaticTable.from_metadata(
 
 The static-table is considered read-only.
 
+## Write support
+
+With PyIceberg 0.6.0 write support is added through Arrow. Let's consider an 
Arrow Table:

Review Comment:
   Thanks for this example! It made it really easy to test out.
   
   The example works great cut and pasted into a REPL. I also tested 
modifications to the dataframe schema passed to append, and it does the right 
thing. I get a schema error for a few cases:
   * Missing column `long`
   * Type mismatch: `string` instead of `double`
   * Extra column `country`
   
   Looks like Arrow requires that the schema matches, which is great.
   
   It would be nice to allow some type promotion in the future. I'm not sure 
whether Arrow would automatically write floats into double columns, for 
example. I would also like to make sure we have better error messages, not just 
"ValueError: Table schema does not match schema used to create file: ...". 
Those will be good follow-ups.
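   
   For reference, a rough sketch of the kind of mismatches tested above (it 
assumes the `tbl` table and the city/lat/long schema from the docs example; the 
exact error type and message may differ):

```python
import pyarrow as pa

# Missing the `long` column entirely
df_missing = pa.Table.from_pylist([{"city": "Drachten", "lat": 53.11254}])

# `lat` and `long` as strings instead of doubles
df_wrong_type = pa.Table.from_pylist([{"city": "Drachten", "lat": "53.1", "long": "6.1"}])

# Extra `country` column that the Iceberg schema does not have
df_extra = pa.Table.from_pylist(
    [{"city": "Drachten", "lat": 53.11254, "long": 6.0989, "country": "NL"}]
)

for df in (df_missing, df_wrong_type, df_extra):
    try:
        tbl.append(df)  # `tbl` is the table created earlier in the docs example
    except Exception as err:
        # e.g. ValueError: Table schema does not match schema used to create file: ...
        print(type(err).__name__, err)
```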



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Write support [iceberg-python]

2024-01-17 Thread via GitHub


rdblue commented on PR #41:
URL: https://github.com/apache/iceberg-python/pull/41#issuecomment-1896944638

   @Fokko, this works great and I don't see any blockers so I've approved it.
   
   I think there are a few things to consider in terms of how we want to do 
this moving forward (whether to use separate manifests for example) but we can 
get this in and iterate from there. It also looks like this is pretty close to 
being able to run the overwrite filter, too! Great work.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Add 1.4.3 docs [iceberg]

2024-01-17 Thread via GitHub


dramaticlly commented on code in PR #9499:
URL: https://github.com/apache/iceberg/pull/9499#discussion_r1456526671


##
1.4.3/mkdocs.yml:
##
@@ -0,0 +1,70 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+site_name: docs/1.4.2

Review Comment:
   Should this be 1.4.3 instead?



##
1.4.3/mkdocs.yml:
##
@@ -0,0 +1,70 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+site_name: docs/1.4.2
+
+plugins:
+  - search
+
+nav:
+  - index.md
+  - Tables:
+- branching.md
+- configuration.md
+- evolution.md
+- maintenance.md
+- partitioning.md
+- performance.md
+- reliability.md
+- schemas.md
+  - Spark:
+- spark-getting-started.md
+- spark-configuration.md
+- spark-ddl.md
+- spark-procedures.md
+- spark-queries.md
+- spark-structured-streaming.md
+- spark-writes.md
+  - Flink:
+- flink.md
+- flink-connector.md
+- flink-ddl.md
+- flink-queries.md
+- flink-writes.md
+- flink-actions.md
+- flink-configuration.md
+  - hive.md
+  - Trino: https://trino.io/docs/current/connector/iceberg.html
+  - Clickhouse: 
https://clickhouse.com/docs/en/engines/table-engines/integrations/iceberg
+  - Presto: https://prestodb.io/docs/current/connector/iceberg.html
+  - Dremio: https://docs.dremio.com/data-formats/apache-iceberg/
+  - Starrocks: 
https://docs.starrocks.io/en-us/latest/data_source/catalog/iceberg_catalog
+  - Amazon Athena: 
https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg.html
+  - Amazon EMR: 
https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-iceberg-use-cluster.html
+  - Impala: 
https://impala.apache.org/docs/build/html/topics/impala_iceberg.html
+  - Doris: https://doris.apache.org/docs/dev/lakehouse/multi-catalog/iceberg
+  - Integrations:
+- aws.md
+- dell.md
+- jdbc.md
+- nessie.md
+  - API:
+- java-api-quickstart.md
+- api.md
+- custom-catalog.md
+  - Javadoc: ../../javadoc/1.4.2

Review Comment:
   ditto



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Flink: implement range partitioner for map data statistics [iceberg]

2024-01-17 Thread via GitHub


stevenzwu commented on code in PR #9321:
URL: https://github.com/apache/iceberg/pull/9321#discussion_r1456527386


##
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/MapRangePartitioner.java:
##
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortKey;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.SortOrderComparators;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.RowDataWrapper;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.ArrayUtil;
+import org.apache.iceberg.util.Pair;
+
+/**
+ * Internal partitioner implementation that supports MapDataStatistics, which 
is typically used for
+ * low-cardinality use cases. While MapDataStatistics can keep accurate 
counters, it can't be used
+ * for high-cardinality use cases. Otherwise, the memory footprint is too high.
+ */
+class MapRangePartitioner implements Partitioner {
+  private final RowDataWrapper rowDataWrapper;
+  private final SortKey sortKey;
+  private final Comparator comparator;
+  private final Map mapStatistics;
+  private final double closeFileCostInWeightPercentage;
+
+  // lazily computed due to the need of numPartitions
+  private Map assignment;
+  private NavigableMap sortedStatsWithCloseFileCost;
+
+  MapRangePartitioner(
+  Schema schema,
+  SortOrder sortOrder,
+  MapDataStatistics dataStatistics,
+  double closeFileCostInWeightPercentage) {
+this.rowDataWrapper = new RowDataWrapper(FlinkSchemaUtil.convert(schema), 
schema.asStruct());
+this.sortKey = new SortKey(schema, sortOrder);
+this.comparator = SortOrderComparators.forSchema(schema, sortOrder);
+this.mapStatistics = dataStatistics.statistics();
+this.closeFileCostInWeightPercentage = closeFileCostInWeightPercentage;
+  }
+
+  @Override
+  public int partition(RowData row, int numPartitions) {
+// assignment table can only be built lazily when first referenced here,
+// because number of partitions (downstream subtasks) is needed
+Map assignmentMap = assignment(numPartitions);
+// reuse the sortKey and rowDataWrapper
+sortKey.wrap(rowDataWrapper.wrap(row));
+KeyAssignment keyAssignment = assignmentMap.get(sortKey);
+if (keyAssignment == null) {
+  // haven't learned about the key before. fall back to random selection.
+  return ThreadLocalRandom.current().nextInt(numPartitions);

Review Comment:
   I will go with round-robin `AtomicLong.getAndIncrement() % N` then. It might 
be a tiny bit faster than `ThreadLocalRandom.current().nextInt(N)`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Flink: implement range partitioner for map data statistics [iceberg]

2024-01-17 Thread via GitHub


stevenzwu commented on code in PR #9321:
URL: https://github.com/apache/iceberg/pull/9321#discussion_r1456527386


##
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/MapRangePartitioner.java:
##
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortKey;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.SortOrderComparators;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.RowDataWrapper;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.ArrayUtil;
+import org.apache.iceberg.util.Pair;
+
+/**
+ * Internal partitioner implementation that supports MapDataStatistics, which 
is typically used for
+ * low-cardinality use cases. While MapDataStatistics can keep accurate 
counters, it can't be used
+ * for high-cardinality use cases. Otherwise, the memory footprint is too high.
+ */
+class MapRangePartitioner implements Partitioner {
+  private final RowDataWrapper rowDataWrapper;
+  private final SortKey sortKey;
+  private final Comparator comparator;
+  private final Map mapStatistics;
+  private final double closeFileCostInWeightPercentage;
+
+  // lazily computed due to the need of numPartitions
+  private Map assignment;
+  private NavigableMap sortedStatsWithCloseFileCost;
+
+  MapRangePartitioner(
+  Schema schema,
+  SortOrder sortOrder,
+  MapDataStatistics dataStatistics,
+  double closeFileCostInWeightPercentage) {
+this.rowDataWrapper = new RowDataWrapper(FlinkSchemaUtil.convert(schema), 
schema.asStruct());
+this.sortKey = new SortKey(schema, sortOrder);
+this.comparator = SortOrderComparators.forSchema(schema, sortOrder);
+this.mapStatistics = dataStatistics.statistics();
+this.closeFileCostInWeightPercentage = closeFileCostInWeightPercentage;
+  }
+
+  @Override
+  public int partition(RowData row, int numPartitions) {
+// assignment table can only be built lazily when first referenced here,
+// because number of partitions (downstream subtasks) is needed
+Map assignmentMap = assignment(numPartitions);
+// reuse the sortKey and rowDataWrapper
+sortKey.wrap(rowDataWrapper.wrap(row));
+KeyAssignment keyAssignment = assignmentMap.get(sortKey);
+if (keyAssignment == null) {
+  // haven't learned about the key before. fall back to random selection.
+  return ThreadLocalRandom.current().nextInt(numPartitions);

Review Comment:
   I will go with round-robin `AtomicLong.getAndIncrement() % N` then. It might 
be a tiny bit faster than `ThreadLocalRandom.current().nextInt(N)`.
   
   Ideally, we would also increment a counter metric in this scenario. I looked 
into the partitioner interface and how it is used, but I couldn't find a way of 
passing in the `MetricGroup` here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Flink: implement range partitioner for map data statistics [iceberg]

2024-01-17 Thread via GitHub


stevenzwu commented on code in PR #9321:
URL: https://github.com/apache/iceberg/pull/9321#discussion_r1456529072


##
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/MapRangePartitioner.java:
##
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortKey;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.SortOrderComparators;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.RowDataWrapper;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.ArrayUtil;
+import org.apache.iceberg.util.Pair;
+
+/**
+ * Internal partitioner implementation that supports MapDataStatistics, which 
is typically used for
+ * low-cardinality use cases. While MapDataStatistics can keep accurate 
counters, it can't be used
+ * for high-cardinality use cases. Otherwise, the memory footprint is too high.
+ */
+class MapRangePartitioner implements Partitioner {
+  private final RowDataWrapper rowDataWrapper;
+  private final SortKey sortKey;
+  private final Comparator comparator;
+  private final Map mapStatistics;
+  private final double closeFileCostInWeightPercentage;
+
+  // lazily computed due to the need of numPartitions
+  private Map assignment;
+  private NavigableMap sortedStatsWithCloseFileCost;
+
+  MapRangePartitioner(
+  Schema schema,
+  SortOrder sortOrder,
+  MapDataStatistics dataStatistics,
+  double closeFileCostInWeightPercentage) {
+this.rowDataWrapper = new RowDataWrapper(FlinkSchemaUtil.convert(schema), 
schema.asStruct());
+this.sortKey = new SortKey(schema, sortOrder);
+this.comparator = SortOrderComparators.forSchema(schema, sortOrder);
+this.mapStatistics = dataStatistics.statistics();
+this.closeFileCostInWeightPercentage = closeFileCostInWeightPercentage;
+  }
+
+  @Override
+  public int partition(RowData row, int numPartitions) {
+// assignment table can only be built lazily when first referenced here,
+// because number of partitions (downstream subtasks) is needed
+Map assignmentMap = assignment(numPartitions);
+// reuse the sortKey and rowDataWrapper
+sortKey.wrap(rowDataWrapper.wrap(row));
+KeyAssignment keyAssignment = assignmentMap.get(sortKey);
+if (keyAssignment == null) {
+  // haven't learned about the key before. fall back to random selection.
+  return ThreadLocalRandom.current().nextInt(numPartitions);
+}
+
+return keyAssignment.select();
+  }
+
+  @VisibleForTesting
+  Map assignment(int numPartitions) {
+if (assignment == null) {
+  long totalWeight = mapStatistics.values().stream().mapToLong(l -> 
l).sum();
+  double targetWeightPerSubtask = ((double) totalWeight) / numPartitions;
+  long closeFileCostInWeight =
+  (long) Math.ceil(targetWeightPerSubtask * 
closeFileCostInWeightPercentage / 100);
+
+  // add one close file cost for each key even if a key with large weight 
may be assigned to
+  // multiple subtasks
+  this.sortedStatsWithCloseFileCost = Maps.newTreeMap(comparator);
+  mapStatistics.forEach(
+  (k, v) -> {
+int estimatedSplits = (int) Math.ceil(v / targetWeightPerSubtask);
+long estimatedCloseFileCost = closeFileCostInWeight * 
estimatedSplits;
+sortedStatsWithCloseFileCost.put(k, v + estimatedCloseFileCost);
+  });
+
+  long totalWe

[PR] Fix community link [iceberg]

2024-01-17 Thread via GitHub


bitsondatadev opened a new pull request, #9500:
URL: https://github.com/apache/iceberg/pull/9500

   The community link only works on top-level site pages. Link it to the 
static site for now; eventually we should consider a site-wide variable 
solution, but that isn't urgent.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Add 1.4.3 docs [iceberg]

2024-01-17 Thread via GitHub


bitsondatadev commented on code in PR #9499:
URL: https://github.com/apache/iceberg/pull/9499#discussion_r1456528989


##
1.4.3/mkdocs.yml:
##
@@ -0,0 +1,70 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+site_name: docs/1.4.2
+
+plugins:
+  - search
+
+nav:
+  - index.md
+  - Tables:
+- branching.md
+- configuration.md
+- evolution.md
+- maintenance.md
+- partitioning.md
+- performance.md
+- reliability.md
+- schemas.md
+  - Spark:
+- spark-getting-started.md
+- spark-configuration.md
+- spark-ddl.md
+- spark-procedures.md
+- spark-queries.md
+- spark-structured-streaming.md
+- spark-writes.md
+  - Flink:
+- flink.md
+- flink-connector.md
+- flink-ddl.md
+- flink-queries.md
+- flink-writes.md
+- flink-actions.md
+- flink-configuration.md
+  - hive.md
+  - Trino: https://trino.io/docs/current/connector/iceberg.html
+  - Clickhouse: 
https://clickhouse.com/docs/en/engines/table-engines/integrations/iceberg
+  - Presto: https://prestodb.io/docs/current/connector/iceberg.html
+  - Dremio: https://docs.dremio.com/data-formats/apache-iceberg/
+  - Starrocks: 
https://docs.starrocks.io/en-us/latest/data_source/catalog/iceberg_catalog
+  - Amazon Athena: 
https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg.html
+  - Amazon EMR: 
https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-iceberg-use-cluster.html
+  - Impala: 
https://impala.apache.org/docs/build/html/topics/impala_iceberg.html
+  - Doris: https://doris.apache.org/docs/dev/lakehouse/multi-catalog/iceberg
+  - Integrations:
+- aws.md
+- dell.md
+- jdbc.md
+- nessie.md
+  - API:
+- java-api-quickstart.md
+- api.md
+- custom-catalog.md
+  - Javadoc: ../../javadoc/1.4.2

Review Comment:
   ditto to above reply.



##
1.4.3/mkdocs.yml:
##
@@ -0,0 +1,70 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+site_name: docs/1.4.2

Review Comment:
   yes, I still need to wrap up this automation for the actual docs deployment 
sequence.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[PR] [Bug Fix] TruncateTransform for falsey values [iceberg-python]

2024-01-17 Thread via GitHub


syun64 opened a new pull request, #276:
URL: https://github.com/apache/iceberg-python/pull/276

   Currently, **TruncateTransform** returns None for any falsey input value. As a 
result, **fill_parquet_file_metadata** throws an exception whenever a falsey lower 
bound is the minimum value for a column statistic, because None cannot be encoded 
as "UTF-8" and **to_bytes** fails.
   
   The falsey values added in the test below are valid minimum column statistic 
values, so the transform should be adjusted to handle them correctly.
   
   This fix will be important as we prepare to introduce Write Support.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Spark: Support creating views via SQL [iceberg]

2024-01-17 Thread via GitHub


rdblue commented on code in PR #9423:
URL: https://github.com/apache/iceberg/pull/9423#discussion_r1456546379


##
spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala:
##
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.iceberg.spark.Spark3Util
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException
+import org.apache.spark.sql.catalyst.expressions.Alias
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.Project
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.connector.catalog.ViewCatalog
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.execution.CommandExecutionMode
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.MetadataBuilder
+import org.apache.spark.sql.util.SchemaUtils
+import scala.collection.JavaConverters._
+
+
+case class CreateV2ViewExec(
+  catalog: ViewCatalog,
+  ident: Identifier,
+  originalText: String,
+  query: LogicalPlan,
+  userSpecifiedColumns: Seq[(String, Option[String])],
+  comment: Option[String],
+  properties: Map[String, String],
+  allowExisting: Boolean,
+  replace: Boolean) extends LeafV2CommandExec {
+
+  override lazy val output: Seq[Attribute] = Nil
+
+  override protected def run(): Seq[InternalRow] = {
+val qe = session.sessionState.executePlan(query, CommandExecutionMode.SKIP)
+qe.assertAnalyzed()
+val analyzedPlan = qe.analyzed
+
+val identifier = Spark3Util.toV1TableIdentifier(ident)
+
+if (userSpecifiedColumns.nonEmpty) {
+  if (userSpecifiedColumns.length > analyzedPlan.output.length) {
+throw QueryCompilationErrors.cannotCreateViewNotEnoughColumnsError(
+  identifier, userSpecifiedColumns.map(_._1), analyzedPlan)
+  } else if (userSpecifiedColumns.length < analyzedPlan.output.length) {
+throw QueryCompilationErrors.cannotCreateViewTooManyColumnsError(
+  identifier, userSpecifiedColumns.map(_._1), analyzedPlan)
+  }
+}
+
+val queryColumnNames = analyzedPlan.schema.fieldNames
+SchemaUtils.checkColumnNameDuplication(queryColumnNames, 
SQLConf.get.resolver)

Review Comment:
   If there are aliases, we could succeed though.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Flink: Added error handling and default logic for Flink version detection [iceberg]

2024-01-17 Thread via GitHub


gjacoby126 commented on code in PR #9452:
URL: https://github.com/apache/iceberg/pull/9452#discussion_r1456547407


##
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/util/FlinkPackage.java:
##
@@ -19,15 +19,31 @@
 package org.apache.iceberg.flink.util;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 
 public class FlinkPackage {
-  /** Choose {@link DataStream} class because it is one of the core Flink API. 
*/
-  private static final String VERSION = 
DataStream.class.getPackage().getImplementationVersion();
+
+  public static final String FLINK_UNKNOWN_VERSION = "Flink-UNKNOWN";
 
   private FlinkPackage() {}
 
   /** Returns Flink version string like x.y.z */
   public static String version() {
-return VERSION;
+try {
+  String version = getVersionFromJar();
+  /* If we can't detect the exact implementation version from the jar 
(this can happen if the DataStream class
+   appears multiple times in the same classpath such as with shading), 
then the best we can do is say it's
+   unknown
+  */
+  return version != null ? version : FLINK_UNKNOWN_VERSION;

Review Comment:
   Uploaded a 1-method version using `AtomicReference` similar to what @nastra 
and @pvary were suggesting.
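   For readers following along, a minimal sketch of what a single-method, 
`AtomicReference`-based version cache could look like. This is an illustration 
rather than the exact patch; the class name is invented, `FLINK_UNKNOWN_VERSION` 
comes from the diff above, and the null handling of `getImplementationVersion()` 
is the point of interest:
   
   import java.util.concurrent.atomic.AtomicReference;
   import org.apache.flink.streaming.api.datastream.DataStream;
   
   public class FlinkVersionSketch {
     public static final String FLINK_UNKNOWN_VERSION = "Flink-UNKNOWN";
     private static final AtomicReference<String> VERSION = new AtomicReference<>();
   
     private FlinkVersionSketch() {}
   
     /** Returns the Flink version string, computing and caching it on first use. */
     public static String version() {
       if (VERSION.get() == null) {
         String detected;
         try {
           Package pkg = DataStream.class.getPackage();
           // getImplementationVersion() can return null, e.g. when DataStream is shaded
           detected = pkg != null ? pkg.getImplementationVersion() : null;
         } catch (RuntimeException e) {
           detected = null;
         }
         // only transitions from null to a non-null value, so races are harmless
         VERSION.compareAndSet(null, detected != null ? detected : FLINK_UNKNOWN_VERSION);
       }
       return VERSION.get();
     }
   }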



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Flink: implement range partitioner for map data statistics [iceberg]

2024-01-17 Thread via GitHub


stevenzwu commented on code in PR #9321:
URL: https://github.com/apache/iceberg/pull/9321#discussion_r1456527386


##
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/MapRangePartitioner.java:
##
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortKey;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.SortOrderComparators;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.RowDataWrapper;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.ArrayUtil;
+import org.apache.iceberg.util.Pair;
+
+/**
+ * Internal partitioner implementation that supports MapDataStatistics, which 
is typically used for
+ * low-cardinality use cases. While MapDataStatistics can keep accurate 
counters, it can't be used
+ * for high-cardinality use cases. Otherwise, the memory footprint is too high.
+ */
+class MapRangePartitioner implements Partitioner<RowData> {
+  private final RowDataWrapper rowDataWrapper;
+  private final SortKey sortKey;
+  private final Comparator<StructLike> comparator;
+  private final Map<SortKey, Long> mapStatistics;
+  private final double closeFileCostInWeightPercentage;
+
+  // lazily computed due to the need of numPartitions
+  private Map<SortKey, KeyAssignment> assignment;
+  private NavigableMap<SortKey, Long> sortedStatsWithCloseFileCost;
+
+  MapRangePartitioner(
+  Schema schema,
+  SortOrder sortOrder,
+  MapDataStatistics dataStatistics,
+  double closeFileCostInWeightPercentage) {
+this.rowDataWrapper = new RowDataWrapper(FlinkSchemaUtil.convert(schema), 
schema.asStruct());
+this.sortKey = new SortKey(schema, sortOrder);
+this.comparator = SortOrderComparators.forSchema(schema, sortOrder);
+this.mapStatistics = dataStatistics.statistics();
+this.closeFileCostInWeightPercentage = closeFileCostInWeightPercentage;
+  }
+
+  @Override
+  public int partition(RowData row, int numPartitions) {
+// assignment table can only be built lazily when first referenced here,
+// because number of partitions (downstream subtasks) is needed
+Map<SortKey, KeyAssignment> assignmentMap = assignment(numPartitions);
+// reuse the sortKey and rowDataWrapper
+sortKey.wrap(rowDataWrapper.wrap(row));
+KeyAssignment keyAssignment = assignmentMap.get(sortKey);
+if (keyAssignment == null) {
+  // haven't learned about the key before. fall back to random selection.
+  return ThreadLocalRandom.current().nextInt(numPartitions);

Review Comment:
   I will go with round-robin `counter % N` then; it might be a tiny bit faster 
than `ThreadLocalRandom.current().nextInt(N)`.
   
   Ideally, we would increment a counter metric in this scenario. I looked at the 
`Partitioner` interface and how it is used, but I couldn't find a way of passing 
in the `MetricGroup` here. For now, we can probably just log something.
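   For illustration only, a minimal sketch of the `counter % N` fallback mentioned 
above. The class and field names are made up for the example (the real change 
would live inside `MapRangePartitioner`), and it assumes the partitioner is only 
invoked from a single task thread, so a plain int counter is enough:
   
   // Sketch: round-robin selection for sort keys missing from the statistics map.
   class RoundRobinFallback {
     private int counter = 0;
   
     int select(int numPartitions) {
       int selected = counter % numPartitions;
       // keep the counter bounded so it never overflows into negative values
       counter = (counter + 1) % numPartitions;
       return selected;
     }
   }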



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Spark: Support creating views via SQL [iceberg]

2024-01-17 Thread via GitHub


rdblue commented on code in PR #9423:
URL: https://github.com/apache/iceberg/pull/9423#discussion_r1456586630


##
spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala:
##
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.iceberg.spark.Spark3Util
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException
+import org.apache.spark.sql.catalyst.expressions.Alias
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.Project
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.connector.catalog.ViewCatalog
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.execution.CommandExecutionMode
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.MetadataBuilder
+import org.apache.spark.sql.util.SchemaUtils
+import scala.collection.JavaConverters._
+
+
+case class CreateV2ViewExec(
+  catalog: ViewCatalog,
+  ident: Identifier,
+  originalText: String,
+  query: LogicalPlan,
+  userSpecifiedColumns: Seq[(String, Option[String])],
+  comment: Option[String],
+  properties: Map[String, String],
+  allowExisting: Boolean,
+  replace: Boolean) extends LeafV2CommandExec {
+
+  override lazy val output: Seq[Attribute] = Nil
+
+  override protected def run(): Seq[InternalRow] = {
+val analyzedPlan = session.sessionState.executePlan(query, 
CommandExecutionMode.SKIP).analyzed
+val identifier = Spark3Util.toV1TableIdentifier(ident)
+
+if (userSpecifiedColumns.nonEmpty) {
+  if (userSpecifiedColumns.length > analyzedPlan.output.length) {
+throw QueryCompilationErrors.cannotCreateViewNotEnoughColumnsError(
+  identifier, userSpecifiedColumns.map(_._1), analyzedPlan)
+  } else if (userSpecifiedColumns.length < analyzedPlan.output.length) {
+throw QueryCompilationErrors.cannotCreateViewTooManyColumnsError(
+  identifier, userSpecifiedColumns.map(_._1), analyzedPlan)
+  }
+}
+
+val queryColumnNames = analyzedPlan.schema.fieldNames
+SchemaUtils.checkColumnNameDuplication(queryColumnNames, 
SQLConf.get.resolver)
+
+val viewSchema = aliasPlan(analyzedPlan, userSpecifiedColumns).schema
+val columnAliases = userSpecifiedColumns.map(_._1).toArray
+val columnComments = userSpecifiedColumns.map(_._2.orNull).toArray
+
+val currentCatalogName = 
session.sessionState.catalogManager.currentCatalog.name
+val currentCatalog = if (!catalog.name().equals(currentCatalogName)) 
currentCatalogName else null
+val currentNamespace = session.sessionState.catalogManager.currentNamespace
+
+val engineVersion = "Spark " + org.apache.spark.SPARK_VERSION
+val newProperties = properties ++
+  comment.map(ViewCatalog.PROP_COMMENT -> _) +
+  (ViewCatalog.PROP_CREATE_ENGINE_VERSION -> engineVersion,
+ViewCatalog.PROP_ENGINE_VERSION -> engineVersion)
+
+if (replace) {
+  // CREATE OR REPLACE VIEW
+  if (catalog.viewExists(ident)) {
+catalog.dropView(ident)
+  }
+  // FIXME: replaceView API doesn't exist in Spark 3.5
+  catalog.createView(
+ident,
+originalText,
+currentCatalog,
+currentNamespace,
+viewSchema,
+queryColumnNames,
+columnAliases,
+columnComments,
+newProperties.asJava)
+} else {
+  try {
+// CREATE VIEW [IF NOT EXISTS]
+catalog.createView(
+  ident,
+  originalText,
+  currentCatalog,
+  currentNamespace,
+  viewSchema,
+  queryColumnNames,
+  columnAliases,
+  columnComments,
+  newProperties.asJava)
+  } catch {
+case _: ViewAlreadyExistsException if allowExisting => // Ignore
+  }
+}
+
+Nil
+  }
+
+  override def simpleString(maxFields: Int): String = {
+s"CreateV2ViewExec: ${ident}"
+  }
+
+  /**
+

Re: [PR] Fix community link [iceberg]

2024-01-17 Thread via GitHub


amogh-jahagirdar merged PR #9500:
URL: https://github.com/apache/iceberg/pull/9500


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[PR] [Reference PR] [API + Avro] Add default value APIs and Avro implementation [iceberg]

2024-01-17 Thread via GitHub


wmoustafa opened a new pull request, #9502:
URL: https://github.com/apache/iceberg/pull/9502

   This PR adds default value APIs according to the default value spec and 
implements them for `GenericAvroReader`. It uses a `ConstantReader` to fill in 
each field's default value from its `initialDefault()` method, and it rebases the 
implementation in #6004 on top of #9366. However, this is just a reference PR for 
now, since there is still an open question about the best place to convert from 
Iceberg's data model to the Avro data model. For now, I have an Avro-specific 
method in `ValueReaders`, but that is not the correct place; a better place is 
probably `GenericAvroReader`, where an Avro-specific class could extend 
`ValueReaders.ConstantReader`. However, `ValueReaders.ConstantReader` is currently 
private. I am creating this PR to start a discussion about the best place for 
constant (in-memory) data conversion, and I have marked the places where a future 
change is required as TODO items.
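   To make the `ConstantReader` idea concrete, here is a rough sketch of a reader 
that ignores the decoder and always returns a fixed default. The interface name 
and package (`org.apache.iceberg.avro.ValueReader`) are quoted from memory and the 
class name is invented, so treat this as an illustration rather than the final 
placement discussed above:
   
   import java.io.IOException;
   import org.apache.avro.io.Decoder;
   import org.apache.iceberg.avro.ValueReader;
   
   /** Sketch: a reader that never consumes input and always returns a default. */
   class ConstantReaderSketch<T> implements ValueReader<T> {
     private final T constant;
   
     ConstantReaderSketch(T constant) {
       this.constant = constant;
     }
   
     @Override
     public T read(Decoder decoder, Object reuse) throws IOException {
       // nothing is read from the decoder: the field is absent from the file schema,
       // so the reader simply supplies the field's initial default value
       return constant;
     }
   }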


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org


