Re: [I] Caused by: java.net.SocketException: Connection reset [iceberg]
pvary commented on issue #9444: URL: https://github.com/apache/iceberg/issues/9444#issuecomment-1895290384 > Because it can also be an intermittent network issue for anyone using Iceberg with Flink and failing the entire stream for that sounds a bit harsh. For the record, the fix you are suggesting is in the Iceberg AWS code, not in the Flink Iceberg connector code 😄 If we do retries in Iceberg, we use `Tasks`, for example: https://github.com/apache/iceberg/blob/main/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java#L175
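For reference, here is a minimal sketch of the `Tasks`-based retry pattern that comment points to. It is illustrative only: the retry count, backoff values, file location, and the `readFully` callback are placeholders, not the actual `S3FileIO` settings.

```java
import java.net.SocketException;
import org.apache.iceberg.util.Tasks;

public class RetrySketch {
  public static void main(String[] args) {
    String location = "s3://bucket/warehouse/db/table/metadata/snap-1.avro"; // placeholder path

    Tasks.foreach(location)
        .retry(3)                                  // retry each item up to 3 more times
        .exponentialBackoff(100, 5000, 60000, 2.0) // min sleep, max sleep, total timeout (ms), scale
        .onlyRetryOn(SocketException.class)        // only retry transient connection resets
        .throwFailureWhenFinished()
        .run(RetrySketch::readFully);
  }

  // Placeholder for the read that intermittently fails with "Connection reset".
  private static void readFully(String location) {
    // open the file through FileIO and read it here
  }
}
```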
Re: [PR] Add formatting for toml files [iceberg-rust]
liurenjie1024 commented on code in PR #167: URL: https://github.com/apache/iceberg-rust/pull/167#discussion_r1454897761 ## Makefile: ## @@ -32,7 +32,11 @@ cargo-sort: cargo install cargo-sort cargo sort -c -w -check: check-fmt check-clippy cargo-sort +fmt-toml: + cargo install taplo-cli --locked + taplo fmt Review Comment: > I'm guessing not. It will fmt the files directly if it used to be incorrectly formatted. Then I think it's better to use `check` here. The goal of CI is to check correctness, not to do formatting.
Re: [I] Caused by: java.net.SocketException: Connection reset [iceberg]
javrasya commented on issue #9444: URL: https://github.com/apache/iceberg/issues/9444#issuecomment-1895303601 True, the same FileIO can be used for Spark too; I forgot that :-) Exactly, I saw that retry implementation in the source code. Thanks for sharing it, it is a good reference. I just needed a very simple retry in place to see if it really helps; I think it should be implemented that way instead. Who do you think could be pulled into the discussion here for that? I know you are mostly looking at the Flink side of Iceberg, and I'm not really sure how the contributors are grouped.
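As a purely illustrative aside, the kind of throwaway retry wrapper described above could look like this (plain JDK code, not Iceberg code; the attempt count and backoff are arbitrary):

```java
import java.net.SocketException;
import java.util.concurrent.Callable;

public class SimpleRetry {
  // Retry an action a few times when it fails with a connection reset.
  static <T> T withRetries(Callable<T> action) throws Exception {
    int attempts = 0;
    while (true) {
      try {
        return action.call();
      } catch (SocketException e) {
        attempts += 1;
        if (attempts >= 3) {
          throw e; // give up after three attempts
        }
        Thread.sleep(200L * attempts); // simple linear backoff
      }
    }
  }
}
```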
Re: [PR] init writer framework [iceberg-rust]
ZENOTME commented on PR #135: URL: https://github.com/apache/iceberg-rust/pull/135#issuecomment-1895304460 I feel that this writer framework may need more discussion, so I can separate it into `IcebergWriter` and `FileWriter` parts. The `IcebergWriter` part is about how to organize our writers; it's the core and more complicated. The `FileWriter` part is an abstraction over file formats such as `parquet` and `orc`; it's simpler. And we can work on the `FileWriter` part first if it looks good. What do you think? @Xuanwo @Fokko
Re: [PR] Flink: Don't fail to serialize IcebergSourceSplit when there is too many delete files [iceberg]
pvary commented on PR #9464: URL: https://github.com/apache/iceberg/pull/9464#issuecomment-1895305741 > The split/FileScanTask should only contain ASCII. If I understand correctly, the `FileScanTask` JSON will contain the `Schema`. The `Schema` has a `doc` field for comments. Do we have restrictions defined for the `doc` field? When I worked on Hive, plenty of our Chinese users added comments with non-ASCII characters there.
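To make the concern concrete, here is a hypothetical snippet (not from the PR): a column `doc` can legitimately contain non-ASCII text, and it is carried along when the schema is serialized to JSON.

```java
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.types.Types;

public class DocFieldExample {
  public static void main(String[] args) {
    // A schema whose column comment (doc) uses non-ASCII characters.
    Schema schema =
        new Schema(
            Types.NestedField.required(1, "id", Types.LongType.get(), "用户编号 (user id)"));

    // The doc field is part of the JSON form of the schema, so any serialization
    // path that assumes ASCII-only content would have to handle strings like this.
    System.out.println(SchemaParser.toJson(schema));
  }
}
```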
Re: [PR] Hive: Refactor hive-table commit operation to be used for other operations like view [iceberg]
nk1506 commented on code in PR #9461: URL: https://github.com/apache/iceberg/pull/9461#discussion_r1454918307 ## core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java: ## @@ -309,65 +300,19 @@ protected enum CommitStatus { * @return Commit Status of Success, Failure or Unknown */ protected CommitStatus checkCommitStatus(String newMetadataLocation, TableMetadata config) { -int maxAttempts = Review Comment: This needs to move to a common place so that other operations, like View, can also use it.
Re: [PR] init writer framework [iceberg-rust]
Xuanwo commented on PR #135: URL: https://github.com/apache/iceberg-rust/pull/135#issuecomment-1895313954 > And we can work on the `FileWriter` part first if it looks good. What do you think? @Xuanwo @Fokko LGTM! It's good to merge things in small chunks and polish them during real usage.
Re: [PR] Spark: Support creating views via SQL [iceberg]
nastra commented on code in PR #9423: URL: https://github.com/apache/iceberg/pull/9423#discussion_r1454928120 ## spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala: ## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.iceberg.spark.Spark3Util +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException +import org.apache.spark.sql.catalyst.expressions.Alias +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.Project +import org.apache.spark.sql.connector.catalog.Identifier +import org.apache.spark.sql.connector.catalog.ViewCatalog +import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.execution.CommandExecutionMode +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.MetadataBuilder +import org.apache.spark.sql.util.SchemaUtils +import scala.collection.JavaConverters._ + + +case class CreateV2ViewExec( + catalog: ViewCatalog, + ident: Identifier, + originalText: String, + query: LogicalPlan, + userSpecifiedColumns: Seq[(String, Option[String])], + comment: Option[String], + properties: Map[String, String], + allowExisting: Boolean, + replace: Boolean) extends LeafV2CommandExec { + + override lazy val output: Seq[Attribute] = Nil + + override protected def run(): Seq[InternalRow] = { +val qe = session.sessionState.executePlan(query, CommandExecutionMode.SKIP) +qe.assertAnalyzed() +val analyzedPlan = qe.analyzed + +val identifier = Spark3Util.toV1TableIdentifier(ident) + +if (userSpecifiedColumns.nonEmpty) { + if (userSpecifiedColumns.length > analyzedPlan.output.length) { +throw QueryCompilationErrors.cannotCreateViewNotEnoughColumnsError( + identifier, userSpecifiedColumns.map(_._1), analyzedPlan) + } else if (userSpecifiedColumns.length < analyzedPlan.output.length) { +throw QueryCompilationErrors.cannotCreateViewTooManyColumnsError( + identifier, userSpecifiedColumns.map(_._1), analyzedPlan) + } +} + +val queryColumnNames = analyzedPlan.schema.fieldNames +SchemaUtils.checkColumnNameDuplication(queryColumnNames, SQLConf.get.resolver) + +val viewSchema = aliasPlan(analyzedPlan, userSpecifiedColumns).schema +val columnAliases = userSpecifiedColumns.map(_._1).toArray +val columnComments = userSpecifiedColumns.map(_._2.getOrElse("")).toArray Review Comment: I agree that this should be null. 
The issue here was that Spark was failing with an NPE when running `TestViews#createViewWithColumnAliases()` in an earlier version of this PR, when that particular code was living in `ResolveViews`. I checked again and this doesn't fail anymore, because the logic moved into `CreateV2ViewExec`. I've updated this to `null` now.
Re: [PR] Spark: Support creating views via SQL [iceberg]
nastra commented on code in PR #9423: URL: https://github.com/apache/iceberg/pull/9423#discussion_r1454951834 ## spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala: ## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.iceberg.spark.Spark3Util +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException +import org.apache.spark.sql.catalyst.expressions.Alias +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.Project +import org.apache.spark.sql.connector.catalog.Identifier +import org.apache.spark.sql.connector.catalog.ViewCatalog +import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.execution.CommandExecutionMode +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.MetadataBuilder +import org.apache.spark.sql.util.SchemaUtils +import scala.collection.JavaConverters._ + + +case class CreateV2ViewExec( + catalog: ViewCatalog, + ident: Identifier, + originalText: String, + query: LogicalPlan, + userSpecifiedColumns: Seq[(String, Option[String])], + comment: Option[String], + properties: Map[String, String], + allowExisting: Boolean, + replace: Boolean) extends LeafV2CommandExec { + + override lazy val output: Seq[Attribute] = Nil + + override protected def run(): Seq[InternalRow] = { +val qe = session.sessionState.executePlan(query, CommandExecutionMode.SKIP) +qe.assertAnalyzed() +val analyzedPlan = qe.analyzed + +val identifier = Spark3Util.toV1TableIdentifier(ident) + +if (userSpecifiedColumns.nonEmpty) { + if (userSpecifiedColumns.length > analyzedPlan.output.length) { +throw QueryCompilationErrors.cannotCreateViewNotEnoughColumnsError( + identifier, userSpecifiedColumns.map(_._1), analyzedPlan) + } else if (userSpecifiedColumns.length < analyzedPlan.output.length) { +throw QueryCompilationErrors.cannotCreateViewTooManyColumnsError( + identifier, userSpecifiedColumns.map(_._1), analyzedPlan) + } +} + +val queryColumnNames = analyzedPlan.schema.fieldNames +SchemaUtils.checkColumnNameDuplication(queryColumnNames, SQLConf.get.resolver) + +val viewSchema = aliasPlan(analyzedPlan, userSpecifiedColumns).schema +val columnAliases = userSpecifiedColumns.map(_._1).toArray +val columnComments = userSpecifiedColumns.map(_._2.getOrElse("")).toArray + +val currentCatalog = session.sessionState.catalogManager.currentCatalog.name +val currentNamespace = session.sessionState.catalogManager.currentNamespace + +val engineVersion = "Spark " + 
org.apache.spark.SPARK_VERSION +val createEngineVersion = Some(engineVersion) +val newProperties = properties ++ + comment.map(ViewCatalog.PROP_COMMENT -> _) ++ + createEngineVersion.map(ViewCatalog.PROP_CREATE_ENGINE_VERSION -> _) + Review Comment: No particular reason; I've updated this.
[I] `write.parquet.compression-codec` being set even if file-format is not parquet [iceberg]
oneonestar opened a new issue, #9490: URL: https://github.com/apache/iceberg/issues/9490 ### Apache Iceberg version 1.4.2 (latest release) ### Query engine Trino ### Please describe the bug 🐞 In Trino 436 (Iceberg 1.4.3), the `write.parquet.compression-codec` property is being set even if the file format is not Parquet. (https://github.com/trinodb/trino/issues/20401) I think the problem could be related to https://github.com/apache/iceberg/pull/8593#issuecomment-1740507634 ``` trino> CREATE TABLE test.property_test (c1 integer) WITH (format = 'ORC'); CREATE TABLE trino> SELECT * FROM test."property_test$properties"; key | value -+--- write.format.default| ORC write.parquet.compression-codec | zstd (2 rows) trino> CREATE TABLE test.property_test (c1 integer) WITH (format = 'AVRO'); CREATE TABLE trino> SELECT * FROM test."property_test$properties"; key | value -+--- write.format.default| AVRO write.parquet.compression-codec | zstd (2 rows) ```
Re: [PR] Spark: Support creating views via SQL [iceberg]
nastra commented on code in PR #9423: URL: https://github.com/apache/iceberg/pull/9423#discussion_r1454958750 ## spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala: ## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.iceberg.spark.Spark3Util +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException +import org.apache.spark.sql.catalyst.expressions.Alias +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.Project +import org.apache.spark.sql.connector.catalog.Identifier +import org.apache.spark.sql.connector.catalog.ViewCatalog +import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.execution.CommandExecutionMode +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.MetadataBuilder +import org.apache.spark.sql.util.SchemaUtils +import scala.collection.JavaConverters._ + + +case class CreateV2ViewExec( + catalog: ViewCatalog, + ident: Identifier, + originalText: String, + query: LogicalPlan, + userSpecifiedColumns: Seq[(String, Option[String])], + comment: Option[String], + properties: Map[String, String], + allowExisting: Boolean, + replace: Boolean) extends LeafV2CommandExec { + + override lazy val output: Seq[Attribute] = Nil + + override protected def run(): Seq[InternalRow] = { +val qe = session.sessionState.executePlan(query, CommandExecutionMode.SKIP) +qe.assertAnalyzed() Review Comment: this was taken from an early impl and is probably not needed anymore. I've removed it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [I] Caused by: java.net.SocketException: Connection reset [iceberg]
pvary commented on issue #9444: URL: https://github.com/apache/iceberg/issues/9444#issuecomment-1895346429 Maybe @jackye1995?
Re: [PR] init writer framework [iceberg-rust]
liurenjie1024 commented on PR #135: URL: https://github.com/apache/iceberg-rust/pull/135#issuecomment-1895352135 > And we can work on the FileWriter part first if it looks good. What do you think? @Xuanwo @Fokko +1
Re: [PR] Hive: Unwrap RuntimeException for Hive TException with rename table [iceberg]
pvary commented on code in PR #9432: URL: https://github.com/apache/iceberg/pull/9432#discussion_r1454984558 ## hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java: ## @@ -251,12 +251,14 @@ public void renameTable(TableIdentifier from, TableIdentifier originalTo) { } catch (NoSuchObjectException e) { throw new NoSuchTableException("Table does not exist: %s", from); -} catch (AlreadyExistsException e) { - throw new org.apache.iceberg.exceptions.AlreadyExistsException( - "Table already exists: %s", to); - } catch (TException e) { - throw new RuntimeException("Failed to rename " + from + " to " + to, e); + if (e.getMessage() != null + && e.getMessage().contains(String.format("new table %s already exists", to))) { +throw new org.apache.iceberg.exceptions.AlreadyExistsException( +"Table already exists: %s", to); + } else { +throw new RuntimeException("Failed to rename " + from + " to " + to, e); + } Review Comment: Could we please move this under a `catch (InvalidOperationException)` block, but keep the old `TException` handler?
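For illustration, the structure being suggested could look roughly like the sketch below. This is an assumed shape, not the PR's final code: `HiveRenameErrors` and `translate` are invented names, and the message check simply mirrors the diff above.

```java
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.thrift.TException;

class HiveRenameErrors {
  // Map a metastore failure during rename to an Iceberg exception: handle
  // InvalidOperationException separately, and keep the generic TException fallback.
  static RuntimeException translate(TException e, TableIdentifier from, TableIdentifier to) {
    if (e instanceof NoSuchObjectException) {
      return new NoSuchTableException("Table does not exist: %s", from);
    }

    if (e instanceof InvalidOperationException
        && e.getMessage() != null
        && e.getMessage().contains(String.format("new table %s already exists", to))) {
      return new AlreadyExistsException("Table already exists: %s", to);
    }

    return new RuntimeException("Failed to rename " + from + " to " + to, e);
  }
}
```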
Re: [I] `write.parquet.compression-codec` being set even if file-format is not parquet [iceberg]
findinpath commented on issue #9490: URL: https://github.com/apache/iceberg/issues/9490#issuecomment-1895363913 cc @aokolnychyi please see `org.apache.iceberg.TableMetadata#persistedProperties` in 2e291c2b
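For context, a hypothetical sketch of the kind of guard being discussed follows. This is not the actual `TableMetadata#persistedProperties` code or an agreed-upon fix; it only illustrates writing the Parquet codec default when the table's default file format is Parquet.

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.iceberg.TableProperties;

class PersistedPropertiesSketch {
  static Map<String, String> withFormatAwareDefaults(Map<String, String> rawProperties) {
    Map<String, String> persisted = new HashMap<>(rawProperties);

    String defaultFormat =
        rawProperties.getOrDefault(
            TableProperties.DEFAULT_FILE_FORMAT, TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);

    // Only persist the Parquet compression default for tables that default to Parquet.
    if ("parquet".equalsIgnoreCase(defaultFormat)) {
      persisted.putIfAbsent(TableProperties.PARQUET_COMPRESSION, "zstd");
    }

    return persisted;
  }
}
```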
Re: [PR] Spark: Support creating views via SQL [iceberg]
nastra commented on code in PR #9423: URL: https://github.com/apache/iceberg/pull/9423#discussion_r1455012760 ## spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala: ## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.iceberg.spark.Spark3Util +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException +import org.apache.spark.sql.catalyst.expressions.Alias +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.Project +import org.apache.spark.sql.connector.catalog.Identifier +import org.apache.spark.sql.connector.catalog.ViewCatalog +import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.execution.CommandExecutionMode +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.MetadataBuilder +import org.apache.spark.sql.util.SchemaUtils +import scala.collection.JavaConverters._ + + +case class CreateV2ViewExec( + catalog: ViewCatalog, + ident: Identifier, + originalText: String, + query: LogicalPlan, + userSpecifiedColumns: Seq[(String, Option[String])], + comment: Option[String], + properties: Map[String, String], + allowExisting: Boolean, + replace: Boolean) extends LeafV2CommandExec { + + override lazy val output: Seq[Attribute] = Nil + + override protected def run(): Seq[InternalRow] = { +val qe = session.sessionState.executePlan(query, CommandExecutionMode.SKIP) +qe.assertAnalyzed() +val analyzedPlan = qe.analyzed + +val identifier = Spark3Util.toV1TableIdentifier(ident) + +if (userSpecifiedColumns.nonEmpty) { + if (userSpecifiedColumns.length > analyzedPlan.output.length) { +throw QueryCompilationErrors.cannotCreateViewNotEnoughColumnsError( + identifier, userSpecifiedColumns.map(_._1), analyzedPlan) + } else if (userSpecifiedColumns.length < analyzedPlan.output.length) { +throw QueryCompilationErrors.cannotCreateViewTooManyColumnsError( + identifier, userSpecifiedColumns.map(_._1), analyzedPlan) + } +} + +val queryColumnNames = analyzedPlan.schema.fieldNames +SchemaUtils.checkColumnNameDuplication(queryColumnNames, SQLConf.get.resolver) Review Comment: I think we want to perform this check, because Spark does the same and fails if you provide duplicate columns in the underlying query: ``` spark-sql (default)> create temporary view tempv2 as select id, id from iceberg1.foo WHERE id < 12; [COLUMN_ALREADY_EXISTS] The column `id` already exists. Consider to choose another name or rename the existing column. 
``` I've left that check and also added a test for this.
Re: [PR] Spark: Support creating views via SQL [iceberg]
nastra commented on code in PR #9423: URL: https://github.com/apache/iceberg/pull/9423#discussion_r1455041489 ## spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ViewCheck.scala: ## @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.iceberg.spark.Spark3Util +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView +import org.apache.spark.sql.connector.catalog.ViewCatalog +import org.apache.spark.sql.execution.command.ViewHelper + +object ViewCheck extends (LogicalPlan => Unit) { + + override def apply(plan: LogicalPlan): Unit = { +plan foreach { + case CreateIcebergView(ResolvedIdentifier(_: ViewCatalog, ident), _, _, _, _, query, _, _) => +val identifier = Spark3Util.toV1TableIdentifier(ident) Review Comment: we particularly test for this case via `createViewReferencingTempView()` and `createViewReferencingGlobalTempView()` and it seems to work. I've also checked and `CheckViews` runs right after the logic in `ResolveViews`. I think we could also just remove `CheckViews` and perform the check in `ResolveViews` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Spark: Support creating views via SQL [iceberg]
nastra commented on code in PR #9423: URL: https://github.com/apache/iceberg/pull/9423#discussion_r1455090708 ## spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ViewCheck.scala: ## @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.iceberg.spark.Spark3Util +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView +import org.apache.spark.sql.connector.catalog.ViewCatalog +import org.apache.spark.sql.execution.command.ViewHelper + +object ViewCheck extends (LogicalPlan => Unit) { + + override def apply(plan: LogicalPlan): Unit = { +plan foreach { + case CreateIcebergView(ResolvedIdentifier(_: ViewCatalog, ident), _, _, _, _, query, _, _) => +val identifier = Spark3Util.toV1TableIdentifier(ident) +ViewHelper.verifyTemporaryObjectsNotExists(false, identifier, query, Seq.empty) +ViewHelper.verifyAutoGeneratedAliasesNotExists(query, false, identifier) Review Comment: initially I added this because the same check is being done in Spark's view creation code in https://github.com/apache/spark/blob/branch-3.5/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala#L115. Looking at https://github.com/apache/spark/commit/3c683434fa3f041000af363fdc6bdaddf4e1fb2a where this feature was introduced, I actually don't think we need this check, so I went ahead and removed it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Spark: Support creating views via SQL [iceberg]
nastra commented on code in PR #9423: URL: https://github.com/apache/iceberg/pull/9423#discussion_r1455100866 ## spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ViewCheck.scala: ## @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.iceberg.spark.Spark3Util +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView +import org.apache.spark.sql.connector.catalog.ViewCatalog +import org.apache.spark.sql.execution.command.ViewHelper + +object ViewCheck extends (LogicalPlan => Unit) { + + override def apply(plan: LogicalPlan): Unit = { +plan foreach { + case CreateIcebergView(ResolvedIdentifier(_: ViewCatalog, ident), _, _, _, _, query, _, _) => +val identifier = Spark3Util.toV1TableIdentifier(ident) +ViewHelper.verifyTemporaryObjectsNotExists(false, identifier, query, Seq.empty) Review Comment: what the check does is to make sure that a view doesn't reference a temporary/global view. We particularly test for these 2 cases in `createViewReferencingGlobalTempView()` / `createViewReferencingTempView()` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Check stale issues in ascending order [iceberg]
ajantha-bhat commented on code in PR #9489: URL: https://github.com/apache/iceberg/pull/9489#discussion_r1455140931 ## .github/workflows/stale.yml: ## @@ -47,3 +47,4 @@ jobs: close-issue-message: > This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale' + ascending: true Review Comment: The workflow triggers daily at midnight and considers 30 issues a day. Shouldn't they have been closed by now?
Re: [PR] Hive: Unwrap RuntimeException for Hive TException with rename table [iceberg]
pvary merged PR #9432: URL: https://github.com/apache/iceberg/pull/9432
Re: [PR] Hive: Unwrap RuntimeException for Hive TException with rename table [iceberg]
pvary commented on PR #9432: URL: https://github.com/apache/iceberg/pull/9432#issuecomment-1895498122 Thanks @nk1506 for the PR!
Re: [PR] Update iceberg_bug_report.yml to 1.4.3 [iceberg]
nastra merged PR #9491: URL: https://github.com/apache/iceberg/pull/9491
Re: [PR] Check stale issues in ascending order [iceberg]
manuzhang commented on code in PR #9489: URL: https://github.com/apache/iceberg/pull/9489#discussion_r1455190604 ## .github/workflows/stale.yml: ## @@ -47,3 +47,4 @@ jobs: close-issue-message: > This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale' + ascending: true Review Comment: With the default descending order, it always checks the latest issues and only a small number can be closed. Older issues are never checked.
Re: [PR] Check stale issues in ascending order [iceberg]
nastra merged PR #9489: URL: https://github.com/apache/iceberg/pull/9489
Re: [PR] Core: Add view support for JDBC catalog [iceberg]
ajantha-bhat commented on code in PR #9487: URL: https://github.com/apache/iceberg/pull/9487#discussion_r1455179512 ## core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java: ## @@ -53,21 +58,19 @@ final class JdbcUtil { + " WHERE " + CATALOG_NAME + " = ? AND " - + TABLE_NAMESPACE - + " = ? AND " - + TABLE_NAME - + " = ? AND " + + " %s = ? AND " // table or view namespace Review Comment: nit: we can remove the extra space before %s ## core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java: ## @@ -303,6 +331,263 @@ public static Properties filterAndRemovePrefix(Map properties, S return result; } + /** + * Create SQL statement to get a table or view. + * + * @param table true to get the SQL statement for a table, false for a view. + * @return the SQL statement. + */ + public static String getTableOrViewSqlStatement(boolean table) { +String tableOrViewTableName = table ? CATALOG_TABLE_NAME : CATALOG_VIEW_NAME; +String tableOrViewNamespace = table ? TABLE_NAMESPACE : VIEW_NAMESPACE; +String tableOrViewName = table ? TABLE_NAME : VIEW_NAME; +return String.format( +GET_TABLE_OR_VIEW_SQL, tableOrViewTableName, tableOrViewNamespace, tableOrViewName); + } + + /** + * Create SQL statement to update a table or view. + * + * @param table true to get the SQL statement for a table, false for a view. + * @return the SQL statement. + */ + public static String doCommitSqlStatement(boolean table) { +String tableOrViewTableName = table ? CATALOG_TABLE_NAME : CATALOG_VIEW_NAME; +String tableOrViewNamespace = table ? TABLE_NAMESPACE : VIEW_NAMESPACE; +String tableOrViewName = table ? TABLE_NAME : VIEW_NAME; +return String.format( +DO_COMMIT_TABLE_OR_VIEW_SQL, tableOrViewTableName, tableOrViewNamespace, tableOrViewName); + } + + /** + * Create SQL statement to create a table or view. + * + * @param table true to get the SQL statement for a table, false for a view. + * @return the SQL statement. + */ + public static String createCatalogTableOrViewSqlStatement(boolean table) { +String tableOrViewTableName = table ? CATALOG_TABLE_NAME : CATALOG_VIEW_NAME; +String tableOrViewNamespace = table ? TABLE_NAMESPACE : VIEW_NAMESPACE; +String tableOrViewName = table ? TABLE_NAME : VIEW_NAME; +return String.format( +CREATE_CATALOG_TABLE_OR_VIEW, +tableOrViewTableName, +tableOrViewNamespace, +tableOrViewName, +tableOrViewNamespace, +tableOrViewName); + } + + /** + * Create SQL statement to list tables or views. + * + * @param table true to get the SQL statement for a table, false for a view. + * @return the SQL statement. + */ + public static String listTablesOrViewsSqlStatement(boolean table) { +String tableOrViewTableName = table ? CATALOG_TABLE_NAME : CATALOG_VIEW_NAME; +String tableOrViewNamespace = table ? TABLE_NAMESPACE : VIEW_NAMESPACE; +return String.format(LIST_TABLES_OR_VIEWS_SQL, tableOrViewTableName, tableOrViewNamespace); + } + + /** + * Create SQL statement to rename a table or view. + * + * @param table true to get the SQL statement for a table, false for a view. + * @return the SQL statement. + */ + public static String renameTableOrViewSqlStatement(boolean table) { +String tableOrViewTableName = table ? CATALOG_TABLE_NAME : CATALOG_VIEW_NAME; +String tableOrViewNamespace = table ? TABLE_NAMESPACE : VIEW_NAMESPACE; +String tableOrViewName = table ? TABLE_NAME : VIEW_NAME; +return String.format( +RENAME_TABLE_OR_VIEW_SQL, +tableOrViewTableName, +tableOrViewNamespace, +tableOrViewName, +tableOrViewNamespace, +tableOrViewName); + } + + /** + * Create SQL statement to delete a table or view. 
+ * + * @param table true to get the SQL statement for a table, false for a view. + * @return the SQL statement. + */ + public static String dropTableOrViewSqlStatement(boolean table) { +String tableOrViewTableName = table ? CATALOG_TABLE_NAME : CATALOG_VIEW_NAME; +String tableOrViewNamespace = table ? TABLE_NAMESPACE : VIEW_NAMESPACE; +String tableOrViewName = table ? TABLE_NAME : VIEW_NAME; +return String.format( +DROP_TABLE_OR_VIEW_SQL, tableOrViewTableName, tableOrViewNamespace, tableOrViewName); + } + + /** + * Create SQL statement to create a table or view. + * + * @param table true to get the SQL statement for a table, false for a view. + * @return the SQL statement. + */ + public static String doCommitCreateTableOrViewSqlStatement(boolean table) { +String tableOrViewTableName = table ? CATALOG_TABLE_NAME : CATALOG_VIEW_NAME; +String tableOrViewNamespace = table ? TABLE_NAMESPACE : VIEW_NAMESPACE; +String tableOrViewName = table ? TABLE_NAME : VIEW_NAME; +return String.format(
Re: [PR] Spark: Support creating views via SQL [iceberg]
nastra commented on code in PR #9423: URL: https://github.com/apache/iceberg/pull/9423#discussion_r1455299682 ## spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala: ## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.iceberg.spark.Spark3Util +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException +import org.apache.spark.sql.catalyst.expressions.Alias +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.Project +import org.apache.spark.sql.connector.catalog.Identifier +import org.apache.spark.sql.connector.catalog.ViewCatalog +import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.execution.CommandExecutionMode +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.MetadataBuilder +import org.apache.spark.sql.util.SchemaUtils +import scala.collection.JavaConverters._ + + +case class CreateV2ViewExec( + catalog: ViewCatalog, + ident: Identifier, + originalText: String, + query: LogicalPlan, + userSpecifiedColumns: Seq[(String, Option[String])], + comment: Option[String], + properties: Map[String, String], + allowExisting: Boolean, + replace: Boolean) extends LeafV2CommandExec { + + override lazy val output: Seq[Attribute] = Nil + + override protected def run(): Seq[InternalRow] = { +val qe = session.sessionState.executePlan(query, CommandExecutionMode.SKIP) +qe.assertAnalyzed() +val analyzedPlan = qe.analyzed + +val identifier = Spark3Util.toV1TableIdentifier(ident) + +if (userSpecifiedColumns.nonEmpty) { + if (userSpecifiedColumns.length > analyzedPlan.output.length) { +throw QueryCompilationErrors.cannotCreateViewNotEnoughColumnsError( + identifier, userSpecifiedColumns.map(_._1), analyzedPlan) + } else if (userSpecifiedColumns.length < analyzedPlan.output.length) { +throw QueryCompilationErrors.cannotCreateViewTooManyColumnsError( + identifier, userSpecifiedColumns.map(_._1), analyzedPlan) + } +} + +val queryColumnNames = analyzedPlan.schema.fieldNames +SchemaUtils.checkColumnNameDuplication(queryColumnNames, SQLConf.get.resolver) + +val viewSchema = aliasPlan(analyzedPlan, userSpecifiedColumns).schema +val columnAliases = userSpecifiedColumns.map(_._1).toArray +val columnComments = userSpecifiedColumns.map(_._2.getOrElse("")).toArray + +val currentCatalog = session.sessionState.catalogManager.currentCatalog.name Review Comment: done -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Core: Close the MetricsReporter when the Catalog is closed. [iceberg]
nastra commented on code in PR #9353: URL: https://github.com/apache/iceberg/pull/9353#discussion_r1455363550 ## core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java: ## @@ -482,7 +489,13 @@ public boolean removeProperties(Namespace namespace, Set properties) @Override public void close() { -connections.close(); +if (closeableGroup != null) { + try { Review Comment: I don't think we need to wrap this in a try-catch block here, and none of the other catalogs do, so I think we can remove it.
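For illustration, the simpler shape the comment points at might look like the sketch below (an assumed stand-in class, not the PR's code): since `Closeable.close()` already declares `IOException`, the group can be closed without a try-catch wrapper.

```java
import java.io.Closeable;
import java.io.IOException;
import org.apache.iceberg.io.CloseableGroup;

class CloseSketch implements Closeable {
  private final CloseableGroup closeableGroup = new CloseableGroup();

  @Override
  public void close() throws IOException {
    // Let any IOException propagate instead of wrapping the call in try/catch.
    closeableGroup.close();
  }
}
```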
Re: [PR] Core: Close the MetricsReporter when the Catalog is closed. [iceberg]
nastra commented on code in PR #9353: URL: https://github.com/apache/iceberg/pull/9353#discussion_r1455378169 ## aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java: ## @@ -143,6 +142,7 @@ void initialize( this.closeableGroup = new CloseableGroup(); closeableGroup.addCloseable(dynamo); closeableGroup.addCloseable(fileIO); +closeableGroup.addCloseable(metricsReporter()); Review Comment: I just wanted to point out that this will effectively initialize the reporter much earlier (during catalog instantiation) than previously (during table loading). This is probably OK.
Re: [PR] Docs: Add distribution mode not respected for CTAS/RTAS before Spark 3.5.0 [iceberg]
nastra commented on code in PR #9439: URL: https://github.com/apache/iceberg/pull/9439#discussion_r1455411644 ## docs/spark-writes.md: ## @@ -343,7 +343,8 @@ data.writeTo("prod.db.sample").option("mergeSchema","true").append() Iceberg's default Spark writers require that the data in each spark task is clustered by partition values. This distribution is required to minimize the number of file handles that are held open while writing. By default, starting in Iceberg 1.2.0, Iceberg also requests that Spark pre-sort data to be written to fit this distribution. The -request to Spark is done through the table property `write.distribution-mode` with the value `hash`. +request to Spark is done through the table property `write.distribution-mode` with the value `hash`. Spark doesn't respect +distribution mode in CTAS/RTAS before 3.5.0. Review Comment: Do we have tests that check this behavior in Spark 3.4 and Spark 3.5?
Re: [PR] Build: Bump Spark 3.3 from 3.3.3 to 3.3.4 [iceberg]
nastra merged PR #9492: URL: https://github.com/apache/iceberg/pull/9492
Re: [PR] Add SqlCatalog _commit_table support [iceberg-python]
Fokko commented on PR #265: URL: https://github.com/apache/iceberg-python/pull/265#issuecomment-1895723645 Thanks @syun64 for working on this 👍
Re: [PR] Add SqlCatalog _commit_table support [iceberg-python]
Fokko merged PR #265: URL: https://github.com/apache/iceberg-python/pull/265
Re: [I] SqlCatalog _commit_table() support [iceberg-python]
Fokko closed issue #262: SqlCatalog _commit_table() support URL: https://github.com/apache/iceberg-python/issues/262
Re: [I] SqlCatalog _commit_table() support [iceberg-python]
Fokko commented on issue #262: URL: https://github.com/apache/iceberg-python/issues/262#issuecomment-1895726712 This has been fixed in #265.
Re: [PR] Core: Add view support for JDBC catalog [iceberg]
jbonofre commented on code in PR #9487: URL: https://github.com/apache/iceberg/pull/9487#discussion_r1455472169 ## core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java: ## @@ -157,8 +159,10 @@ private void initializeCatalogTables() throws InterruptedException, SQLException return true; } - LOG.debug("Creating table {} to store iceberg catalog", JdbcUtil.CATALOG_TABLE_NAME); - return conn.prepareStatement(JdbcUtil.CREATE_CATALOG_TABLE).execute(); + LOG.debug( + "Creating table {} to store iceberg catalog tables", JdbcUtil.CATALOG_TABLE_NAME); + return conn.prepareStatement(JdbcUtil.createCatalogTableOrViewSqlStatement(true)) Review Comment: OK, I will do that (though I'm not sure these methods are actually used by end users).
Re: [PR] Core: Add view support for JDBC catalog [iceberg]
jbonofre commented on code in PR #9487: URL: https://github.com/apache/iceberg/pull/9487#discussion_r1455473193 ## core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java: ## @@ -53,21 +58,19 @@ final class JdbcUtil { + " WHERE " + CATALOG_NAME + " = ? AND " - + TABLE_NAMESPACE - + " = ? AND " - + TABLE_NAME - + " = ? AND " + + " %s = ? AND " // table or view namespace Review Comment: Indeed, I will also clean the existing SQL statements (which also have extra spaces). Thanks.
Re: [PR] Core: Add view support for JDBC catalog [iceberg]
jbonofre commented on code in PR #9487: URL: https://github.com/apache/iceberg/pull/9487#discussion_r1455474276 ## core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java: ## @@ -139,7 +134,22 @@ final class JdbcUtil { + " LIKE ? ESCAPE '\\' " + " ) " + " LIMIT 1"; - static final String LIST_NAMESPACES_SQL = + static final String GET_VIEW_NAMESPACE_SQL = Review Comment: Good point, I will use the same approach with SQL statement "generated" method. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[I] Build/Release: Upgrade to Apache RAT 0.16 and scan hidden directories [iceberg]
jbonofre opened a new issue, #9494: URL: https://github.com/apache/iceberg/issues/9494 ### Feature Request / Improvement As identified during a previous Iceberg release, apache-rat 0.15 doesn't scan hidden directories. This is a problem because the hidden directories are part of the released Iceberg source distribution. I added a `--scan-hidden-directories` option to apache-rat 0.16, allowing us to scan all directories that are part of the source distribution. ### Query engine None -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[PR] Build: Upgrade to Apache RAT 0.16, scanning hidden directories and adding missing ASF header [iceberg]
jbonofre opened a new pull request, #9495: URL: https://github.com/apache/iceberg/pull/9495 This PR does: - upgrade to Apache RAT 0.16 - add `--scan-hidden-directories` option - add ASF header where missing - add new excluded file from RAT check -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Build: Upgrade to Apache RAT 0.16, scanning hidden directories and adding missing ASF header [iceberg]
jbonofre commented on PR #9495: URL: https://github.com/apache/iceberg/pull/9495#issuecomment-1895818508 @Fokko can you please take a look? Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[PR] Docs: Fix typo in tag reading example [iceberg]
pvary opened a new pull request, #9496: URL: https://github.com/apache/iceberg/pull/9496 Small fix in the docs -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[PR] Add small test on duplicate changes [iceberg-python]
Fokko opened a new pull request, #273: URL: https://github.com/apache/iceberg-python/pull/273 (no comment) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Docs: Fix typo in tag reading example [iceberg]
nastra merged PR #9496: URL: https://github.com/apache/iceberg/pull/9496 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Set `ghp_path` to `/` [iceberg]
Fokko merged PR #9493: URL: https://github.com/apache/iceberg/pull/9493 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Spark: Support creating views via SQL [iceberg]
nastra commented on code in PR #9423: URL: https://github.com/apache/iceberg/pull/9423#discussion_r1455627217 ## spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ViewCheck.scala: ## @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.iceberg.spark.Spark3Util +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView +import org.apache.spark.sql.connector.catalog.ViewCatalog +import org.apache.spark.sql.execution.command.ViewHelper + +object ViewCheck extends (LogicalPlan => Unit) { + + override def apply(plan: LogicalPlan): Unit = { +plan foreach { + case CreateIcebergView(ResolvedIdentifier(_: ViewCatalog, ident), _, _, _, _, query, _, _) => Review Comment: does your comment still apply after https://github.com/apache/iceberg/pull/9423#discussion_r1455100866? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Build: Upgrade to Apache RAT 0.16, scanning hidden directories and adding missing ASF header [iceberg]
ajantha-bhat commented on code in PR #9495: URL: https://github.com/apache/iceberg/pull/9495#discussion_r1455712834 ## dev/check-license: ## @@ -68,7 +68,7 @@ mkdir -p "$FWDIR"/lib } mkdir -p build -$java_cmd -jar "$rat_jar" -E "$FWDIR"/dev/.rat-excludes -d "$FWDIR" > build/rat-results.txt +$java_cmd -jar "$rat_jar" --scan-hidden-directories -E "$FWDIR"/dev/.rat-excludes -d "$FWDIR" > build/rat-results.txt Review Comment: Awesome contribution on adding `--scan-hidden-directories` options at the rat plugin side JB 👍 https://github.com/apache/creadur-rat/pull/166 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Spark 3.5: Spark action to compute the partition stats [iceberg]
ajantha-bhat commented on code in PR #9437: URL: https://github.com/apache/iceberg/pull/9437#discussion_r1454359159 ## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java: ## @@ -150,6 +154,21 @@ protected Dataset contentFileDS(Table table, Set snapshotIds) { Broadcast tableBroadcast = sparkContext.broadcast(serializableTable); int numShufflePartitions = spark.sessionState().conf().numShufflePartitions(); +return manifestBeanDS(table, snapshotIds, numShufflePartitions) +.flatMap(new ReadManifest(tableBroadcast), FileInfo.ENCODER); + } + + protected Dataset partitionEntryDS(Table table) { +Table serializableTable = SerializableTableWithSize.copyOf(table); +Broadcast tableBroadcast = sparkContext.broadcast(serializableTable); +int numShufflePartitions = spark.sessionState().conf().numShufflePartitions(); + +return manifestBeanDS(table, null, numShufflePartitions) Review Comment: > Is it actually correct? This code would go via ALL_MANIFESTS table. Shouldn't we only look for manifests in a particular snapshot for which we compute the stats? I just followed the same pattern from the partitions metadata table, which goes through all the manifests from all the snapshots. https://github.com/apache/iceberg/blob/31d18f51b9e8590f7ca316463b080bd1153e8f9e/core/src/main/java/org/apache/iceberg/PartitionsTable.java#L186-L195 Let me think on this today and get back to you. Also, I need to understand why they go through all manifests in the partitions metadata table (cc: @szehon-ho) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Add Hive integration tests [iceberg-python]
Fokko commented on code in PR #207: URL: https://github.com/apache/iceberg-python/pull/207#discussion_r1455755578 ## tests/integration/test_hive.py: ## @@ -0,0 +1,409 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# pylint:disable=redefined-outer-name + +import math +import uuid +from typing import Dict +from urllib.parse import urlparse + +import pyarrow.parquet as pq +import pytest +from pyarrow.fs import S3FileSystem + +from pyiceberg.catalog import Catalog, load_catalog +from pyiceberg.exceptions import NoSuchTableError +from pyiceberg.expressions import ( +And, +EqualTo, +GreaterThanOrEqual, +IsNaN, +LessThan, +NotEqualTo, +NotNaN, +) +from pyiceberg.io.pyarrow import pyarrow_to_schema +from pyiceberg.schema import Schema +from pyiceberg.table import Table +from pyiceberg.types import ( +BooleanType, +IntegerType, +NestedField, +StringType, +TimestampType, +) + +DEFAULT_PROPERTIES: Dict[str, str] = {} + + +@pytest.fixture() +def catalog() -> Catalog: +return load_catalog( +"local", +**{ +"type": "hive", +"uri": "http://localhost:9083";, +"s3.endpoint": "http://localhost:9000";, +"s3.access-key-id": "admin", +"s3.secret-access-key": "password", +}, +) + + +@pytest.fixture() +def table_test_null_nan(catalog: Catalog) -> Table: +return catalog.load_table("default.test_null_nan") + + +@pytest.fixture() +def table_test_null_nan_rewritten(catalog: Catalog) -> Table: +return catalog.load_table("default.test_null_nan_rewritten") + + +@pytest.fixture() +def table_test_limit(catalog: Catalog) -> Table: Review Comment: I actually like this suggestion a lot, let me add this to the PR -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
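The suggestion being accepted here is not quoted in this digest, so the following is only a guess at its shape: a single parametrized fixture that loads tables by name, instead of one fixture per table. The `catalog` fixture and the table names are taken from the diff above; the parametrization itself is hypothetical.

```python
import pytest

from pyiceberg.catalog import Catalog
from pyiceberg.table import Table


@pytest.fixture()
def table(request: pytest.FixtureRequest, catalog: Catalog) -> Table:
    """Load the table named by the test's indirect parameter (hypothetical refactor)."""
    return catalog.load_table(f"default.{request.param}")


@pytest.mark.parametrize("table", ["test_null_nan", "test_null_nan_rewritten", "test_limit"], indirect=True)
def test_table_loads(table: Table) -> None:
    # Smoke check: every parametrized table resolves and exposes a schema.
    assert table.schema() is not None
```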
Re: [PR] Spark 3.5: Spark action to compute the partition stats [iceberg]
ajantha-bhat commented on code in PR #9437: URL: https://github.com/apache/iceberg/pull/9437#discussion_r1455763573 ## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java: ## @@ -150,6 +154,21 @@ protected Dataset contentFileDS(Table table, Set snapshotIds) { Broadcast tableBroadcast = sparkContext.broadcast(serializableTable); int numShufflePartitions = spark.sessionState().conf().numShufflePartitions(); +return manifestBeanDS(table, snapshotIds, numShufflePartitions) +.flatMap(new ReadManifest(tableBroadcast), FileInfo.ENCODER); + } + + protected Dataset partitionEntryDS(Table table) { +Table serializableTable = SerializableTableWithSize.copyOf(table); +Broadcast tableBroadcast = sparkContext.broadcast(serializableTable); +int numShufflePartitions = spark.sessionState().conf().numShufflePartitions(); + +return manifestBeanDS(table, null, numShufflePartitions) Review Comment: > Is it actually correct? This code would go via ALL_MANIFESTS table. Shouldn't we only look for manifests in a particular snapshot for which we compute the stats? True. I confused `snapshot().allManifests()` with the all-manifests table; I need to change this. And thanks for the detailed distributed and local algorithms. For my distributed algorithm, I faced a problem with the serialization of `partitionData` (an Avro class issue), which is why I had to keep most of the logic on the driver. I am not fully sure how to implement the distributed algorithm that you have suggested; I will explore that. In the meantime you can also review https://github.com/apache/iceberg/pull/9170 (which is independent of and a prerequisite for this PR) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
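To make the "particular snapshot, not ALL_MANIFESTS" point concrete, here is a rough single-process sketch in pyiceberg terms; the actual action under review is a distributed Spark/Java implementation, so this is only an illustration of scoping the scan to the manifests reachable from one snapshot. The catalog and table names are placeholders, and keying partitions by their string form is a simplification.

```python
from collections import Counter

from pyiceberg.catalog import load_catalog

# Placeholders: any catalog/table will do for illustration.
catalog = load_catalog("default")
table = catalog.load_table("db.events")

snapshot = table.current_snapshot()  # stats are computed for one snapshot
record_counts: Counter = Counter()

if snapshot is not None:
    # Walk only the manifests of this snapshot, not the ALL_MANIFESTS table.
    for manifest in snapshot.manifests(io=table.io):
        for entry in manifest.fetch_manifest_entry(table.io, discard_deleted=True):
            # Aggregate record counts per partition; str() keeps the key simple.
            record_counts[str(entry.data_file.partition)] += entry.data_file.record_count

for partition, count in record_counts.items():
    print(partition, count)
```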
Re: [PR] Spark 3.5: Spark action to compute the partition stats [iceberg]
ajantha-bhat commented on code in PR #9437: URL: https://github.com/apache/iceberg/pull/9437#discussion_r1455766754 ## api/src/main/java/org/apache/iceberg/actions/ComputePartitionStats.java: ## @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.actions; + +import org.apache.iceberg.PartitionStatisticsFile; + +/** + * An action to compute partition stats for the latest snapshot and registers it to the + * TableMetadata file + */ +public interface ComputePartitionStats Review Comment: We should support it. Since stats are mapped to a snapshot ID, it should be easy to make this work with branches and tags. I will handle this in a follow-up. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[I] Hive Catalog: Implement `_commit_table` [iceberg-python]
Fokko opened a new issue, #275: URL: https://github.com/apache/iceberg-python/issues/275 ### Feature Request / Improvement Probably very similar to the Glue/Sql one :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[I] Two-level parquet read EOF error: org.apache.parquet.io.ParquetDecodingException: Can't read value in column [a, array] repeated int32 array = 2 at value 4 out of 4 in current page. repetition lev
gaoshihang opened a new issue, #9497: URL: https://github.com/apache/iceberg/issues/9497 ### Apache Iceberg version 1.4.3 (latest release) ### Query engine Spark ### Please describe the bug 🐞 We have a two-level parquet list, the schema like below:  Now if this array is an empty array: [], when we using add_files function add this parquet to a table, then query will throw this exception: ``` Caused by: org.apache.parquet.io.ParquetDecodingException: Can't read value in column [a, array] repeated int32 array = 2 at value 4 out of 4 in current page. repetition level: -1, definition level: -1 at org.apache.iceberg.parquet.PageIterator.handleRuntimeException(PageIterator.java:220) at org.apache.iceberg.parquet.PageIterator.nextInteger(PageIterator.java:141) at org.apache.iceberg.parquet.ColumnIterator.nextInteger(ColumnIterator.java:121) at org.apache.iceberg.parquet.ColumnIterator$2.next(ColumnIterator.java:41) at org.apache.iceberg.parquet.ColumnIterator$2.next(ColumnIterator.java:38) at org.apache.iceberg.parquet.ParquetValueReaders$UnboxedReader.read(ParquetValueReaders.java:246) at org.apache.iceberg.parquet.ParquetValueReaders$RepeatedReader.read(ParquetValueReaders.java:467) at org.apache.iceberg.parquet.ParquetValueReaders$OptionReader.read(ParquetValueReaders.java:419) at org.apache.iceberg.parquet.ParquetValueReaders$StructReader.read(ParquetValueReaders.java:745) at org.apache.iceberg.parquet.ParquetReader$FileIterator.next(ParquetReader.java:130) at org.apache.iceberg.io.FilterIterator.advance(FilterIterator.java:65) at org.apache.iceberg.io.FilterIterator.hasNext(FilterIterator.java:49) at org.apache.iceberg.spark.source.BaseReader.next(BaseReader.java:129) at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:93) at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:130) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759) at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:349) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:131) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) Caused by: org.apache.parquet.io.ParquetDecodingException: could not read int at 
org.apache.parquet.column.values.plain.PlainValuesReader$IntegerPlainValuesReader.readInteger(PlainValuesReader.java:114) at org.apache.iceberg.parquet.PageIterator.nextInteger(PageIterator.java:139) ... 32 more Caused by: java.io.EOFException at org.apache.parquet.bytes.SingleBufferInputStream.read(SingleBufferInputStream.java:52) at org.apache.parquet.bytes.LittleEndianDataInputStream.readInt(LittleEndianDataInputStream.java:347) at org.apache.parquet.column.values.plain.PlainValuesReader$IntegerPlainValuesReader.readInteger(PlainValuesReader.java:112) ... 33 more ``` And reading the code in iceberg-parquet, it seems like this do-while will never exit: -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Add Hive integration tests [iceberg-python]
Fokko commented on PR #207: URL: https://github.com/apache/iceberg-python/pull/207#issuecomment-1896052914 Thanks @HonahX for the review 👍 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Add Hive integration tests [iceberg-python]
Fokko merged PR #207: URL: https://github.com/apache/iceberg-python/pull/207 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[PR] Core: Hadoop: Fix: HadoopTableOperations renameToFinal [iceberg]
N-o-Z opened a new pull request, #9498: URL: https://github.com/apache/iceberg/pull/9498 Closes #9485 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Core: Hadoop: Fix: HadoopTableOperations renameToFinal [iceberg]
N-o-Z commented on PR #9498: URL: https://github.com/apache/iceberg/pull/9498#issuecomment-1896152644 @amogh-jahagirdar, FYI -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [I] Purge support for Iceberg view [iceberg]
rdblue commented on issue #9433: URL: https://github.com/apache/iceberg/issues/9433#issuecomment-1896190645 What is the proposed behavior for a purge operation? How does this apply to views? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Flink: Don't fail to serialize IcebergSourceSplit when there is too many delete files [iceberg]
stevenzwu commented on PR #9464: URL: https://github.com/apache/iceberg/pull/9464#issuecomment-1896232286 > If I understand correctly, the FileScanTask json will contain the Schema. The Schema has a doc field for comments. Do we have restrictions defined for the doc field? @pvary you are right. Agreed that the doc field may contain non-ASCII chars. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Core: Hadoop: Fix: HadoopTableOperations renameToFinal [iceberg]
amogh-jahagirdar commented on code in PR #9498: URL: https://github.com/apache/iceberg/pull/9498#discussion_r1456088754 ## core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java: ## @@ -360,7 +360,10 @@ int findVersion() { */ private void renameToFinal(FileSystem fs, Path src, Path dst, int nextVersion) { try { - lockManager.acquire(dst.toString(), src.toString()); + boolean success = lockManager.acquire(dst.toString(), src.toString()); + if (!success) { +throw new CommitFailedException("Failed to acquire lock on file: %s", dst); + } Review Comment: Style nit: could we add a new line after this `if` block, separating it from the next `if`? I know in this file it looks like we didn't really follow our normal practice already, but at least for new changes we could follow it. Also I think I'd just inline the boolean so ``` if (!lockManager.acquire(dst.toString(), src.toString())) { throw ... } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Core: Hadoop: Fix: HadoopTableOperations renameToFinal [iceberg]
amogh-jahagirdar commented on code in PR #9498: URL: https://github.com/apache/iceberg/pull/9498#discussion_r1456088754 ## core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java: ## @@ -360,7 +360,10 @@ int findVersion() { */ private void renameToFinal(FileSystem fs, Path src, Path dst, int nextVersion) { try { - lockManager.acquire(dst.toString(), src.toString()); + boolean success = lockManager.acquire(dst.toString(), src.toString()); + if (!success) { +throw new CommitFailedException("Failed to acquire lock on file: %s", dst); + } Review Comment: Style nit: could we add a new line after this `if` block, separating it from the next `if`? I know in this file it looks like we didn't really follow our normal practice already, but at least for new changes we could follow it. Also I think I'd just inline the boolean so ``` if (!lockManager.acquire(dst.toString(), src.toString())) { throw ... } ``` ## core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java: ## @@ -360,7 +360,10 @@ int findVersion() { */ private void renameToFinal(FileSystem fs, Path src, Path dst, int nextVersion) { try { - lockManager.acquire(dst.toString(), src.toString()); + boolean success = lockManager.acquire(dst.toString(), src.toString()); + if (!success) { +throw new CommitFailedException("Failed to acquire lock on file: %s", dst); + } Review Comment: Also should we include the owner (src) in the exception as well? ## core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java: ## @@ -451,4 +455,39 @@ public void testConcurrentFastAppends(@TempDir File dir) throws Exception { Assertions.assertThat(Lists.newArrayList(tableWithHighRetries.snapshots())) .hasSize(threadsCount * numberOfCommitedFilesPerThread); } -} + + @Test + public void testCommitFailedToAcquire() { +table.newFastAppend().appendFile(FILE_A).commit(); +Configuration conf = new Configuration(); +LockManager lockManager = new NoLockManager(); +HadoopTableOperations tops = Review Comment: Naming nit: I think it's better to be a bit more verbose rather than truncate, since it's more readable imo. Instead of `tops` could we call it `tableOperations` like we do in other places? Also maybe `testCommitFailedToAcquireLock` for the test name? ## core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java: ## @@ -383,7 +386,10 @@ private void renameToFinal(FileSystem fs, Path src, Path dst, int nextVersion) { } throw cfe; } finally { - lockManager.release(dst.toString(), src.toString()); + boolean success = lockManager.release(dst.toString(), src.toString()); + if (!success) { +LOG.warn("Failed to release lock on file: {} with owner: {}", dst, src); + } Review Comment: Same nit as above, I think we should just inline the boolean. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Core: Fix lock acquisition logic in HadoopTableOperations rename [iceberg]
amogh-jahagirdar commented on PR #9498: URL: https://github.com/apache/iceberg/pull/9498#issuecomment-1896295071 Looks like the spotless checks are failing; if you could run ``` ./gradlew spotlessApply ``` before pushing your next changes, that would fix it! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [I] Cannot write nullable values to non-null column in the Iceberg Table [iceberg]
abharath9 commented on issue #9488: URL: https://github.com/apache/iceberg/issues/9488#issuecomment-1896331390 @nastra Yes, I am aware of that. How do I write data from optional fields into mandatory fields? It is mentioned in this issue that it is possible by setting "spark.sql.iceberg.check-nullability" to 'false' in the Spark config, but it is not working. I wonder if there is anything I am missing or any other config to add. https://github.com/apache/iceberg/pull/514 Just FYI, the above code works fine if I upgrade to Spark 3.5 and Iceberg 1.4.0; I want to get this working with Spark 3.2.0 and Iceberg 0.13.0. Thanks, and I appreciate any help. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
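For anyone else landing on this thread, this is roughly how the flag in question is set; whether it actually relaxes the nullability check on Spark 3.2.0 / Iceberg 0.13.0 is exactly what is being asked here, so treat it as a sketch rather than a confirmed workaround. The catalog name, warehouse path, and target table are placeholders.

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("nullability-check")
    # Standard Iceberg runtime wiring; catalog name and warehouse are placeholders.
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.demo", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.demo.type", "hadoop")
    .config("spark.sql.catalog.demo.warehouse", "/tmp/warehouse")
    # The flag discussed above; its effect on 0.13.0 is what is in question.
    .config("spark.sql.iceberg.check-nullability", "false")
    .getOrCreate()
)

# Columns created this way are nullable, while the target table's columns are required.
df = spark.createDataFrame([(1, "a")], ["id", "data"])
df.writeTo("demo.db.target").append()
```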
Re: [I] Two-level parquet read EOF error: org.apache.parquet.io.ParquetDecodingException: Can't read value in column [a, array] repeated int32 array = 2 at value 4 out of 4 in current page. repetition
gaoshihang commented on issue #9497: URL: https://github.com/apache/iceberg/issues/9497#issuecomment-1896362482 And here is the iceberg schema [v8.metadata.json](https://github.com/apache/iceberg/files/13967089/v8.metadata.json) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [I] Two-level parquet read EOF error: org.apache.parquet.io.ParquetDecodingException: Can't read value in column [a, array] repeated int32 array = 2 at value 4 out of 4 in current page. repetition
gaoshihang commented on issue #9497: URL: https://github.com/apache/iceberg/issues/9497#issuecomment-1896369837 And here is the parquet file we used to add_files. (need to change the .log to .parquet) [user_error_parquet.log](https://github.com/apache/iceberg/files/13967114/user_error_parquet.log) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Core: Fix lock acquisition logic in HadoopTableOperations rename [iceberg]
N-o-Z commented on PR #9498: URL: https://github.com/apache/iceberg/pull/9498#issuecomment-1896383730 @amogh-jahagirdar Done! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] API, Core, Spark: Add fastForwardOrCreate API and integrate that with Spark fast forward procedure [iceberg]
rdblue commented on PR #9196: URL: https://github.com/apache/iceberg/pull/9196#issuecomment-1896389067 @amogh-jahagirdar, I think I would prefer the second alternative, to change the behavior of fast-forward. I doubt that anyone relies on fast-forward _not_ creating a branch and failing instead. This would be avoiding an error condition that is likely surprising behavior to most users. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Write support [iceberg-python]
rdblue commented on code in PR #41: URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1456300895 ## mkdocs/docs/api.md: ## @@ -175,6 +175,104 @@ static_table = StaticTable.from_metadata( The static-table is considered read-only. +## Write support + +With PyIceberg 0.6.0 write support is added through Arrow. Let's consider an Arrow Table: + +```python +import pyarrow as pa + +df = pa.Table.from_pylist( +[ +{"city": "Amsterdam", "lat": 52.371807, "long": 4.896029}, +{"city": "San Francisco", "lat": 37.773972, "long": -122.431297}, +{"city": "Drachten", "lat": 53.11254, "long": 6.0989}, +{"city": "Paris", "lat": 48.864716, "long": 2.349014}, +], +) +``` + +Next, create a table based on the schema: + +```python +from pyiceberg.catalog import load_catalog + +catalog = load_catalog("default") + +from pyiceberg.schema import Schema +from pyiceberg.types import NestedField, StringType, DoubleType + +schema = Schema( +NestedField(1, "city", StringType(), required=False), +NestedField(2, "lat", DoubleType(), required=False), +NestedField(3, "long", DoubleType(), required=False), Review Comment: Isn't `required=False` the default? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Write support [iceberg-python]
Fokko commented on code in PR #41: URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1456327592 ## mkdocs/docs/api.md: ## @@ -175,6 +175,104 @@ static_table = StaticTable.from_metadata( The static-table is considered read-only. +## Write support + +With PyIceberg 0.6.0 write support is added through Arrow. Let's consider an Arrow Table: + +```python +import pyarrow as pa + +df = pa.Table.from_pylist( +[ +{"city": "Amsterdam", "lat": 52.371807, "long": 4.896029}, +{"city": "San Francisco", "lat": 37.773972, "long": -122.431297}, +{"city": "Drachten", "lat": 53.11254, "long": 6.0989}, +{"city": "Paris", "lat": 48.864716, "long": 2.349014}, +], +) +``` + +Next, create a table based on the schema: + +```python +from pyiceberg.catalog import load_catalog + +catalog = load_catalog("default") + +from pyiceberg.schema import Schema +from pyiceberg.types import NestedField, StringType, DoubleType + +schema = Schema( +NestedField(1, "city", StringType(), required=False), +NestedField(2, "lat", DoubleType(), required=False), +NestedField(3, "long", DoubleType(), required=False), Review Comment: No, the default is the more strict `True`. I've set it to False because PyArrow produces nullable fields by default -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
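A small illustration of the point about defaults, using only what the diff and reply above state: `NestedField` is required unless told otherwise, while Arrow fields are nullable unless told otherwise, which is why the example schema spells out `required=False`.

```python
import pyarrow as pa

from pyiceberg.types import NestedField, StringType

# Iceberg side: fields are required by default.
field_default = NestedField(1, "city", StringType())
assert field_default.required is True

field_optional = NestedField(1, "city", StringType(), required=False)
assert field_optional.required is False

# Arrow side: fields are nullable by default.
arrow_field = pa.field("city", pa.string())
assert arrow_field.nullable is True
```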
Re: [I] Purge support for Iceberg view [iceberg]
nk1506 commented on issue #9433: URL: https://github.com/apache/iceberg/issues/9433#issuecomment-1896529369 With a purge option, similar to [dropTable](https://github.com/apache/iceberg/blob/66b1aa662761606d4d68d99371c62505e7ac2f1e/api/src/main/java/org/apache/iceberg/catalog/Catalog.java#L307), there should be a way to delete view metadata files. A view with multiple versions can have multiple metadata files, and adding purge support will help delete all of the related metadata files. WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Apply Name mapping, new_schema_for_table [iceberg-python]
syun64 commented on code in PR #219: URL: https://github.com/apache/iceberg-python/pull/219#discussion_r1456437583 ## pyiceberg/io/pyarrow.py: ## @@ -733,42 +854,178 @@ def _get_field_id(field: pa.Field) -> Optional[int]: ) -class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]]): -def _convert_fields(self, arrow_fields: Iterable[pa.Field], field_results: List[Optional[IcebergType]]) -> List[NestedField]: -fields = [] -for i, field in enumerate(arrow_fields): -field_id = _get_field_id(field) -field_doc = doc_str.decode() if (field.metadata and (doc_str := field.metadata.get(PYARROW_FIELD_DOC_KEY))) else None -field_type = field_results[i] -if field_type is not None and field_id is not None: -fields.append(NestedField(field_id, field.name, field_type, required=not field.nullable, doc=field_doc)) -return fields - -def schema(self, schema: pa.Schema, field_results: List[Optional[IcebergType]]) -> Schema: -return Schema(*self._convert_fields(schema, field_results)) - -def struct(self, struct: pa.StructType, field_results: List[Optional[IcebergType]]) -> IcebergType: -return StructType(*self._convert_fields(struct, field_results)) - -def list(self, list_type: pa.ListType, element_result: Optional[IcebergType]) -> Optional[IcebergType]: +class _HasIds(PyArrowSchemaVisitor[bool]): +def schema(self, schema: pa.Schema, struct_result: bool) -> bool: +return struct_result + +def struct(self, struct: pa.StructType, field_results: List[bool]) -> bool: +return all(field_results) + +def field(self, field: pa.Field, field_result: bool) -> bool: +return all([_get_field_id(field) is not None, field_result]) + +def list(self, list_type: pa.ListType, element_result: bool) -> bool: element_field = list_type.value_field element_id = _get_field_id(element_field) -if element_result is not None and element_id is not None: -return ListType(element_id, element_result, element_required=not element_field.nullable) -return None +return element_result and element_id is not None -def map( -self, map_type: pa.MapType, key_result: Optional[IcebergType], value_result: Optional[IcebergType] -) -> Optional[IcebergType]: +def map(self, map_type: pa.MapType, key_result: bool, value_result: bool) -> bool: key_field = map_type.key_field key_id = _get_field_id(key_field) value_field = map_type.item_field value_id = _get_field_id(value_field) -if key_result is not None and value_result is not None and key_id is not None and value_id is not None: -return MapType(key_id, key_result, value_id, value_result, value_required=not value_field.nullable) -return None +return all([key_id is not None, value_id is not None, key_result, value_result]) + +def primitive(self, primitive: pa.DataType) -> bool: +return True + + +class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]]): +"""Converts PyArrowSchema to Iceberg Schema. 
Applies the IDs from name_mapping if provided.""" + +_field_names: List[str] +_name_mapping: Optional[NameMapping] + +def __init__(self, name_mapping: Optional[NameMapping] = None) -> None: +self._field_names = [] +self._name_mapping = name_mapping + +def _current_path(self) -> str: +return ".".join(self._field_names) + +def _field_id(self, field: pa.Field) -> int: +if self._name_mapping: +return self._name_mapping.find(self._current_path()).field_id +elif (field_id := _get_field_id(field)) is not None: +return field_id +else: +raise ValueError(f"Cannot convert {field} to Iceberg Field as field_id is empty.") + +def schema(self, schema: pa.Schema, struct_result: StructType) -> Schema: +return Schema(*struct_result.fields) + +def struct(self, struct: pa.StructType, field_results: List[NestedField]) -> StructType: +return StructType(*field_results) + +def field(self, field: pa.Field, field_result: IcebergType) -> NestedField: +field_id = self._field_id(field) +field_doc = doc_str.decode() if (field.metadata and (doc_str := field.metadata.get(PYARROW_FIELD_DOC_KEY))) else None +field_type = field_result +return NestedField(field_id, field.name, field_type, required=not field.nullable, doc=field_doc) + +def list(self, list_type: pa.ListType, element_result: IcebergType) -> ListType: +element_field = list_type.value_field +self._field_names.append(LIST_ELEMENT_NAME) +element_id = self._field_id(element_field) +self._field_names.pop() +return ListType(element_id, element_result, element_required=not element_field.nullable) -def primitive
Re: [PR] Write support [iceberg-python]
rdblue commented on code in PR #41: URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1456467981 ## pyiceberg/table/__init__.py: ## @@ -856,6 +909,61 @@ def history(self) -> List[SnapshotLogEntry]: def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive: bool = True) -> UpdateSchema: return UpdateSchema(self, allow_incompatible_changes=allow_incompatible_changes, case_sensitive=case_sensitive) +def append(self, df: pa.Table) -> None: +""" +Append data to the table. + +Args: +df: The Arrow dataframe that will be appended to overwrite the table +""" +if len(self.spec().fields) > 0: +raise ValueError("Cannot write to partitioned tables") + +if len(self.sort_order().fields) > 0: +raise ValueError("Cannot write to tables with a sort-order") + +snapshot_id = self.new_snapshot_id() Review Comment: Minor: this can be handled inside of `_MergeAppend` since it has the table. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
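A minimal sketch of the suggestion above, assuming the `Table.new_snapshot_id()` helper used in the diff; only the constructor is shown, the rest of the (hypothetical) producer is elided, and the import paths are my assumption.

```python
from pyiceberg.table import Table
from pyiceberg.table.snapshots import Operation


class _MergeAppend:
    def __init__(self, operation: Operation, table: Table) -> None:
        self._operation = operation
        self._table = table
        # Allocate the snapshot id here instead of in Table.append/overwrite,
        # since the producer already holds the table.
        self._snapshot_id = table.new_snapshot_id()
```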
Re: [PR] Write support [iceberg-python]
rdblue commented on code in PR #41: URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1456469457 ## pyiceberg/table/__init__.py: ## @@ -856,6 +909,61 @@ def history(self) -> List[SnapshotLogEntry]: def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive: bool = True) -> UpdateSchema: return UpdateSchema(self, allow_incompatible_changes=allow_incompatible_changes, case_sensitive=case_sensitive) +def append(self, df: pa.Table) -> None: +""" +Append data to the table. + +Args: +df: The Arrow dataframe that will be appended to overwrite the table +""" +if len(self.spec().fields) > 0: +raise ValueError("Cannot write to partitioned tables") + +if len(self.sort_order().fields) > 0: +raise ValueError("Cannot write to tables with a sort-order") + +snapshot_id = self.new_snapshot_id() + +data_files = _dataframe_to_data_files(self, df=df) +merge = _MergeAppend(operation=Operation.APPEND, table=self, snapshot_id=snapshot_id) Review Comment: Is this really a "merge append" if the operation may be overwrite? You might consider using `_MergingCommit` or `_MergingSnapshotProducer` (if you want to follow the Java convention). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Write support [iceberg-python]
rdblue commented on code in PR #41: URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1456478299 ## pyiceberg/table/__init__.py: ## @@ -831,6 +887,46 @@ def history(self) -> List[SnapshotLogEntry]: def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive: bool = True) -> UpdateSchema: return UpdateSchema(self, allow_incompatible_changes=allow_incompatible_changes, case_sensitive=case_sensitive) +def append(self, df: pa.Table) -> None: +if len(self.spec().fields) > 0: +raise ValueError("Cannot write to partitioned tables") + +snapshot_id = self.new_snapshot_id() + +data_files = _dataframe_to_data_files(self, df=df) +merge = _MergeAppend(operation=Operation.APPEND, table=self, snapshot_id=snapshot_id) +for data_file in data_files: +merge.append_datafile(data_file) + +if current_snapshot := self.current_snapshot(): +for manifest in current_snapshot.manifests(io=self.io): +for entry in manifest.fetch_manifest_entry(io=self.io): +merge.append_datafile(entry.data_file, added=False) + +merge.commit() + +def overwrite(self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_TRUE) -> None: Review Comment: :sweat_smile: You're right, that was more of a statement than a question. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [I] [HadoopCatalog]: [HadoopTableOperations]: Commit flow, renameToFinal does not actually check if lock acquired [iceberg]
amogh-jahagirdar closed issue #9485: [HadoopCatalog]: [HadoopTableOperations]: Commit flow, renameToFinal does not actually check if lock acquired URL: https://github.com/apache/iceberg/issues/9485 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Core: Fix lock acquisition logic in HadoopTableOperations rename [iceberg]
amogh-jahagirdar merged PR #9498: URL: https://github.com/apache/iceberg/pull/9498 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Write support [iceberg-python]
rdblue commented on code in PR #41: URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1456486632 ## pyiceberg/table/__init__.py: ## @@ -1935,3 +2043,184 @@ def _generate_snapshot_id() -> int: snapshot_id = snapshot_id if snapshot_id >= 0 else snapshot_id * -1 return snapshot_id + + +@dataclass(frozen=True) +class WriteTask: +write_uuid: uuid.UUID +task_id: int +df: pa.Table +sort_order_id: Optional[int] = None + +# Later to be extended with partition information + +def generate_data_file_filename(self, extension: str) -> str: +# Mimics the behavior in the Java API: +# https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101 +return f"0-{self.task_id}-{self.write_uuid}.{extension}" + + +def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str: +return f'{location}/metadata/{commit_uuid}-m{num}.avro' + + +def _generate_manifest_list_path(location: str, snapshot_id: int, attempt: int, commit_uuid: uuid.UUID) -> str: +# Mimics the behavior in Java: +# https://github.com/apache/iceberg/blob/c862b9177af8e2d8310764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491 +return f'{location}/metadata/snap-{snapshot_id}-{attempt}-{commit_uuid}.avro' + + +def _dataframe_to_data_files(table: Table, df: pa.Table) -> Iterable[DataFile]: +from pyiceberg.io.pyarrow import write_file + +if len(table.spec().fields) > 0: +raise ValueError("Cannot write to partitioned tables") + +if len(table.sort_order().fields) > 0: +raise ValueError("Cannot write to tables with a sort-order") + +write_uuid = uuid.uuid4() +counter = itertools.count(0) + +# This is an iter, so we don't have to materialize everything every time +# This will be more relevant when we start doing partitioned writes +yield from write_file(table, iter([WriteTask(write_uuid, next(counter), df)])) + + +class _MergeAppend: +_operation: Operation +_table: Table +_snapshot_id: int +_parent_snapshot_id: Optional[int] +_added_data_files: List[DataFile] +_commit_uuid: uuid.UUID + +def __init__(self, operation: Operation, table: Table, snapshot_id: int) -> None: +self._operation = operation +self._table = table +self._snapshot_id = snapshot_id +# Since we only support the main branch for now +self._parent_snapshot_id = snapshot.snapshot_id if (snapshot := self._table.current_snapshot()) else None +self._added_data_files = [] +self._commit_uuid = uuid.uuid4() + +def append_data_file(self, data_file: DataFile) -> _MergeAppend: +self._added_data_files.append(data_file) +return self + +def _deleted_entries(self) -> List[ManifestEntry]: +"""To determine if we need to record any deleted entries. + +With partial overwrites we have to use the predicate to evaluate +which entries are affected. +""" +if self._operation == Operation.OVERWRITE: +if self._parent_snapshot_id is not None: +previous_snapshot = self._table.snapshot_by_id(self._parent_snapshot_id) +if previous_snapshot is None: +# This should never happen since you cannot overwrite an empty table +raise ValueError(f"Could not find the previous snapshot: {self._parent_snapshot_id}") + +executor = ExecutorFactory.get_or_create() + +def _get_entries(manifest: ManifestFile) -> List[ManifestEntry]: Review Comment: Should this return an iterator instead of a list? Looks like `fetch_manifest_entry` returns a list, but we may not need so much copying. Not sure if that's a concern in Python. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Write support [iceberg-python]
rdblue commented on code in PR #41: URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1456487418 ## pyiceberg/table/__init__.py: ## @@ -1935,3 +2043,184 @@ def _generate_snapshot_id() -> int: snapshot_id = snapshot_id if snapshot_id >= 0 else snapshot_id * -1 return snapshot_id + + +@dataclass(frozen=True) +class WriteTask: +write_uuid: uuid.UUID +task_id: int +df: pa.Table +sort_order_id: Optional[int] = None + +# Later to be extended with partition information + +def generate_data_file_filename(self, extension: str) -> str: +# Mimics the behavior in the Java API: +# https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101 +return f"0-{self.task_id}-{self.write_uuid}.{extension}" + + +def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str: +return f'{location}/metadata/{commit_uuid}-m{num}.avro' + + +def _generate_manifest_list_path(location: str, snapshot_id: int, attempt: int, commit_uuid: uuid.UUID) -> str: +# Mimics the behavior in Java: +# https://github.com/apache/iceberg/blob/c862b9177af8e2d8310764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491 +return f'{location}/metadata/snap-{snapshot_id}-{attempt}-{commit_uuid}.avro' + + +def _dataframe_to_data_files(table: Table, df: pa.Table) -> Iterable[DataFile]: +from pyiceberg.io.pyarrow import write_file + +if len(table.spec().fields) > 0: +raise ValueError("Cannot write to partitioned tables") + +if len(table.sort_order().fields) > 0: +raise ValueError("Cannot write to tables with a sort-order") + +write_uuid = uuid.uuid4() +counter = itertools.count(0) + +# This is an iter, so we don't have to materialize everything every time +# This will be more relevant when we start doing partitioned writes +yield from write_file(table, iter([WriteTask(write_uuid, next(counter), df)])) + + +class _MergeAppend: +_operation: Operation +_table: Table +_snapshot_id: int +_parent_snapshot_id: Optional[int] +_added_data_files: List[DataFile] +_commit_uuid: uuid.UUID + +def __init__(self, operation: Operation, table: Table, snapshot_id: int) -> None: +self._operation = operation +self._table = table +self._snapshot_id = snapshot_id +# Since we only support the main branch for now +self._parent_snapshot_id = snapshot.snapshot_id if (snapshot := self._table.current_snapshot()) else None +self._added_data_files = [] +self._commit_uuid = uuid.uuid4() + +def append_data_file(self, data_file: DataFile) -> _MergeAppend: +self._added_data_files.append(data_file) +return self + +def _deleted_entries(self) -> List[ManifestEntry]: +"""To determine if we need to record any deleted entries. + +With partial overwrites we have to use the predicate to evaluate +which entries are affected. +""" +if self._operation == Operation.OVERWRITE: +if self._parent_snapshot_id is not None: +previous_snapshot = self._table.snapshot_by_id(self._parent_snapshot_id) +if previous_snapshot is None: +# This should never happen since you cannot overwrite an empty table +raise ValueError(f"Could not find the previous snapshot: {self._parent_snapshot_id}") + +executor = ExecutorFactory.get_or_create() + +def _get_entries(manifest: ManifestFile) -> List[ManifestEntry]: Review Comment: Wait, I see that this is run in the executor. It's probably creating a list so that the construction happens in parallel. Nevermind! -- This is an automated message from the Apache Git Service. 
Re: [PR] Write support [iceberg-python]
rdblue commented on code in PR #41: URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1456489014 ## pyiceberg/table/__init__.py: ## @@ -1935,3 +2043,184 @@ def _generate_snapshot_id() -> int: snapshot_id = snapshot_id if snapshot_id >= 0 else snapshot_id * -1 return snapshot_id + + +@dataclass(frozen=True) +class WriteTask: +write_uuid: uuid.UUID +task_id: int +df: pa.Table +sort_order_id: Optional[int] = None + +# Later to be extended with partition information + +def generate_data_file_filename(self, extension: str) -> str: +# Mimics the behavior in the Java API: +# https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101 +return f"0-{self.task_id}-{self.write_uuid}.{extension}" + + +def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str: +return f'{location}/metadata/{commit_uuid}-m{num}.avro' + + +def _generate_manifest_list_path(location: str, snapshot_id: int, attempt: int, commit_uuid: uuid.UUID) -> str: +# Mimics the behavior in Java: +# https://github.com/apache/iceberg/blob/c862b9177af8e2d8310764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491 +return f'{location}/metadata/snap-{snapshot_id}-{attempt}-{commit_uuid}.avro' + + +def _dataframe_to_data_files(table: Table, df: pa.Table) -> Iterable[DataFile]: +from pyiceberg.io.pyarrow import write_file + +if len(table.spec().fields) > 0: +raise ValueError("Cannot write to partitioned tables") + +if len(table.sort_order().fields) > 0: +raise ValueError("Cannot write to tables with a sort-order") + +write_uuid = uuid.uuid4() +counter = itertools.count(0) + +# This is an iter, so we don't have to materialize everything every time +# This will be more relevant when we start doing partitioned writes +yield from write_file(table, iter([WriteTask(write_uuid, next(counter), df)])) + + +class _MergeAppend: +_operation: Operation +_table: Table +_snapshot_id: int +_parent_snapshot_id: Optional[int] +_added_data_files: List[DataFile] +_commit_uuid: uuid.UUID + +def __init__(self, operation: Operation, table: Table, snapshot_id: int) -> None: +self._operation = operation +self._table = table +self._snapshot_id = snapshot_id +# Since we only support the main branch for now +self._parent_snapshot_id = snapshot.snapshot_id if (snapshot := self._table.current_snapshot()) else None +self._added_data_files = [] +self._commit_uuid = uuid.uuid4() + +def append_data_file(self, data_file: DataFile) -> _MergeAppend: +self._added_data_files.append(data_file) +return self + +def _deleted_entries(self) -> List[ManifestEntry]: +"""To determine if we need to record any deleted entries. + +With partial overwrites we have to use the predicate to evaluate +which entries are affected. 
+""" +if self._operation == Operation.OVERWRITE: +if self._parent_snapshot_id is not None: +previous_snapshot = self._table.snapshot_by_id(self._parent_snapshot_id) +if previous_snapshot is None: +# This should never happen since you cannot overwrite an empty table +raise ValueError(f"Could not find the previous snapshot: {self._parent_snapshot_id}") + +executor = ExecutorFactory.get_or_create() + +def _get_entries(manifest: ManifestFile) -> List[ManifestEntry]: +return [ +ManifestEntry( +status=ManifestEntryStatus.DELETED, +snapshot_id=entry.snapshot_id, +data_sequence_number=entry.data_sequence_number, +file_sequence_number=entry.file_sequence_number, +data_file=entry.data_file, +) +for entry in manifest.fetch_manifest_entry(self._table.io, discard_deleted=True) +] + +list_of_entries = executor.map(_get_entries, previous_snapshot.manifests(self._table.io)) Review Comment: It may be a good idea to defensively use only data manifests here instead of all manifests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issue
Re: [PR] Write support [iceberg-python]
rdblue commented on code in PR #41: URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1456491428 ## pyiceberg/table/__init__.py: ## @@ -1935,3 +2043,184 @@ def _generate_snapshot_id() -> int: snapshot_id = snapshot_id if snapshot_id >= 0 else snapshot_id * -1 return snapshot_id + + +@dataclass(frozen=True) +class WriteTask: +write_uuid: uuid.UUID +task_id: int +df: pa.Table +sort_order_id: Optional[int] = None + +# Later to be extended with partition information + +def generate_data_file_filename(self, extension: str) -> str: +# Mimics the behavior in the Java API: +# https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101 +return f"0-{self.task_id}-{self.write_uuid}.{extension}" + + +def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str: +return f'{location}/metadata/{commit_uuid}-m{num}.avro' + + +def _generate_manifest_list_path(location: str, snapshot_id: int, attempt: int, commit_uuid: uuid.UUID) -> str: +# Mimics the behavior in Java: +# https://github.com/apache/iceberg/blob/c862b9177af8e2d8310764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491 +return f'{location}/metadata/snap-{snapshot_id}-{attempt}-{commit_uuid}.avro' + + +def _dataframe_to_data_files(table: Table, df: pa.Table) -> Iterable[DataFile]: +from pyiceberg.io.pyarrow import write_file + +if len(table.spec().fields) > 0: +raise ValueError("Cannot write to partitioned tables") + +if len(table.sort_order().fields) > 0: +raise ValueError("Cannot write to tables with a sort-order") + +write_uuid = uuid.uuid4() +counter = itertools.count(0) + +# This is an iter, so we don't have to materialize everything every time +# This will be more relevant when we start doing partitioned writes +yield from write_file(table, iter([WriteTask(write_uuid, next(counter), df)])) + + +class _MergeAppend: +_operation: Operation +_table: Table +_snapshot_id: int +_parent_snapshot_id: Optional[int] +_added_data_files: List[DataFile] +_commit_uuid: uuid.UUID + +def __init__(self, operation: Operation, table: Table, snapshot_id: int) -> None: +self._operation = operation +self._table = table +self._snapshot_id = snapshot_id +# Since we only support the main branch for now +self._parent_snapshot_id = snapshot.snapshot_id if (snapshot := self._table.current_snapshot()) else None +self._added_data_files = [] +self._commit_uuid = uuid.uuid4() + +def append_data_file(self, data_file: DataFile) -> _MergeAppend: +self._added_data_files.append(data_file) +return self + +def _deleted_entries(self) -> List[ManifestEntry]: +"""To determine if we need to record any deleted entries. + +With partial overwrites we have to use the predicate to evaluate +which entries are affected. +""" +if self._operation == Operation.OVERWRITE: +if self._parent_snapshot_id is not None: +previous_snapshot = self._table.snapshot_by_id(self._parent_snapshot_id) +if previous_snapshot is None: +# This should never happen since you cannot overwrite an empty table +raise ValueError(f"Could not find the previous snapshot: {self._parent_snapshot_id}") + +executor = ExecutorFactory.get_or_create() + +def _get_entries(manifest: ManifestFile) -> List[ManifestEntry]: +return [ +ManifestEntry( +status=ManifestEntryStatus.DELETED, +snapshot_id=entry.snapshot_id, +data_sequence_number=entry.data_sequence_number, +file_sequence_number=entry.file_sequence_number, Review Comment: Looks good. 
Re: [PR] Write support [iceberg-python]
rdblue commented on code in PR #41: URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1456493214 ## pyiceberg/table/__init__.py: ## @@ -1935,3 +2043,184 @@ def _generate_snapshot_id() -> int: snapshot_id = snapshot_id if snapshot_id >= 0 else snapshot_id * -1 return snapshot_id + + +@dataclass(frozen=True) +class WriteTask: +write_uuid: uuid.UUID +task_id: int +df: pa.Table +sort_order_id: Optional[int] = None + +# Later to be extended with partition information + +def generate_data_file_filename(self, extension: str) -> str: +# Mimics the behavior in the Java API: +# https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101 +return f"0-{self.task_id}-{self.write_uuid}.{extension}" + + +def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str: +return f'{location}/metadata/{commit_uuid}-m{num}.avro' + + +def _generate_manifest_list_path(location: str, snapshot_id: int, attempt: int, commit_uuid: uuid.UUID) -> str: +# Mimics the behavior in Java: +# https://github.com/apache/iceberg/blob/c862b9177af8e2d8310764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491 +return f'{location}/metadata/snap-{snapshot_id}-{attempt}-{commit_uuid}.avro' + + +def _dataframe_to_data_files(table: Table, df: pa.Table) -> Iterable[DataFile]: +from pyiceberg.io.pyarrow import write_file + +if len(table.spec().fields) > 0: +raise ValueError("Cannot write to partitioned tables") + +if len(table.sort_order().fields) > 0: +raise ValueError("Cannot write to tables with a sort-order") + +write_uuid = uuid.uuid4() +counter = itertools.count(0) + +# This is an iter, so we don't have to materialize everything every time +# This will be more relevant when we start doing partitioned writes +yield from write_file(table, iter([WriteTask(write_uuid, next(counter), df)])) + + +class _MergeAppend: +_operation: Operation +_table: Table +_snapshot_id: int +_parent_snapshot_id: Optional[int] +_added_data_files: List[DataFile] +_commit_uuid: uuid.UUID + +def __init__(self, operation: Operation, table: Table, snapshot_id: int) -> None: +self._operation = operation +self._table = table +self._snapshot_id = snapshot_id +# Since we only support the main branch for now +self._parent_snapshot_id = snapshot.snapshot_id if (snapshot := self._table.current_snapshot()) else None +self._added_data_files = [] +self._commit_uuid = uuid.uuid4() + +def append_data_file(self, data_file: DataFile) -> _MergeAppend: +self._added_data_files.append(data_file) +return self + +def _deleted_entries(self) -> List[ManifestEntry]: +"""To determine if we need to record any deleted entries. + +With partial overwrites we have to use the predicate to evaluate +which entries are affected. 
+""" +if self._operation == Operation.OVERWRITE: +if self._parent_snapshot_id is not None: +previous_snapshot = self._table.snapshot_by_id(self._parent_snapshot_id) +if previous_snapshot is None: +# This should never happen since you cannot overwrite an empty table +raise ValueError(f"Could not find the previous snapshot: {self._parent_snapshot_id}") + +executor = ExecutorFactory.get_or_create() + +def _get_entries(manifest: ManifestFile) -> List[ManifestEntry]: +return [ +ManifestEntry( +status=ManifestEntryStatus.DELETED, +snapshot_id=entry.snapshot_id, +data_sequence_number=entry.data_sequence_number, +file_sequence_number=entry.file_sequence_number, +data_file=entry.data_file, +) +for entry in manifest.fetch_manifest_entry(self._table.io, discard_deleted=True) +] + +list_of_entries = executor.map(_get_entries, previous_snapshot.manifests(self._table.io)) +return list(chain(*list_of_entries)) +return [] +elif self._operation == Operation.APPEND: +return [] +else: +raise ValueError(f"Not implemented for: {self._operation}") + +def _manifests(self) -> List[ManifestFile]: +manifests = [] Review Comment: Minor: Since this is empty, it looks like this is just the newly created manifests. It may be a good idea to name this `new_manifests`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL abov
Re: [PR] Write support [iceberg-python]
rdblue commented on code in PR #41: URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1456495179 ## pyiceberg/table/__init__.py: ## @@ -1935,3 +2043,184 @@ def _generate_snapshot_id() -> int: snapshot_id = snapshot_id if snapshot_id >= 0 else snapshot_id * -1 return snapshot_id + + +@dataclass(frozen=True) +class WriteTask: +write_uuid: uuid.UUID +task_id: int +df: pa.Table +sort_order_id: Optional[int] = None + +# Later to be extended with partition information + +def generate_data_file_filename(self, extension: str) -> str: +# Mimics the behavior in the Java API: +# https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101 +return f"0-{self.task_id}-{self.write_uuid}.{extension}" + + +def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str: +return f'{location}/metadata/{commit_uuid}-m{num}.avro' + + +def _generate_manifest_list_path(location: str, snapshot_id: int, attempt: int, commit_uuid: uuid.UUID) -> str: +# Mimics the behavior in Java: +# https://github.com/apache/iceberg/blob/c862b9177af8e2d8310764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491 +return f'{location}/metadata/snap-{snapshot_id}-{attempt}-{commit_uuid}.avro' + + +def _dataframe_to_data_files(table: Table, df: pa.Table) -> Iterable[DataFile]: +from pyiceberg.io.pyarrow import write_file + +if len(table.spec().fields) > 0: +raise ValueError("Cannot write to partitioned tables") + +if len(table.sort_order().fields) > 0: +raise ValueError("Cannot write to tables with a sort-order") + +write_uuid = uuid.uuid4() +counter = itertools.count(0) + +# This is an iter, so we don't have to materialize everything every time +# This will be more relevant when we start doing partitioned writes +yield from write_file(table, iter([WriteTask(write_uuid, next(counter), df)])) + + +class _MergeAppend: +_operation: Operation +_table: Table +_snapshot_id: int +_parent_snapshot_id: Optional[int] +_added_data_files: List[DataFile] +_commit_uuid: uuid.UUID + +def __init__(self, operation: Operation, table: Table, snapshot_id: int) -> None: +self._operation = operation +self._table = table +self._snapshot_id = snapshot_id +# Since we only support the main branch for now +self._parent_snapshot_id = snapshot.snapshot_id if (snapshot := self._table.current_snapshot()) else None +self._added_data_files = [] +self._commit_uuid = uuid.uuid4() + +def append_data_file(self, data_file: DataFile) -> _MergeAppend: +self._added_data_files.append(data_file) +return self + +def _deleted_entries(self) -> List[ManifestEntry]: +"""To determine if we need to record any deleted entries. + +With partial overwrites we have to use the predicate to evaluate +which entries are affected. 
+""" +if self._operation == Operation.OVERWRITE: +if self._parent_snapshot_id is not None: +previous_snapshot = self._table.snapshot_by_id(self._parent_snapshot_id) +if previous_snapshot is None: +# This should never happen since you cannot overwrite an empty table +raise ValueError(f"Could not find the previous snapshot: {self._parent_snapshot_id}") + +executor = ExecutorFactory.get_or_create() + +def _get_entries(manifest: ManifestFile) -> List[ManifestEntry]: +return [ +ManifestEntry( +status=ManifestEntryStatus.DELETED, +snapshot_id=entry.snapshot_id, +data_sequence_number=entry.data_sequence_number, +file_sequence_number=entry.file_sequence_number, +data_file=entry.data_file, +) +for entry in manifest.fetch_manifest_entry(self._table.io, discard_deleted=True) +] + +list_of_entries = executor.map(_get_entries, previous_snapshot.manifests(self._table.io)) +return list(chain(*list_of_entries)) +return [] +elif self._operation == Operation.APPEND: +return [] +else: +raise ValueError(f"Not implemented for: {self._operation}") + +def _manifests(self) -> List[ManifestFile]: +manifests = [] +deleted_entries = self._deleted_entries() + +if self._added_data_files: +output_file_location = _new_manifest_path(location=self._table.location(), num=0, commit_uuid=self._commit_uuid) +with write_manifest( +format_version=self._tabl
Re: [PR] Write support [iceberg-python]
rdblue commented on code in PR #41: URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1456498171 ## pyiceberg/table/__init__.py: ## @@ -1935,3 +2043,184 @@ def _generate_snapshot_id() -> int: snapshot_id = snapshot_id if snapshot_id >= 0 else snapshot_id * -1 return snapshot_id + + +@dataclass(frozen=True) +class WriteTask: +write_uuid: uuid.UUID +task_id: int +df: pa.Table +sort_order_id: Optional[int] = None + +# Later to be extended with partition information + +def generate_data_file_filename(self, extension: str) -> str: +# Mimics the behavior in the Java API: +# https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101 +return f"0-{self.task_id}-{self.write_uuid}.{extension}" + + +def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str: +return f'{location}/metadata/{commit_uuid}-m{num}.avro' + + +def _generate_manifest_list_path(location: str, snapshot_id: int, attempt: int, commit_uuid: uuid.UUID) -> str: +# Mimics the behavior in Java: +# https://github.com/apache/iceberg/blob/c862b9177af8e2d8310764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491 +return f'{location}/metadata/snap-{snapshot_id}-{attempt}-{commit_uuid}.avro' + + +def _dataframe_to_data_files(table: Table, df: pa.Table) -> Iterable[DataFile]: +from pyiceberg.io.pyarrow import write_file + +if len(table.spec().fields) > 0: +raise ValueError("Cannot write to partitioned tables") + +if len(table.sort_order().fields) > 0: +raise ValueError("Cannot write to tables with a sort-order") + +write_uuid = uuid.uuid4() +counter = itertools.count(0) + +# This is an iter, so we don't have to materialize everything every time +# This will be more relevant when we start doing partitioned writes +yield from write_file(table, iter([WriteTask(write_uuid, next(counter), df)])) + + +class _MergeAppend: +_operation: Operation +_table: Table +_snapshot_id: int +_parent_snapshot_id: Optional[int] +_added_data_files: List[DataFile] +_commit_uuid: uuid.UUID + +def __init__(self, operation: Operation, table: Table, snapshot_id: int) -> None: +self._operation = operation +self._table = table +self._snapshot_id = snapshot_id +# Since we only support the main branch for now +self._parent_snapshot_id = snapshot.snapshot_id if (snapshot := self._table.current_snapshot()) else None +self._added_data_files = [] +self._commit_uuid = uuid.uuid4() + +def append_data_file(self, data_file: DataFile) -> _MergeAppend: +self._added_data_files.append(data_file) +return self + +def _deleted_entries(self) -> List[ManifestEntry]: +"""To determine if we need to record any deleted entries. + +With partial overwrites we have to use the predicate to evaluate +which entries are affected. 
+""" +if self._operation == Operation.OVERWRITE: +if self._parent_snapshot_id is not None: +previous_snapshot = self._table.snapshot_by_id(self._parent_snapshot_id) +if previous_snapshot is None: +# This should never happen since you cannot overwrite an empty table +raise ValueError(f"Could not find the previous snapshot: {self._parent_snapshot_id}") + +executor = ExecutorFactory.get_or_create() + +def _get_entries(manifest: ManifestFile) -> List[ManifestEntry]: +return [ +ManifestEntry( +status=ManifestEntryStatus.DELETED, +snapshot_id=entry.snapshot_id, +data_sequence_number=entry.data_sequence_number, +file_sequence_number=entry.file_sequence_number, +data_file=entry.data_file, +) +for entry in manifest.fetch_manifest_entry(self._table.io, discard_deleted=True) +] + +list_of_entries = executor.map(_get_entries, previous_snapshot.manifests(self._table.io)) +return list(chain(*list_of_entries)) +return [] +elif self._operation == Operation.APPEND: +return [] +else: +raise ValueError(f"Not implemented for: {self._operation}") + +def _manifests(self) -> List[ManifestFile]: +manifests = [] +deleted_entries = self._deleted_entries() + +if self._added_data_files: +output_file_location = _new_manifest_path(location=self._table.location(), num=0, commit_uuid=self._commit_uuid) +with write_manifest( +format_version=self._tabl
[PR] Add 1.4.3 docs [iceberg]
bitsondatadev opened a new pull request, #9499: URL: https://github.com/apache/iceberg/pull/9499 Add 1.4.3 docs -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Write support [iceberg-python]
rdblue commented on code in PR #41: URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1456502771 ## pyiceberg/table/__init__.py: ## @@ -1935,3 +2043,184 @@ def _generate_snapshot_id() -> int: snapshot_id = snapshot_id if snapshot_id >= 0 else snapshot_id * -1 return snapshot_id + + +@dataclass(frozen=True) +class WriteTask: +write_uuid: uuid.UUID +task_id: int +df: pa.Table +sort_order_id: Optional[int] = None + +# Later to be extended with partition information + +def generate_data_file_filename(self, extension: str) -> str: +# Mimics the behavior in the Java API: +# https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101 +return f"0-{self.task_id}-{self.write_uuid}.{extension}" + + +def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str: +return f'{location}/metadata/{commit_uuid}-m{num}.avro' + + +def _generate_manifest_list_path(location: str, snapshot_id: int, attempt: int, commit_uuid: uuid.UUID) -> str: +# Mimics the behavior in Java: +# https://github.com/apache/iceberg/blob/c862b9177af8e2d8310764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491 +return f'{location}/metadata/snap-{snapshot_id}-{attempt}-{commit_uuid}.avro' + + +def _dataframe_to_data_files(table: Table, df: pa.Table) -> Iterable[DataFile]: +from pyiceberg.io.pyarrow import write_file + +if len(table.spec().fields) > 0: +raise ValueError("Cannot write to partitioned tables") + +if len(table.sort_order().fields) > 0: +raise ValueError("Cannot write to tables with a sort-order") + +write_uuid = uuid.uuid4() +counter = itertools.count(0) + +# This is an iter, so we don't have to materialize everything every time +# This will be more relevant when we start doing partitioned writes +yield from write_file(table, iter([WriteTask(write_uuid, next(counter), df)])) + + +class _MergeAppend: +_operation: Operation +_table: Table +_snapshot_id: int +_parent_snapshot_id: Optional[int] +_added_data_files: List[DataFile] +_commit_uuid: uuid.UUID + +def __init__(self, operation: Operation, table: Table, snapshot_id: int) -> None: +self._operation = operation +self._table = table +self._snapshot_id = snapshot_id +# Since we only support the main branch for now +self._parent_snapshot_id = snapshot.snapshot_id if (snapshot := self._table.current_snapshot()) else None +self._added_data_files = [] +self._commit_uuid = uuid.uuid4() + +def append_data_file(self, data_file: DataFile) -> _MergeAppend: +self._added_data_files.append(data_file) +return self + +def _deleted_entries(self) -> List[ManifestEntry]: +"""To determine if we need to record any deleted entries. + +With partial overwrites we have to use the predicate to evaluate +which entries are affected. 
+""" +if self._operation == Operation.OVERWRITE: +if self._parent_snapshot_id is not None: +previous_snapshot = self._table.snapshot_by_id(self._parent_snapshot_id) +if previous_snapshot is None: +# This should never happen since you cannot overwrite an empty table +raise ValueError(f"Could not find the previous snapshot: {self._parent_snapshot_id}") + +executor = ExecutorFactory.get_or_create() + +def _get_entries(manifest: ManifestFile) -> List[ManifestEntry]: +return [ +ManifestEntry( +status=ManifestEntryStatus.DELETED, +snapshot_id=entry.snapshot_id, +data_sequence_number=entry.data_sequence_number, +file_sequence_number=entry.file_sequence_number, +data_file=entry.data_file, +) +for entry in manifest.fetch_manifest_entry(self._table.io, discard_deleted=True) +] + +list_of_entries = executor.map(_get_entries, previous_snapshot.manifests(self._table.io)) +return list(chain(*list_of_entries)) +return [] +elif self._operation == Operation.APPEND: +return [] +else: +raise ValueError(f"Not implemented for: {self._operation}") + +def _manifests(self) -> List[ManifestFile]: +manifests = [] +deleted_entries = self._deleted_entries() + +if self._added_data_files: +output_file_location = _new_manifest_path(location=self._table.location(), num=0, commit_uuid=self._commit_uuid) +with write_manifest( +format_version=self._tabl
Re: [PR] Write support [iceberg-python]
rdblue commented on code in PR #41: URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1456523045 ## mkdocs/docs/api.md: ## @@ -175,6 +175,104 @@ static_table = StaticTable.from_metadata( The static-table is considered read-only. +## Write support + +With PyIceberg 0.6.0 write support is added through Arrow. Let's consider an Arrow Table: Review Comment: Thanks for this example! Made it really easy to test out. The example works great cut & pasted into a REPL. I also tested modifications to the dataframe schema passed to append and it does the right thing. I get a schema error for a few cases: * Missing column `long` * Type mismatch `string` instead of `double` * Extra column `country` Looks like Arrow requires that the schema matches, which is great. It would be nice to allow some type promotion in the future. I'm not sure whether arrow would automatically write floats into double columns, for example. I would also like to make sure we have better error messages, not just "ValueError: Table schema does not match schema used to create file: ...". Those will be good follow ups. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
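As a rough illustration of the mismatches being tested, a PyArrow-only sketch (the column names follow the docs example; the strict comparison here only mirrors the check that produces the ValueError, it is not the pyiceberg validation code):

```python
import pyarrow as pa

expected = pa.schema([
    pa.field("city", pa.string()),
    pa.field("lat", pa.float64()),
    pa.field("long", pa.float64()),
])

# Type mismatch: `long` arrives as strings instead of doubles.
df_bad = pa.table({
    "city": ["Amsterdam"],
    "lat": [52.371807],
    "long": ["4.896029"],
})

print(df_bad.schema.equals(expected))  # False -> the append is rejected
```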
Re: [PR] Write support [iceberg-python]
rdblue commented on PR #41: URL: https://github.com/apache/iceberg-python/pull/41#issuecomment-1896944638 @Fokko, this works great and I don't see any blockers so I've approved it. I think there are a few things to consider in terms of how we want to do this moving forward (whether to use separate manifests for example) but we can get this in and iterate from there. It also looks like this is pretty close to being able to run the overwrite filter, too! Great work. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Add 1.4.3 docs [iceberg]
dramaticlly commented on code in PR #9499: URL: https://github.com/apache/iceberg/pull/9499#discussion_r1456526671 ## 1.4.3/mkdocs.yml: ## @@ -0,0 +1,70 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +site_name: docs/1.4.2 Review Comment: should this be 1.4.3 instead? ## 1.4.3/mkdocs.yml: ## @@ -0,0 +1,70 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +site_name: docs/1.4.2 + +plugins: + - search + +nav: + - index.md + - Tables: +- branching.md +- configuration.md +- evolution.md +- maintenance.md +- partitioning.md +- performance.md +- reliability.md +- schemas.md + - Spark: +- spark-getting-started.md +- spark-configuration.md +- spark-ddl.md +- spark-procedures.md +- spark-queries.md +- spark-structured-streaming.md +- spark-writes.md + - Flink: +- flink.md +- flink-connector.md +- flink-ddl.md +- flink-queries.md +- flink-writes.md +- flink-actions.md +- flink-configuration.md + - hive.md + - Trino: https://trino.io/docs/current/connector/iceberg.html + - Clickhouse: https://clickhouse.com/docs/en/engines/table-engines/integrations/iceberg + - Presto: https://prestodb.io/docs/current/connector/iceberg.html + - Dremio: https://docs.dremio.com/data-formats/apache-iceberg/ + - Starrocks: https://docs.starrocks.io/en-us/latest/data_source/catalog/iceberg_catalog + - Amazon Athena: https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg.html + - Amazon EMR: https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-iceberg-use-cluster.html + - Impala: https://impala.apache.org/docs/build/html/topics/impala_iceberg.html + - Doris: https://doris.apache.org/docs/dev/lakehouse/multi-catalog/iceberg + - Integrations: +- aws.md +- dell.md +- jdbc.md +- nessie.md + - API: +- java-api-quickstart.md +- api.md +- custom-catalog.md + - Javadoc: ../../javadoc/1.4.2 Review Comment: ditto -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
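For reference, the two flagged lines would presumably read as follows once they track the 1.4.3 release (an assumption based on the review comments, not the committed file):

```yaml
site_name: docs/1.4.3
# ...
nav:
  # ...
  - Javadoc: ../../javadoc/1.4.3
```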
Re: [PR] Flink: implement range partitioner for map data statistics [iceberg]
stevenzwu commented on code in PR #9321: URL: https://github.com/apache/iceberg/pull/9321#discussion_r1456527386 ## flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/MapRangePartitioner.java: ## @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.shuffle; + +import java.util.Arrays; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.table.data.RowData; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SortKey; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.SortOrderComparators; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.flink.RowDataWrapper; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.ArrayUtil; +import org.apache.iceberg.util.Pair; + +/** + * Internal partitioner implementation that supports MapDataStatistics, which is typically used for + * low-cardinality use cases. While MapDataStatistics can keep accurate counters, it can't be used + * for high-cardinality use cases. Otherwise, the memory footprint is too high. 
+ */ +class MapRangePartitioner implements Partitioner { + private final RowDataWrapper rowDataWrapper; + private final SortKey sortKey; + private final Comparator comparator; + private final Map mapStatistics; + private final double closeFileCostInWeightPercentage; + + // lazily computed due to the need of numPartitions + private Map assignment; + private NavigableMap sortedStatsWithCloseFileCost; + + MapRangePartitioner( + Schema schema, + SortOrder sortOrder, + MapDataStatistics dataStatistics, + double closeFileCostInWeightPercentage) { +this.rowDataWrapper = new RowDataWrapper(FlinkSchemaUtil.convert(schema), schema.asStruct()); +this.sortKey = new SortKey(schema, sortOrder); +this.comparator = SortOrderComparators.forSchema(schema, sortOrder); +this.mapStatistics = dataStatistics.statistics(); +this.closeFileCostInWeightPercentage = closeFileCostInWeightPercentage; + } + + @Override + public int partition(RowData row, int numPartitions) { +// assignment table can only be built lazily when first referenced here, +// because number of partitions (downstream subtasks) is needed +Map assignmentMap = assignment(numPartitions); +// reuse the sortKey and rowDataWrapper +sortKey.wrap(rowDataWrapper.wrap(row)); +KeyAssignment keyAssignment = assignmentMap.get(sortKey); +if (keyAssignment == null) { + // haven't learned about the key before. fall back to random selection. + return ThreadLocalRandom.current().nextInt(numPartitions); Review Comment: I will go with round robin `AtomicLong.getAndIncrement() % N` then. it might be a tiny bit faster than `ThreadLocalRandom.current.nextInt(N)` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Flink: implement range partitioner for map data statistics [iceberg]
stevenzwu commented on code in PR #9321: URL: https://github.com/apache/iceberg/pull/9321#discussion_r1456527386 ## flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/MapRangePartitioner.java: ## @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.shuffle; + +import java.util.Arrays; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.table.data.RowData; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SortKey; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.SortOrderComparators; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.flink.RowDataWrapper; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.ArrayUtil; +import org.apache.iceberg.util.Pair; + +/** + * Internal partitioner implementation that supports MapDataStatistics, which is typically used for + * low-cardinality use cases. While MapDataStatistics can keep accurate counters, it can't be used + * for high-cardinality use cases. Otherwise, the memory footprint is too high. 
+ */ +class MapRangePartitioner implements Partitioner { + private final RowDataWrapper rowDataWrapper; + private final SortKey sortKey; + private final Comparator comparator; + private final Map mapStatistics; + private final double closeFileCostInWeightPercentage; + + // lazily computed due to the need of numPartitions + private Map assignment; + private NavigableMap sortedStatsWithCloseFileCost; + + MapRangePartitioner( + Schema schema, + SortOrder sortOrder, + MapDataStatistics dataStatistics, + double closeFileCostInWeightPercentage) { +this.rowDataWrapper = new RowDataWrapper(FlinkSchemaUtil.convert(schema), schema.asStruct()); +this.sortKey = new SortKey(schema, sortOrder); +this.comparator = SortOrderComparators.forSchema(schema, sortOrder); +this.mapStatistics = dataStatistics.statistics(); +this.closeFileCostInWeightPercentage = closeFileCostInWeightPercentage; + } + + @Override + public int partition(RowData row, int numPartitions) { +// assignment table can only be built lazily when first referenced here, +// because number of partitions (downstream subtasks) is needed +Map assignmentMap = assignment(numPartitions); +// reuse the sortKey and rowDataWrapper +sortKey.wrap(rowDataWrapper.wrap(row)); +KeyAssignment keyAssignment = assignmentMap.get(sortKey); +if (keyAssignment == null) { + // haven't learned about the key before. fall back to random selection. + return ThreadLocalRandom.current().nextInt(numPartitions); Review Comment: I will go with round robin `AtomicLong.getAndIncrement() % N` then. it might be a tiny bit faster than `ThreadLocalRandom.current.nextInt(N)`. Ideally, we want to increment a counter metric in this scenario. I looked in the partitioner interface and how it is used. I couldn't find a way of passing in the `MetricGroup` here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Flink: implement range partitioner for map data statistics [iceberg]
stevenzwu commented on code in PR #9321: URL: https://github.com/apache/iceberg/pull/9321#discussion_r1456529072 ## flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/MapRangePartitioner.java: ## @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.shuffle; + +import java.util.Arrays; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.table.data.RowData; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SortKey; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.SortOrderComparators; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.flink.RowDataWrapper; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.ArrayUtil; +import org.apache.iceberg.util.Pair; + +/** + * Internal partitioner implementation that supports MapDataStatistics, which is typically used for + * low-cardinality use cases. While MapDataStatistics can keep accurate counters, it can't be used + * for high-cardinality use cases. Otherwise, the memory footprint is too high. 
+ */ +class MapRangePartitioner implements Partitioner { + private final RowDataWrapper rowDataWrapper; + private final SortKey sortKey; + private final Comparator comparator; + private final Map mapStatistics; + private final double closeFileCostInWeightPercentage; + + // lazily computed due to the need of numPartitions + private Map assignment; + private NavigableMap sortedStatsWithCloseFileCost; + + MapRangePartitioner( + Schema schema, + SortOrder sortOrder, + MapDataStatistics dataStatistics, + double closeFileCostInWeightPercentage) { +this.rowDataWrapper = new RowDataWrapper(FlinkSchemaUtil.convert(schema), schema.asStruct()); +this.sortKey = new SortKey(schema, sortOrder); +this.comparator = SortOrderComparators.forSchema(schema, sortOrder); +this.mapStatistics = dataStatistics.statistics(); +this.closeFileCostInWeightPercentage = closeFileCostInWeightPercentage; + } + + @Override + public int partition(RowData row, int numPartitions) { +// assignment table can only be built lazily when first referenced here, +// because number of partitions (downstream subtasks) is needed +Map assignmentMap = assignment(numPartitions); +// reuse the sortKey and rowDataWrapper +sortKey.wrap(rowDataWrapper.wrap(row)); +KeyAssignment keyAssignment = assignmentMap.get(sortKey); +if (keyAssignment == null) { + // haven't learned about the key before. fall back to random selection. + return ThreadLocalRandom.current().nextInt(numPartitions); +} + +return keyAssignment.select(); + } + + @VisibleForTesting + Map assignment(int numPartitions) { +if (assignment == null) { + long totalWeight = mapStatistics.values().stream().mapToLong(l -> l).sum(); + double targetWeightPerSubtask = ((double) totalWeight) / numPartitions; + long closeFileCostInWeight = + (long) Math.ceil(targetWeightPerSubtask * closeFileCostInWeightPercentage / 100); + + // add one close file cost for each key even if a key with large weight may be assigned to + // multiple subtasks + this.sortedStatsWithCloseFileCost = Maps.newTreeMap(comparator); + mapStatistics.forEach( + (k, v) -> { +int estimatedSplits = (int) Math.ceil(v / targetWeightPerSubtask); +long estimatedCloseFileCost = closeFileCostInWeight * estimatedSplits; +sortedStatsWithCloseFileCost.put(k, v + estimatedCloseFileCost); + }); + + long totalWe
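To make the weighting math in this block concrete, a toy run with made-up numbers (the inputs are assumptions, not values from the PR):

```java
public class WeightSketch {
  public static void main(String[] args) {
    long totalWeight = 1000L;                       // sum of all key weights
    int numPartitions = 4;                          // downstream subtasks
    double closeFileCostInWeightPercentage = 2.0;

    double targetWeightPerSubtask = ((double) totalWeight) / numPartitions;                // 250.0
    long closeFileCostInWeight =
        (long) Math.ceil(targetWeightPerSubtask * closeFileCostInWeightPercentage / 100);  // 5

    long keyWeight = 600L;                          // one hot key
    int estimatedSplits = (int) Math.ceil(keyWeight / targetWeightPerSubtask);             // 3
    long adjustedWeight = keyWeight + closeFileCostInWeight * estimatedSplits;             // 615

    System.out.printf("target=%.1f closeCost=%d splits=%d adjusted=%d%n",
        targetWeightPerSubtask, closeFileCostInWeight, estimatedSplits, adjustedWeight);
  }
}
```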
[PR] Fix community link [iceberg]
bitsondatadev opened a new pull request, #9500: URL: https://github.com/apache/iceberg/pull/9500 The community link works only on top-level site links. Link this to the static site for now; eventually we need to consider a site-wide variable solution, but that's not important for now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Add 1.4.3 docs [iceberg]
bitsondatadev commented on code in PR #9499: URL: https://github.com/apache/iceberg/pull/9499#discussion_r1456528989 ## 1.4.3/mkdocs.yml: ## @@ -0,0 +1,70 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +site_name: docs/1.4.2 + +plugins: + - search + +nav: + - index.md + - Tables: +- branching.md +- configuration.md +- evolution.md +- maintenance.md +- partitioning.md +- performance.md +- reliability.md +- schemas.md + - Spark: +- spark-getting-started.md +- spark-configuration.md +- spark-ddl.md +- spark-procedures.md +- spark-queries.md +- spark-structured-streaming.md +- spark-writes.md + - Flink: +- flink.md +- flink-connector.md +- flink-ddl.md +- flink-queries.md +- flink-writes.md +- flink-actions.md +- flink-configuration.md + - hive.md + - Trino: https://trino.io/docs/current/connector/iceberg.html + - Clickhouse: https://clickhouse.com/docs/en/engines/table-engines/integrations/iceberg + - Presto: https://prestodb.io/docs/current/connector/iceberg.html + - Dremio: https://docs.dremio.com/data-formats/apache-iceberg/ + - Starrocks: https://docs.starrocks.io/en-us/latest/data_source/catalog/iceberg_catalog + - Amazon Athena: https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg.html + - Amazon EMR: https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-iceberg-use-cluster.html + - Impala: https://impala.apache.org/docs/build/html/topics/impala_iceberg.html + - Doris: https://doris.apache.org/docs/dev/lakehouse/multi-catalog/iceberg + - Integrations: +- aws.md +- dell.md +- jdbc.md +- nessie.md + - API: +- java-api-quickstart.md +- api.md +- custom-catalog.md + - Javadoc: ../../javadoc/1.4.2 Review Comment: ditto to above reply. ## 1.4.3/mkdocs.yml: ## @@ -0,0 +1,70 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +site_name: docs/1.4.2 Review Comment: yes, I still need to wrap up this automation for the actual docs deployment sequence. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[PR] [Bug Fix] TruncateTransform for falsey values [iceberg-python]
syun64 opened a new pull request, #276: URL: https://github.com/apache/iceberg-python/pull/276 Currently, **TruncateTransform** returns None for any falsey value. As a result, **fill_parquet_file_metadata** throws an exception whenever a falsey lower bound is the minimum value for a column statistic, because the None cannot be encoded to "UTF-8" and fails in **to_bytes**. The falsey values added in the test below are valid minimum column statistic values, so the function should be adjusted to transform and support them correctly. This fix will be important as we prepare to introduce Write Support. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
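A minimal Python sketch of the falsey-value pitfall being described (illustrative only, not the actual TruncateTransform code):

```python
def truncate_buggy(value, width):
    # `if value` also treats 0, 0.0, "" and b"" as missing,
    # so valid falsey lower bounds come back as None.
    return value[:width] if value else None

def truncate_fixed(value, width):
    # Only None should short-circuit; falsey-but-valid values are truncated normally.
    return None if value is None else value[:width]

assert truncate_buggy("", 3) is None      # a valid lower bound is lost
assert truncate_fixed("", 3) == ""        # preserved
assert truncate_fixed("iceberg", 3) == "ice"
```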
Re: [PR] Spark: Support creating views via SQL [iceberg]
rdblue commented on code in PR #9423: URL: https://github.com/apache/iceberg/pull/9423#discussion_r1456546379 ## spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala: ## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.iceberg.spark.Spark3Util +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException +import org.apache.spark.sql.catalyst.expressions.Alias +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.Project +import org.apache.spark.sql.connector.catalog.Identifier +import org.apache.spark.sql.connector.catalog.ViewCatalog +import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.execution.CommandExecutionMode +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.MetadataBuilder +import org.apache.spark.sql.util.SchemaUtils +import scala.collection.JavaConverters._ + + +case class CreateV2ViewExec( + catalog: ViewCatalog, + ident: Identifier, + originalText: String, + query: LogicalPlan, + userSpecifiedColumns: Seq[(String, Option[String])], + comment: Option[String], + properties: Map[String, String], + allowExisting: Boolean, + replace: Boolean) extends LeafV2CommandExec { + + override lazy val output: Seq[Attribute] = Nil + + override protected def run(): Seq[InternalRow] = { +val qe = session.sessionState.executePlan(query, CommandExecutionMode.SKIP) +qe.assertAnalyzed() +val analyzedPlan = qe.analyzed + +val identifier = Spark3Util.toV1TableIdentifier(ident) + +if (userSpecifiedColumns.nonEmpty) { + if (userSpecifiedColumns.length > analyzedPlan.output.length) { +throw QueryCompilationErrors.cannotCreateViewNotEnoughColumnsError( + identifier, userSpecifiedColumns.map(_._1), analyzedPlan) + } else if (userSpecifiedColumns.length < analyzedPlan.output.length) { +throw QueryCompilationErrors.cannotCreateViewTooManyColumnsError( + identifier, userSpecifiedColumns.map(_._1), analyzedPlan) + } +} + +val queryColumnNames = analyzedPlan.schema.fieldNames +SchemaUtils.checkColumnNameDuplication(queryColumnNames, SQLConf.get.resolver) Review Comment: If there are aliases, we could succeed though. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
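A hypothetical Spark SQL case for this point: the query's output has duplicate column names, but the user-specified view columns alias them apart, so the create could still succeed:

```sql
-- Both query columns are named `id`; the view column list renames them.
CREATE VIEW v (left_id, right_id) AS
SELECT a.id, b.id FROM t a JOIN t b ON a.id = b.id;
```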
Re: [PR] Flink: Added error handling and default logic for Flink version detection [iceberg]
gjacoby126 commented on code in PR #9452: URL: https://github.com/apache/iceberg/pull/9452#discussion_r1456547407 ## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/util/FlinkPackage.java: ## @@ -19,15 +19,31 @@ package org.apache.iceberg.flink.util; import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; public class FlinkPackage { - /** Choose {@link DataStream} class because it is one of the core Flink API. */ - private static final String VERSION = DataStream.class.getPackage().getImplementationVersion(); + + public static final String FLINK_UNKNOWN_VERSION = "Flink-UNKNOWN"; private FlinkPackage() {} /** Returns Flink version string like x.y.z */ public static String version() { -return VERSION; +try { + String version = getVersionFromJar(); + /* If we can't detect the exact implementation version from the jar (this can happen if the DataStream class + appears multiple times in the same classpath such as with shading), then the best we can do is say it's + unknown + */ + return version != null ? version : FLINK_UNKNOWN_VERSION; Review Comment: Uploaded a 1-method version using `AtomicReference` similar to what @nastra and @pvary were suggesting. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
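A rough Java sketch of the direction mentioned here, caching the detected version in an `AtomicReference` and falling back to the unknown-version constant; this is an assumption about the shape of the change, not the merged patch:

```java
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.streaming.api.datastream.DataStream;

public class FlinkVersionSketch {
  public static final String FLINK_UNKNOWN_VERSION = "Flink-UNKNOWN";
  private static final AtomicReference<String> VERSION = new AtomicReference<>();

  private FlinkVersionSketch() {}

  /** Returns the Flink version string, or the unknown-version fallback. */
  public static String version() {
    if (VERSION.get() == null) {
      String detected = null;
      try {
        // Can be null (or fail) when the class is shaded or the manifest is missing.
        detected = DataStream.class.getPackage().getImplementationVersion();
      } catch (RuntimeException e) {
        // fall through to the fallback below
      }
      VERSION.compareAndSet(null, detected != null ? detected : FLINK_UNKNOWN_VERSION);
    }
    return VERSION.get();
  }
}
```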
Re: [PR] Flink: implement range partitioner for map data statistics [iceberg]
stevenzwu commented on code in PR #9321: URL: https://github.com/apache/iceberg/pull/9321#discussion_r1456527386 ## flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/MapRangePartitioner.java: ## @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.shuffle; + +import java.util.Arrays; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.table.data.RowData; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SortKey; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.SortOrderComparators; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.flink.RowDataWrapper; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.ArrayUtil; +import org.apache.iceberg.util.Pair; + +/** + * Internal partitioner implementation that supports MapDataStatistics, which is typically used for + * low-cardinality use cases. While MapDataStatistics can keep accurate counters, it can't be used + * for high-cardinality use cases. Otherwise, the memory footprint is too high. 
+ */ +class MapRangePartitioner implements Partitioner { + private final RowDataWrapper rowDataWrapper; + private final SortKey sortKey; + private final Comparator comparator; + private final Map mapStatistics; + private final double closeFileCostInWeightPercentage; + + // lazily computed due to the need of numPartitions + private Map assignment; + private NavigableMap sortedStatsWithCloseFileCost; + + MapRangePartitioner( + Schema schema, + SortOrder sortOrder, + MapDataStatistics dataStatistics, + double closeFileCostInWeightPercentage) { +this.rowDataWrapper = new RowDataWrapper(FlinkSchemaUtil.convert(schema), schema.asStruct()); +this.sortKey = new SortKey(schema, sortOrder); +this.comparator = SortOrderComparators.forSchema(schema, sortOrder); +this.mapStatistics = dataStatistics.statistics(); +this.closeFileCostInWeightPercentage = closeFileCostInWeightPercentage; + } + + @Override + public int partition(RowData row, int numPartitions) { +// assignment table can only be built lazily when first referenced here, +// because number of partitions (downstream subtasks) is needed +Map assignmentMap = assignment(numPartitions); +// reuse the sortKey and rowDataWrapper +sortKey.wrap(rowDataWrapper.wrap(row)); +KeyAssignment keyAssignment = assignmentMap.get(sortKey); +if (keyAssignment == null) { + // haven't learned about the key before. fall back to random selection. + return ThreadLocalRandom.current().nextInt(numPartitions); Review Comment: I will go with round robin `counter % N` then. it might be a tiny bit faster than `ThreadLocalRandom.current.nextInt(N)`. Ideally, we want to increment a counter metric in this scenario. I looked in the partitioner interface and how it is used. I couldn't find a way of passing in the `MetricGroup` here. for now, we can probably log sth -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
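To make the round-robin fallback concrete, here is a small sketch of the `counter % N` idea for sort keys that were not present in the map statistics; the class and field names below are illustrative and not taken from the PR:

```java
import java.util.concurrent.atomic.AtomicLong;

// Illustrative helper: spread rows with unknown sort keys evenly across the
// downstream subtasks instead of picking one with ThreadLocalRandom.
class RoundRobinFallback {
  private final AtomicLong unknownKeyCounter = new AtomicLong();

  // numPartitions is the downstream parallelism passed to Partitioner#partition
  int nextPartition(int numPartitions) {
    long next = unknownKeyCounter.getAndIncrement();
    // floorMod keeps the result non-negative even after the counter overflows
    return (int) Math.floorMod(next, (long) numPartitions);
  }
}
```

In `partition(...)`, the `keyAssignment == null` branch would then return something like `fallback.nextPartition(numPartitions)` and could log (or, once a `MetricGroup` becomes available, count) how often the fallback is taken.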
Re: [PR] Spark: Support creating views via SQL [iceberg]
rdblue commented on code in PR #9423: URL: https://github.com/apache/iceberg/pull/9423#discussion_r1456586630 ## spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala: ## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.iceberg.spark.Spark3Util +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException +import org.apache.spark.sql.catalyst.expressions.Alias +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.Project +import org.apache.spark.sql.connector.catalog.Identifier +import org.apache.spark.sql.connector.catalog.ViewCatalog +import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.execution.CommandExecutionMode +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.MetadataBuilder +import org.apache.spark.sql.util.SchemaUtils +import scala.collection.JavaConverters._ + + +case class CreateV2ViewExec( + catalog: ViewCatalog, + ident: Identifier, + originalText: String, + query: LogicalPlan, + userSpecifiedColumns: Seq[(String, Option[String])], + comment: Option[String], + properties: Map[String, String], + allowExisting: Boolean, + replace: Boolean) extends LeafV2CommandExec { + + override lazy val output: Seq[Attribute] = Nil + + override protected def run(): Seq[InternalRow] = { +val analyzedPlan = session.sessionState.executePlan(query, CommandExecutionMode.SKIP).analyzed +val identifier = Spark3Util.toV1TableIdentifier(ident) + +if (userSpecifiedColumns.nonEmpty) { + if (userSpecifiedColumns.length > analyzedPlan.output.length) { +throw QueryCompilationErrors.cannotCreateViewNotEnoughColumnsError( + identifier, userSpecifiedColumns.map(_._1), analyzedPlan) + } else if (userSpecifiedColumns.length < analyzedPlan.output.length) { +throw QueryCompilationErrors.cannotCreateViewTooManyColumnsError( + identifier, userSpecifiedColumns.map(_._1), analyzedPlan) + } +} + +val queryColumnNames = analyzedPlan.schema.fieldNames +SchemaUtils.checkColumnNameDuplication(queryColumnNames, SQLConf.get.resolver) + +val viewSchema = aliasPlan(analyzedPlan, userSpecifiedColumns).schema +val columnAliases = userSpecifiedColumns.map(_._1).toArray +val columnComments = userSpecifiedColumns.map(_._2.orNull).toArray + +val currentCatalogName = session.sessionState.catalogManager.currentCatalog.name +val currentCatalog = if (!catalog.name().equals(currentCatalogName)) currentCatalogName else null +val currentNamespace = 
session.sessionState.catalogManager.currentNamespace + +val engineVersion = "Spark " + org.apache.spark.SPARK_VERSION +val newProperties = properties ++ + comment.map(ViewCatalog.PROP_COMMENT -> _) + + (ViewCatalog.PROP_CREATE_ENGINE_VERSION -> engineVersion, +ViewCatalog.PROP_ENGINE_VERSION -> engineVersion) + +if (replace) { + // CREATE OR REPLACE VIEW + if (catalog.viewExists(ident)) { +catalog.dropView(ident) + } + // FIXME: replaceView API doesn't exist in Spark 3.5 + catalog.createView( +ident, +originalText, +currentCatalog, +currentNamespace, +viewSchema, +queryColumnNames, +columnAliases, +columnComments, +newProperties.asJava) +} else { + try { +// CREATE VIEW [IF NOT EXISTS] +catalog.createView( + ident, + originalText, + currentCatalog, + currentNamespace, + viewSchema, + queryColumnNames, + columnAliases, + columnComments, + newProperties.asJava) + } catch { +case _: ViewAlreadyExistsException if allowExisting => // Ignore + } +} + +Nil + } + + override def simpleString(maxFields: Int): String = { +s"CreateV2ViewExec: ${ident}" + } + + /** +
Re: [PR] Fix community link [iceberg]
amogh-jahagirdar merged PR #9500: URL: https://github.com/apache/iceberg/pull/9500 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[PR] [Reference PR] [API + Avro] Add default value APIs and Avro implementation [iceberg]
wmoustafa opened a new pull request, #9502: URL: https://github.com/apache/iceberg/pull/9502 This PR adds default value APIs according to the default value spec and implements them for the `GenericAvroReader` case. It uses a `ConstantReader` to fill in each field's default value from its `initialDefault()` method. This PR rebases the implementation in #6004 on top of #9366. However, this PR is just a reference PR for now, since there is still an open question about the best place to convert from Iceberg's data model to the Avro data model. For now, I have an Avro-specific method in `ValueReaders`, but that is not the correct place. I think a better place is in `GenericAvroReader`, where an Avro-specific class would extend `ValueReaders.ConstantReader`; however, `ValueReaders.ConstantReader` is currently private. I am creating this PR to start a discussion about the best place for constant (in-memory) data conversion. I have marked the places where a future change is required as TODO items. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
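For context, the constant-reader idea described above can be sketched as a `ValueReader` that never consumes bytes from the decoder and simply returns a precomputed value; the class name and the exact hookup into `GenericAvroReader` are assumptions for illustration, not the PR's final design:

```java
import java.io.IOException;
import org.apache.avro.io.Decoder;
import org.apache.iceberg.avro.ValueReader;

// Illustrative reader for a field that is missing from the data file:
// the value is taken from the field's initialDefault(), already converted
// to the in-memory representation the Avro readers expect.
class DefaultValueReader<T> implements ValueReader<T> {
  private final T defaultValue;

  DefaultValueReader(T defaultValue) {
    this.defaultValue = defaultValue;
  }

  @Override
  public T read(Decoder decoder, Object reuse) throws IOException {
    // nothing is read from the decoder; the column does not exist in the file
    return defaultValue;
  }
}
```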