Re: [I] Iceberg Materialized View Spec [iceberg]
JanKaul commented on issue #6420: URL: https://github.com/apache/iceberg/issues/6420#issuecomment-1776639925 I've updated the issue description and the google doc (https://docs.google.com/document/d/1UnhldHhe3Grz8JBngwXPA6ZZord1xMedY5ukEhZYF-A/edit?usp=sharing). I would love to get your feedback. @liurenjie1024 @jackye1995 @wmoustafa @rdblue
Re: [I] Iceberg Materialized View Spec [iceberg]
JanKaul commented on issue #6420: URL: https://github.com/apache/iceberg/issues/6420#issuecomment-1776642611 @wmoustafa I hope you are okay with using the table identifier as the storage table pointer instead of using the metadata location. But I don't see a way to use the metadata location with the REST catalog. And ultimately we require a robust solution that works across all catalogs.
Re: [PR] Core: Enable column statistics filtering after planning [iceberg]
pvary commented on code in PR #8803: URL: https://github.com/apache/iceberg/pull/8803#discussion_r1369754161 ## api/src/main/java/org/apache/iceberg/util/ContentFileUtil.java: ## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.util; + +import java.util.Set; +import org.apache.iceberg.ContentFile; + +public class ContentFileUtil { Review Comment: Yeah, you are right. Moved
Re: [PR] Core: Enable column statistics filtering after planning [iceberg]
pvary commented on code in PR #8803: URL: https://github.com/apache/iceberg/pull/8803#discussion_r1369754595 ## api/src/test/java/org/apache/iceberg/TestHelpers.java: ## @@ -662,6 +663,11 @@ public DataFile copyWithoutStats() { return this; } +@Override +public DataFile copyWithStats(Set statsToKeep) { Review Comment: Fixed everywhere (I hope, will check). ## core/src/main/java/org/apache/iceberg/BaseFile.java: ## @@ -174,8 +176,10 @@ public PartitionData copy() { * * @param toCopy a generic data file to copy. * @param fullCopy whether to copy all fields or to drop column-level stats + * @param statsToKeep a set of column ids to keep stats. If empty or null then every + * column stat is kept. */ - BaseFile(BaseFile toCopy, boolean fullCopy) { + BaseFile(BaseFile toCopy, boolean fullCopy, Set statsToKeep) { Review Comment: Fixed everywhere (I hope, will check).
Re: [PR] Core: Enable column statistics filtering after planning [iceberg]
pvary commented on code in PR #8803: URL: https://github.com/apache/iceberg/pull/8803#discussion_r1369755601 ## core/src/main/java/org/apache/iceberg/BaseFile.java: ## @@ -504,6 +508,27 @@ private static Map toReadableByteBufferMap(Map Map filterColumnsStats( + Map map, Set columnIds) { +if (columnIds == null || columnIds.isEmpty()) { + return SerializableMap.copyOf(map); +} + +if (map == null) { Review Comment: I would keep it this way, to avoid the extra null check.
Re: [PR] Core: Enable column statistics filtering after planning [iceberg]
pvary commented on code in PR #8803: URL: https://github.com/apache/iceberg/pull/8803#discussion_r1369756713 ## core/src/main/java/org/apache/iceberg/BaseFile.java: ## @@ -504,6 +508,27 @@ private static Map toReadableByteBufferMap(Map Map filterColumnsStats( Review Comment: I would need to change the `SerializableByteBufferMap` for this, so I kept it as it is for now.
Re: [PR] Core: Enable column statistics filtering after planning [iceberg]
pvary commented on code in PR #8803: URL: https://github.com/apache/iceberg/pull/8803#discussion_r1369758919 ## core/src/main/java/org/apache/iceberg/BaseFile.java: ## @@ -504,6 +508,27 @@ private static Map toReadableByteBufferMap(Map Map filterColumnsStats( + Map map, Set columnIds) { +if (columnIds == null || columnIds.isEmpty()) { + return SerializableMap.copyOf(map); +} + +if (map == null) { + return null; +} + +Map filtered = Maps.newHashMapWithExpectedSize(columnIds.size()); Review Comment: I expect a low number of `columnIds` and a high number of columns, so I think this is the fastest way to filter.
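To illustrate the reasoning above, here is a minimal, self-contained sketch of that filtering strategy: iterate the (small) set of requested column ids and probe the (large) stats map, rather than scanning every map entry. The class and helper names are illustrative only and are not the actual Iceberg signatures.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

class ColumnStatsFilterSketch {
  // Probe the stats map once per requested column id; with few ids and many
  // columns this does less work than iterating all map entries.
  static <T> Map<Integer, T> filterColumnsStats(Map<Integer, T> stats, Set<Integer> columnIds) {
    if (columnIds == null || columnIds.isEmpty()) {
      return stats; // nothing to filter: keep all column stats
    }
    if (stats == null) {
      return null;
    }
    Map<Integer, T> filtered = new HashMap<>(columnIds.size());
    for (Integer columnId : columnIds) {
      T value = stats.get(columnId);
      if (value != null) { // some maps carry null values; skip them, as discussed in the PR
        filtered.put(columnId, value);
      }
    }
    return filtered;
  }
}
```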
Re: [PR] Core: Enable column statistics filtering after planning [iceberg]
pvary commented on code in PR #8803: URL: https://github.com/apache/iceberg/pull/8803#discussion_r1369760407 ## core/src/main/java/org/apache/iceberg/BaseFile.java: ## @@ -504,6 +508,27 @@ private static Map toReadableByteBufferMap(Map Map filterColumnsStats( + Map map, Set columnIds) { +if (columnIds == null || columnIds.isEmpty()) { + return SerializableMap.copyOf(map); +} + +if (map == null) { + return null; +} + +Map filtered = Maps.newHashMapWithExpectedSize(columnIds.size()); +for (Integer columnId : columnIds) { + TypeT value = map.get(columnId); + if (value != null) { Review Comment: Yes, I found in some tests that the map could contain null values.
Re: [PR] Core: Enable column statistics filtering after planning [iceberg]
pvary commented on code in PR #8803: URL: https://github.com/apache/iceberg/pull/8803#discussion_r1369760806 ## core/src/main/java/org/apache/iceberg/BaseScan.java: ## @@ -165,6 +169,12 @@ public ThisT includeColumnStats() { return newRefinedScan(table, schema, context.shouldReturnColumnStats(true)); } + @Override + public ThisT includeColumnStats(Set statsNeeded) { Review Comment: Fixed everywhere (I hope, will check). ## core/src/main/java/org/apache/iceberg/ManifestGroup.java: ## @@ -154,6 +156,11 @@ ManifestGroup caseSensitive(boolean newCaseSensitive) { return this; } + ManifestGroup columnStatsToKeep(Set newColumnStatsToKeep) { +this.columnStatsToKeep = newColumnStatsToKeep; Review Comment: Done
Re: [PR] Core: Enable column statistics filtering after planning [iceberg]
pvary commented on code in PR #8803: URL: https://github.com/apache/iceberg/pull/8803#discussion_r1369761206 ## core/src/main/java/org/apache/iceberg/GenericDataFile.java: ## @@ -66,23 +68,30 @@ class GenericDataFile extends BaseFile implements DataFile { * Copy constructor. * * @param toCopy a generic data file to copy. - * @param fullCopy whether to copy all fields or to drop column-level stats + * @param fullCopy whether to copy all fields or to drop column-level stats. + * @param statsToKeep a set of column ids to keep stats. If empty or null then every + * column stat is kept. */ - private GenericDataFile(GenericDataFile toCopy, boolean fullCopy) { -super(toCopy, fullCopy); + private GenericDataFile(GenericDataFile toCopy, boolean fullCopy, Set statsToKeep) { Review Comment: Fixed everywhere (I hope, will check). ## core/src/main/java/org/apache/iceberg/GenericDeleteFile.java: ## @@ -67,23 +69,30 @@ class GenericDeleteFile extends BaseFile implements DeleteFile { * Copy constructor. * * @param toCopy a generic data file to copy. - * @param fullCopy whether to copy all fields or to drop column-level stats + * @param fullCopy whether to copy all fields or to drop column-level stats. + * @param statsToKeep a set of column ids to keep stats. If empty or null then every + * column stat is kept. */ - private GenericDeleteFile(GenericDeleteFile toCopy, boolean fullCopy) { -super(toCopy, fullCopy); + private GenericDeleteFile(GenericDeleteFile toCopy, boolean fullCopy, Set statsToKeep) { Review Comment: Fixed everywhere (I hope, will check). ## core/src/main/java/org/apache/iceberg/ManifestGroup.java: ## @@ -381,19 +390,22 @@ static class TaskContext { private final DeleteFileIndex deletes; private final ResidualEvaluator residuals; private final boolean dropStats; +private final Set statsToKeep; private final ScanMetrics scanMetrics; TaskContext( PartitionSpec spec, DeleteFileIndex deletes, ResidualEvaluator residuals, boolean dropStats, +Set statsToKeep, Review Comment: Fixed everywhere (I hope, will check).
Re: [PR] Core: Enable column statistics filtering after planning [iceberg]
pvary commented on code in PR #8803: URL: https://github.com/apache/iceberg/pull/8803#discussion_r1369761574 ## core/src/main/java/org/apache/iceberg/ManifestGroup.java: ## @@ -417,6 +429,10 @@ boolean shouldKeepStats() { return !dropStats; } +Set statsToKeep() { Review Comment: Fixed everywhere (I hope, will check). ## core/src/main/java/org/apache/iceberg/TableScanContext.java: ## @@ -125,6 +132,16 @@ TableScanContext shouldReturnColumnStats(boolean returnColumnStats) { .build(); } + TableScanContext columnsToIncludeStats(Set columnStatsToInclude) { +Preconditions.checkState( +returnColumnStats(), +"Cannot select column stats to include when column stats are not returned"); Review Comment: Changed
Re: [PR] Core: Enable column statistics filtering after planning [iceberg]
pvary commented on code in PR #8803: URL: https://github.com/apache/iceberg/pull/8803#discussion_r1369761988 ## core/src/main/java/org/apache/iceberg/V1Metadata.java: ## @@ -485,6 +486,11 @@ public DataFile copy() { return wrapped.copy(); } +@Override +public DataFile copyWithStats(Set statsToKeep) { Review Comment: Fixed everywhere (I hope, will check). ## core/src/main/java/org/apache/iceberg/V2Metadata.java: ## @@ -560,6 +561,11 @@ public F copy() { throw new UnsupportedOperationException("Cannot copy IndexedDataFile wrapper"); } +@Override +public F copyWithStats(Set statsToKeep) { Review Comment: Fixed everywhere (I hope, will check).
Re: [PR] Core: Enable column statistics filtering after planning [iceberg]
pvary commented on code in PR #8803: URL: https://github.com/apache/iceberg/pull/8803#discussion_r1369762281 ## docs/flink-configuration.md: ## @@ -130,6 +130,7 @@ env.getConfig() | streaming | connector.iceberg.streaming | N/A | false| Sets whether the current task runs in streaming or batch mode. | | monitor-interval | connector.iceberg.monitor-interval | N/A | 60s | Monitor interval to discover splits from new snapshots. Applicable only for streaming read. | | include-column-stats | connector.iceberg.include-column-stats | N/A | false| Create a new scan from this that loads the column stats with each data file. Column stats include: value count, null value count, lower bounds, and upper bounds. | +| column-stats-to-keep | connector.iceberg.column-stats-to-keep | N/A | empty| Create a new scan from this that loads the column stats with each data file for the specified column ids. Column stats include: value count, null value count, lower bounds, and upper bounds. | Review Comment: Removed as per another comment ## flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java: ## @@ -152,6 +155,16 @@ public boolean includeColumnStats() { .parse(); } + public Set columnStatsToKeep() { Review Comment: Fixed everywhere (I hope, will check).
Re: [PR] Core: Enable column statistics filtering after planning [iceberg]
pvary commented on code in PR #8803: URL: https://github.com/apache/iceberg/pull/8803#discussion_r1369762755 ## flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java: ## @@ -190,4 +203,11 @@ public int maxAllowedPlanningFailures() { .defaultValue(FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.defaultValue()) .parse(); } + + public static Set split(String text) { Review Comment: Removed as per another comment
Re: [PR] Core: Enable column statistics filtering after planning [iceberg]
pvary commented on code in PR #8803: URL: https://github.com/apache/iceberg/pull/8803#discussion_r1369763072 ## flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java: ## @@ -96,6 +96,9 @@ private FlinkReadOptions() {} public static final ConfigOption INCLUDE_COLUMN_STATS_OPTION = ConfigOptions.key(PREFIX + INCLUDE_COLUMN_STATS).booleanType().defaultValue(false); + public static final String COLUMN_STATS_TO_KEEP = "column-stats-to-keep"; Review Comment: Removed, as per another comment
Re: [PR] Core: Enable column statistics filtering after planning [iceberg]
pvary commented on code in PR #8803: URL: https://github.com/apache/iceberg/pull/8803#discussion_r1369764771 ## flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java: ## @@ -152,6 +155,16 @@ public boolean includeColumnStats() { .parse(); } + public Set columnStatsToKeep() { +return split( Review Comment: We might need stats for previous versions of columns (old files contain old stats for old columns). Removed for now anyway. ## flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java: ## @@ -464,6 +476,11 @@ public Builder includeColumnStats(boolean newIncludeColumnStats) { return this; } +public Builder columnStatsToKeep(Set newColumnStatsToKeep) { Review Comment: Fixed everywhere (I hope, will check).
Re: [I] Apache hive 3 with Tez engine select table no empty [iceberg]
pvary commented on issue #8891: URL: https://github.com/apache/iceberg/issues/8891#issuecomment-1776759965 @anvanna: Are you able to read the data from the Iceberg table with another tool? The issue with writing with Tez is that the newly created files are not propagated to HS2, so they are not added to the commit. It is therefore entirely possible that the files are there and a new Iceberg snapshot is created, but this new snapshot does not contain the new files. I would like to reiterate that Tez writes are not supported. Could you try using MR for the insert and then read with Tez?
Re: [I] Hive's performance for querying the Iceberg table is very poor. [iceberg]
pvary commented on issue #8901: URL: https://github.com/apache/iceberg/issues/8901#issuecomment-1776773140 You might want to try Hive 4.0.0-beta-1, which has plenty of related performance improvements.
Re: [PR] Nessie: reimplement namespace operations [iceberg]
adutra commented on code in PR #8857: URL: https://github.com/apache/iceberg/pull/8857#discussion_r1369932165 ## nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java: ## @@ -181,133 +185,223 @@ public IcebergTable table(TableIdentifier tableIdentifier) { } public void createNamespace(Namespace namespace, Map metadata) { +getRef().checkMutable(); + +if (namespace.isEmpty()) { + throw new IllegalArgumentException("Creating empty namespaces is not supported"); +} + +ContentKey key = ContentKey.of(namespace.levels()); +org.projectnessie.model.Namespace content = +org.projectnessie.model.Namespace.of(key.getElements(), metadata); + try { - getRef().checkMutable(); - withReference( - getApi() - .createNamespace() - .namespace(org.projectnessie.model.Namespace.of(namespace.levels())) - .properties(metadata)) - .create(); - refresh(); -} catch (NessieNamespaceAlreadyExistsException e) { - throw new AlreadyExistsException(e, "Namespace already exists: %s", namespace); + + Map contentMap = + api.getContent().reference(getReference()).key(key).get(); + Content existing = contentMap.get(key); + if (existing != null) { +throw namespaceAlreadyExists(key, existing, null); + } + + try { + +commitRetry("create namespace " + key, Operation.Put.of(key, content)); + + } catch (NessieReferenceConflictException e) { + +NessieConflictHandler.handleSingle( +e, +(conflictType, contentKey) -> { + switch (conflictType) { +case KEY_EXISTS: + Content conflicting = + withReference(api.getContent()).key(contentKey).get().get(contentKey); + throw namespaceAlreadyExists(contentKey, conflicting, e); +case NAMESPACE_ABSENT: + throw new NoSuchNamespaceException( + e, + "Cannot create Namespace '%s': parent namespace '%s' does not exist", + namespace, + contentKey); +default: + return false; + } +}); +throw new RuntimeException( +String.format("Cannot create Namespace '%s': %s", namespace, e.getMessage())); + } + } catch (NessieNotFoundException e) { throw new RuntimeException( String.format( - "Cannot create Namespace '%s': " + "ref '%s' is no longer valid.", + "Cannot create Namespace '%s': ref '%s' is no longer valid.", namespace, getRef().getName()), e); +} catch (BaseNessieClientServerException e) { + throw new RuntimeException( + String.format("Cannot create Namespace '%s': %s", namespace, e.getMessage()), e); } } public List listNamespaces(Namespace namespace) throws NoSuchNamespaceException { + try { - GetNamespacesResponse response = - withReference( - getApi() - .getMultipleNamespaces() - .namespace(org.projectnessie.model.Namespace.of(namespace.levels( - .get(); - return response.getNamespaces().stream() - .map(ns -> Namespace.of(ns.getElements().toArray(new String[0]))) - .filter(ns -> ns.length() == namespace.length() + 1) + + org.projectnessie.model.Namespace root = + org.projectnessie.model.Namespace.of(namespace.levels()); + + String filter = + namespace.isEmpty() + ? 
"size(entry.keyElements) == 1" + : String.format( + "size(entry.keyElements) == %d && entry.encodedKey.startsWith('%s.')", + root.getElementCount() + 1, root.name()); + + List entries = + withReference(api.getEntries()).filter(filter).stream() + .filter(e -> Content.Type.NAMESPACE.equals(e.getType())) + .map(EntriesResponse.Entry::getName) + .collect(Collectors.toList()); + + if (entries.isEmpty()) { +return Collections.emptyList(); + } + + GetContentBuilder getContent = withReference(api.getContent()); + entries.forEach(getContent::key); + + return getContent.get().values().stream() + .map(v -> v.unwrap(org.projectnessie.model.Namespace.class)) + .filter(Optional::isPresent) + .map(Optional::get) + .map(v -> Namespace.of(v.getElements().toArray(new String[0]))) + .filter(v -> v.length() == namespace.length() + 1) // only direct children .collect(Collectors.toList()); -} catch (NessieReferenceNotFoundException e) { + +} catch (NessieNotFoundException e) { throw new RuntimeException( String.format( - "Cannot list Namespaces
Re: [PR] Nessie: reimplement namespace operations [iceberg]
adutra commented on code in PR #8857: URL: https://github.com/apache/iceberg/pull/8857#discussion_r1369938013 ## nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java: ## @@ -540,4 +604,35 @@ public void close() { api.close(); } } + + private void commitRetry(String message, Operation... ops) + throws BaseNessieClientServerException { + +CommitMultipleOperationsBuilder commitBuilder = +api.commitMultipleOperations() +.commitMeta(NessieUtil.buildCommitMetadata(message, catalogOptions)) +.operations(Arrays.asList(ops)); + +Tasks.foreach(commitBuilder) +.retry(5) +.stopRetryOn(NessieReferenceNotFoundException.class, NessieReferenceConflictException.class) +.throwFailureWhenFinished() +.onFailure((o, exception) -> refresh()) Review Comment: I rather agree. I also question the usage of `BaseNessieClientServerException` as the upper bound for checked exceptions. It's a good idea to revisit that logic indeed.
Re: [I] Add View Support to Spark [iceberg]
nastra commented on issue #7938: URL: https://github.com/apache/iceberg/issues/7938#issuecomment-1777000677 @singhpk234 I was planning to pick up https://github.com/apache/spark/pull/39796, but I don't know yet whether we'd want to integrate those temporarily into Iceberg until they make it into Spark
[PR] Add javadoc dir branch [iceberg]
bitsondatadev opened a new pull request, #8912: URL: https://github.com/apache/iceberg/pull/8912 Adding the existing [static javadoc sites](https://github.com/apache/iceberg-docs/tree/asf-site/javadoc) to a separate branch in the main repository. This will enable the [step to add the javadocs at runtime](https://github.com/apache/iceberg/tree/main/site#building-the-versioned-docs), while avoiding having these docs show up in the search index of local repositories.
Re: [PR] Fix literal predicate equality check [iceberg-python]
Fokko commented on code in PR #94: URL: https://github.com/apache/iceberg-python/pull/94#discussion_r1370091939 ## pyiceberg/expressions/__init__.py: ## @@ -701,7 +701,7 @@ def bind(self, schema: Schema, case_sensitive: bool = True) -> BoundLiteralPredi def __eq__(self, other: Any) -> bool: """Return the equality of two instances of the LiteralPredicate class.""" -if isinstance(other, LiteralPredicate): +if isinstance(other, self.__class__): Review Comment: Raised a PR https://github.com/danielcweeks/iceberg-python/pull/1 to get these fixed :)
Re: [PR] Fix literal predicate equality check [iceberg-python]
Fokko merged PR #94: URL: https://github.com/apache/iceberg-python/pull/94
Re: [PR] Build: Replace Thread.Sleep() usage with org.Awaitility from Tests. [iceberg]
nk1506 commented on code in PR #8804: URL: https://github.com/apache/iceberg/pull/8804#discussion_r1370122449 ## core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java: ## @@ -435,13 +437,11 @@ public void testConcurrentFastAppends(@TempDir File dir) throws Exception { for (int numCommittedFiles = 0; numCommittedFiles < numberOfCommitedFilesPerThread; numCommittedFiles++) { -while (barrier.get() < numCommittedFiles * threadsCount) { - try { -Thread.sleep(10); - } catch (InterruptedException e) { -throw new RuntimeException(e); - } -} +final int currentFilesCount = numCommittedFiles; +Awaitility.await() +.pollInterval(Duration.ofMillis(10)) +.pollInSameThread() Review Comment: Only this test was becoming flaky with `Awaitility`. I think adding `pollInSameThread` for all the similar tests is safe, so I have added `pollInSameThread` and `atMost` with 5 minutes to all the other similar tests. WDYT?
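For reference, a minimal sketch of the polling pattern discussed above, assuming a shared AtomicInteger acting as the barrier; the helper name is illustrative and this is not the actual test code from the PR.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import org.awaitility.Awaitility;

class AwaitBarrierSketch {
  // Wait for other threads to advance the shared counter instead of a Thread.sleep() loop.
  static void awaitBarrier(AtomicInteger barrier, int expected) {
    Awaitility.await()
        .pollInterval(Duration.ofMillis(10))
        .pollInSameThread()             // poll on the calling thread, as in the flaky test
        .atMost(Duration.ofMinutes(5))  // bounded wait instead of an open-ended loop
        .until(() -> barrier.get() >= expected);
  }
}
```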
Re: [PR] Add Refurb to ruff [iceberg-python]
jayceslesar commented on PR #87: URL: https://github.com/apache/iceberg-python/pull/87#issuecomment-1777174485 @Fokko I am definitely interested in adding it directly! Can look into this next week
Re: [PR] Add Refurb to ruff [iceberg-python]
Fokko commented on PR #87: URL: https://github.com/apache/iceberg-python/pull/87#issuecomment-1777259485 @jayceslesar Thanks, there is no rush, appreciate it!
Re: [PR] Nessie: reimplement namespace operations [iceberg]
dimas-b commented on code in PR #8857: URL: https://github.com/apache/iceberg/pull/8857#discussion_r1370331050 ## nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java: ## @@ -181,133 +185,223 @@ public IcebergTable table(TableIdentifier tableIdentifier) { } public void createNamespace(Namespace namespace, Map metadata) { +getRef().checkMutable(); + +if (namespace.isEmpty()) { + throw new IllegalArgumentException("Creating empty namespaces is not supported"); +} + +ContentKey key = ContentKey.of(namespace.levels()); +org.projectnessie.model.Namespace content = +org.projectnessie.model.Namespace.of(key.getElements(), metadata); + try { - getRef().checkMutable(); - withReference( - getApi() - .createNamespace() - .namespace(org.projectnessie.model.Namespace.of(namespace.levels())) - .properties(metadata)) - .create(); - refresh(); -} catch (NessieNamespaceAlreadyExistsException e) { - throw new AlreadyExistsException(e, "Namespace already exists: %s", namespace); + + Map contentMap = + api.getContent().reference(getReference()).key(key).get(); + Content existing = contentMap.get(key); + if (existing != null) { +throw namespaceAlreadyExists(key, existing, null); + } + + try { + +commitRetry("create namespace " + key, Operation.Put.of(key, content)); + + } catch (NessieReferenceConflictException e) { + +NessieConflictHandler.handleSingle( +e, +(conflictType, contentKey) -> { + switch (conflictType) { +case KEY_EXISTS: + Content conflicting = + withReference(api.getContent()).key(contentKey).get().get(contentKey); + throw namespaceAlreadyExists(contentKey, conflicting, e); +case NAMESPACE_ABSENT: + throw new NoSuchNamespaceException( + e, + "Cannot create Namespace '%s': parent namespace '%s' does not exist", + namespace, + contentKey); +default: + return false; + } +}); +throw new RuntimeException( +String.format("Cannot create Namespace '%s': %s", namespace, e.getMessage())); + } + } catch (NessieNotFoundException e) { throw new RuntimeException( String.format( - "Cannot create Namespace '%s': " + "ref '%s' is no longer valid.", + "Cannot create Namespace '%s': ref '%s' is no longer valid.", namespace, getRef().getName()), e); +} catch (BaseNessieClientServerException e) { + throw new RuntimeException( + String.format("Cannot create Namespace '%s': %s", namespace, e.getMessage()), e); } } public List listNamespaces(Namespace namespace) throws NoSuchNamespaceException { + try { - GetNamespacesResponse response = - withReference( - getApi() - .getMultipleNamespaces() - .namespace(org.projectnessie.model.Namespace.of(namespace.levels( - .get(); - return response.getNamespaces().stream() - .map(ns -> Namespace.of(ns.getElements().toArray(new String[0]))) - .filter(ns -> ns.length() == namespace.length() + 1) + + org.projectnessie.model.Namespace root = + org.projectnessie.model.Namespace.of(namespace.levels()); + + String filter = + namespace.isEmpty() + ? 
"size(entry.keyElements) == 1" + : String.format( + "size(entry.keyElements) == %d && entry.encodedKey.startsWith('%s.')", + root.getElementCount() + 1, root.name()); + + List entries = + withReference(api.getEntries()).filter(filter).stream() + .filter(e -> Content.Type.NAMESPACE.equals(e.getType())) + .map(EntriesResponse.Entry::getName) + .collect(Collectors.toList()); + + if (entries.isEmpty()) { +return Collections.emptyList(); + } + + GetContentBuilder getContent = withReference(api.getContent()); + entries.forEach(getContent::key); + + return getContent.get().values().stream() + .map(v -> v.unwrap(org.projectnessie.model.Namespace.class)) + .filter(Optional::isPresent) + .map(Optional::get) + .map(v -> Namespace.of(v.getElements().toArray(new String[0]))) + .filter(v -> v.length() == namespace.length() + 1) // only direct children .collect(Collectors.toList()); -} catch (NessieReferenceNotFoundException e) { + +} catch (NessieNotFoundException e) { throw new RuntimeException( String.format( - "Cannot list Namespaces
Re: [PR] Nessie: reimplement namespace operations [iceberg]
dimas-b commented on code in PR #8857: URL: https://github.com/apache/iceberg/pull/8857#discussion_r1370336084 ## nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java: ## @@ -181,133 +185,223 @@ public IcebergTable table(TableIdentifier tableIdentifier) { } public void createNamespace(Namespace namespace, Map metadata) { +getRef().checkMutable(); + +if (namespace.isEmpty()) { + throw new IllegalArgumentException("Creating empty namespaces is not supported"); +} + +ContentKey key = ContentKey.of(namespace.levels()); +org.projectnessie.model.Namespace content = +org.projectnessie.model.Namespace.of(key.getElements(), metadata); + try { - getRef().checkMutable(); - withReference( - getApi() - .createNamespace() - .namespace(org.projectnessie.model.Namespace.of(namespace.levels())) - .properties(metadata)) - .create(); - refresh(); -} catch (NessieNamespaceAlreadyExistsException e) { - throw new AlreadyExistsException(e, "Namespace already exists: %s", namespace); + + Map contentMap = + api.getContent().reference(getReference()).key(key).get(); + Content existing = contentMap.get(key); + if (existing != null) { +throw namespaceAlreadyExists(key, existing, null); + } + + try { + +commitRetry("create namespace " + key, Operation.Put.of(key, content)); + + } catch (NessieReferenceConflictException e) { + +NessieConflictHandler.handleSingle( +e, +(conflictType, contentKey) -> { + switch (conflictType) { +case KEY_EXISTS: + Content conflicting = + withReference(api.getContent()).key(contentKey).get().get(contentKey); + throw namespaceAlreadyExists(contentKey, conflicting, e); +case NAMESPACE_ABSENT: + throw new NoSuchNamespaceException( + e, + "Cannot create Namespace '%s': parent namespace '%s' does not exist", + namespace, + contentKey); +default: + return false; + } +}); +throw new RuntimeException( +String.format("Cannot create Namespace '%s': %s", namespace, e.getMessage())); + } + } catch (NessieNotFoundException e) { throw new RuntimeException( String.format( - "Cannot create Namespace '%s': " + "ref '%s' is no longer valid.", + "Cannot create Namespace '%s': ref '%s' is no longer valid.", namespace, getRef().getName()), e); +} catch (BaseNessieClientServerException e) { + throw new RuntimeException( + String.format("Cannot create Namespace '%s': %s", namespace, e.getMessage()), e); } } public List listNamespaces(Namespace namespace) throws NoSuchNamespaceException { + try { - GetNamespacesResponse response = - withReference( - getApi() - .getMultipleNamespaces() - .namespace(org.projectnessie.model.Namespace.of(namespace.levels( - .get(); - return response.getNamespaces().stream() - .map(ns -> Namespace.of(ns.getElements().toArray(new String[0]))) - .filter(ns -> ns.length() == namespace.length() + 1) + + org.projectnessie.model.Namespace root = + org.projectnessie.model.Namespace.of(namespace.levels()); + + String filter = + namespace.isEmpty() + ? 
"size(entry.keyElements) == 1" + : String.format( + "size(entry.keyElements) == %d && entry.encodedKey.startsWith('%s.')", + root.getElementCount() + 1, root.name()); + + List entries = + withReference(api.getEntries()).filter(filter).stream() + .filter(e -> Content.Type.NAMESPACE.equals(e.getType())) + .map(EntriesResponse.Entry::getName) + .collect(Collectors.toList()); + + if (entries.isEmpty()) { +return Collections.emptyList(); + } + + GetContentBuilder getContent = withReference(api.getContent()); + entries.forEach(getContent::key); + + return getContent.get().values().stream() + .map(v -> v.unwrap(org.projectnessie.model.Namespace.class)) + .filter(Optional::isPresent) + .map(Optional::get) + .map(v -> Namespace.of(v.getElements().toArray(new String[0]))) + .filter(v -> v.length() == namespace.length() + 1) // only direct children .collect(Collectors.toList()); -} catch (NessieReferenceNotFoundException e) { + +} catch (NessieNotFoundException e) { throw new RuntimeException( String.format( - "Cannot list Namespaces
Re: [I] Exception occurred while writing to Iceberg tables by 'INSERT OVERWRITE' [iceberg]
sanromeo commented on issue #5384: URL: https://github.com/apache/iceberg/issues/5384#issuecomment-1777461016 I have the same issue.
Re: [PR] Nessie: reimplement namespace operations [iceberg]
dimas-b commented on code in PR #8857: URL: https://github.com/apache/iceberg/pull/8857#discussion_r1370419080 ## nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java: ## @@ -181,133 +185,223 @@ public IcebergTable table(TableIdentifier tableIdentifier) { } public void createNamespace(Namespace namespace, Map metadata) { +getRef().checkMutable(); + +if (namespace.isEmpty()) { + throw new IllegalArgumentException("Creating empty namespaces is not supported"); +} + +ContentKey key = ContentKey.of(namespace.levels()); +org.projectnessie.model.Namespace content = +org.projectnessie.model.Namespace.of(key.getElements(), metadata); + try { - getRef().checkMutable(); - withReference( - getApi() - .createNamespace() - .namespace(org.projectnessie.model.Namespace.of(namespace.levels())) - .properties(metadata)) - .create(); - refresh(); -} catch (NessieNamespaceAlreadyExistsException e) { - throw new AlreadyExistsException(e, "Namespace already exists: %s", namespace); + + Map contentMap = + api.getContent().reference(getReference()).key(key).get(); + Content existing = contentMap.get(key); + if (existing != null) { +throw namespaceAlreadyExists(key, existing, null); + } + + try { + +commitRetry("create namespace " + key, Operation.Put.of(key, content)); + + } catch (NessieReferenceConflictException e) { + +NessieConflictHandler.handleSingle( +e, +(conflictType, contentKey) -> { + switch (conflictType) { +case KEY_EXISTS: + Content conflicting = + withReference(api.getContent()).key(contentKey).get().get(contentKey); + throw namespaceAlreadyExists(contentKey, conflicting, e); +case NAMESPACE_ABSENT: + throw new NoSuchNamespaceException( + e, + "Cannot create Namespace '%s': parent namespace '%s' does not exist", + namespace, + contentKey); +default: + return false; + } +}); +throw new RuntimeException( +String.format("Cannot create Namespace '%s': %s", namespace, e.getMessage())); + } + } catch (NessieNotFoundException e) { throw new RuntimeException( String.format( - "Cannot create Namespace '%s': " + "ref '%s' is no longer valid.", + "Cannot create Namespace '%s': ref '%s' is no longer valid.", namespace, getRef().getName()), e); +} catch (BaseNessieClientServerException e) { + throw new RuntimeException( + String.format("Cannot create Namespace '%s': %s", namespace, e.getMessage()), e); } } public List listNamespaces(Namespace namespace) throws NoSuchNamespaceException { + try { - GetNamespacesResponse response = - withReference( - getApi() - .getMultipleNamespaces() - .namespace(org.projectnessie.model.Namespace.of(namespace.levels( - .get(); - return response.getNamespaces().stream() - .map(ns -> Namespace.of(ns.getElements().toArray(new String[0]))) - .filter(ns -> ns.length() == namespace.length() + 1) + + org.projectnessie.model.Namespace root = + org.projectnessie.model.Namespace.of(namespace.levels()); + + String filter = + namespace.isEmpty() + ? 
"size(entry.keyElements) == 1" + : String.format( + "size(entry.keyElements) == %d && entry.encodedKey.startsWith('%s.')", + root.getElementCount() + 1, root.name()); + + List entries = + withReference(api.getEntries()).filter(filter).stream() + .filter(e -> Content.Type.NAMESPACE.equals(e.getType())) + .map(EntriesResponse.Entry::getName) + .collect(Collectors.toList()); + + if (entries.isEmpty()) { +return Collections.emptyList(); + } + + GetContentBuilder getContent = withReference(api.getContent()); + entries.forEach(getContent::key); + + return getContent.get().values().stream() + .map(v -> v.unwrap(org.projectnessie.model.Namespace.class)) + .filter(Optional::isPresent) + .map(Optional::get) + .map(v -> Namespace.of(v.getElements().toArray(new String[0]))) + .filter(v -> v.length() == namespace.length() + 1) // only direct children .collect(Collectors.toList()); -} catch (NessieReferenceNotFoundException e) { + +} catch (NessieNotFoundException e) { throw new RuntimeException( String.format( - "Cannot list Namespaces
Re: [PR] API: add StructTransform base class for PartitionKey and SortKey. add SortOrderComparators [iceberg]
RussellSpitzer commented on code in PR #7798: URL: https://github.com/apache/iceberg/pull/7798#discussion_r1370435653 ## api/src/main/java/org/apache/iceberg/SortKey.java: ## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import java.util.List; +import java.util.stream.Collectors; + +/** + * A struct of flattened sort field values. + * + * Instances of this class can produce sort values from a row passed to {@link + * #wrap(StructLike)}. + */ +public class SortKey extends StructTransform { + private final Schema schema; + private final SortOrder sortOrder; + + public SortKey(Schema schema, SortOrder sortOrder) { +super(schema, fieldTransform(sortOrder)); +this.schema = schema; +this.sortOrder = sortOrder; + } + + private SortKey(SortKey toCopy) { +// only need deep copy inside StructTransform +super(toCopy); +this.schema = toCopy.schema; +this.sortOrder = toCopy.sortOrder; + } + + public SortKey copy() { Review Comment: This is the old method, but we aren't actually copying here: the Schema and SortOrder are not copied. Just noting.
Re: [PR] API: add StructTransform base class for PartitionKey and SortKey. add SortOrderComparators [iceberg]
RussellSpitzer commented on code in PR #7798: URL: https://github.com/apache/iceberg/pull/7798#discussion_r1370454116 ## api/src/main/java/org/apache/iceberg/StructTransform.java: ## @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import java.io.Serializable; +import java.lang.reflect.Array; +import java.util.Arrays; +import java.util.List; +import java.util.function.Function; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.transforms.Transform; +import org.apache.iceberg.util.SerializableFunction; + +/** + * A struct of flattened transformed values. + * + * Instances of this class can produce transformed values from a row passed to {@link + * #wrap(StructLike)}. + */ +class StructTransform implements StructLike, Serializable { + + private final int size; + private final Accessor[] accessors; + + @SuppressWarnings("rawtypes") + private final SerializableFunction[] transforms; + + private final Object[] transformedTuple; + + StructTransform(Schema schema, List fieldTransforms) { +Preconditions.checkArgument(fieldTransforms != null, "Invalid field transform list: null"); + +this.size = fieldTransforms.size(); +this.accessors = (Accessor[]) Array.newInstance(Accessor.class, size); +this.transforms = new SerializableFunction[size]; + +for (int i = 0; i < size; ++i) { + int sourceFieldId = fieldTransforms.get(i).sourceFieldId(); + Transform transform = fieldTransforms.get(i).transform(); + Accessor accessor = schema.accessorForField(sourceFieldId); + Preconditions.checkArgument( + accessor != null, "Cannot build accessor for field: " + schema.findField(sourceFieldId)); Review Comment: minor nit: `checkArgument` has a (condition, message template, args) overload, so you don't have to do the string concatenation.
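Concretely, the overload mentioned in the nit looks like the sketch below; it uses plain Guava for illustration (Iceberg ships a relocated copy), and the helper and its parameters are hypothetical, not the PR's code.

```java
import com.google.common.base.Preconditions;

class CheckArgumentSketch {
  // The templated overload only formats the message when the check fails,
  // so no string concatenation happens on the happy path.
  static void requireAccessor(Object accessor, Object fieldDescription) {
    Preconditions.checkArgument(
        accessor != null, "Cannot build accessor for field: %s", fieldDescription);
  }
}
```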
[I] Schema issue between Arrow and PyIceberg [iceberg]
asheeshgarg opened a new issue, #8913: URL: https://github.com/apache/iceberg/issues/8913 ### Apache Iceberg version 1.4.1 (latest release) ### Query engine Other ### Please describe the bug @Fokko we have a table in Iceberg in which some of the column names begin with numbers. We are able to scan the table using PyIceberg, but when we try to bind it to Arrow or DuckDB we see an Arrow error: invalid FieldRef.Name, no match for field. What we observe is that in Arrow a field name beginning with a number, like 2030_ABC, is renamed to _2030_ABC, while the schema in Iceberg correctly defines it as 2030_ABC, matching the original data. This triggers the issue. It seems more of an Arrow bug; happy to open it in the Arrow project. Let me know.
Re: [PR] API: add StructTransform base class for PartitionKey and SortKey. add SortOrderComparators [iceberg]
RussellSpitzer commented on code in PR #7798: URL: https://github.com/apache/iceberg/pull/7798#discussion_r1370461928 ## api/src/main/java/org/apache/iceberg/StructTransform.java: ## @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import java.io.Serializable; +import java.lang.reflect.Array; +import java.util.Arrays; +import java.util.List; +import java.util.function.Function; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.transforms.Transform; +import org.apache.iceberg.util.SerializableFunction; + +/** + * A struct of flattened transformed values. + * + * Instances of this class can produce transformed values from a row passed to {@link + * #wrap(StructLike)}. + */ +class StructTransform implements StructLike, Serializable { + + private final int size; + private final Accessor[] accessors; + + @SuppressWarnings("rawtypes") + private final SerializableFunction[] transforms; + + private final Object[] transformedTuple; + + StructTransform(Schema schema, List fieldTransforms) { +Preconditions.checkArgument(fieldTransforms != null, "Invalid field transform list: null"); + +this.size = fieldTransforms.size(); +this.accessors = (Accessor[]) Array.newInstance(Accessor.class, size); +this.transforms = new SerializableFunction[size]; + +for (int i = 0; i < size; ++i) { + int sourceFieldId = fieldTransforms.get(i).sourceFieldId(); + Transform transform = fieldTransforms.get(i).transform(); + Accessor accessor = schema.accessorForField(sourceFieldId); + Preconditions.checkArgument( + accessor != null, "Cannot build accessor for field: " + schema.findField(sourceFieldId)); + this.accessors[i] = accessor; + this.transforms[i] = transform.bind(accessor.type()); +} + +this.transformedTuple = new Object[size]; + } + + StructTransform(StructTransform toCopy) { +this.size = toCopy.size; +this.accessors = toCopy.accessors; +this.transforms = toCopy.transforms; + +this.transformedTuple = new Object[size]; +System.arraycopy(toCopy.transformedTuple, 0, this.transformedTuple, 0, size); + } + + public void wrap(StructLike row) { +for (int i = 0; i < transformedTuple.length; i += 1) { + Function transform = transforms[i]; + transformedTuple[i] = transform.apply(accessors[i].get(row)); +} + } + + @Override + public int size() { +return size; + } + + @Override + public T get(int pos, Class javaClass) { +return javaClass.cast(transformedTuple[pos]); + } + + @Override + public void set(int pos, T value) { +transformedTuple[pos] = value; + } + + @Override + public String toString() { +StringBuilder sb = new StringBuilder(); +sb.append("["); +for (int i = 0; i < transformedTuple.length; i += 1) { + if (i > 0) { +sb.append(", "); + } + 
sb.append(transformedTuple[i]); +} +sb.append("]"); +return sb.toString(); + } + + @Override + public boolean equals(Object o) { +if (this == o) { + return true; +} else if (!(o instanceof StructTransform)) { + return false; +} + +StructTransform that = (StructTransform) o; +return Arrays.equals(transformedTuple, that.transformedTuple); + } + + @Override + public int hashCode() { +return Arrays.hashCode(transformedTuple); + } + + /** + * Simple POJO for source field id and transform function. {@code Pair} class is not usable here + * in API module, as it has Avro dep and lives in core module. Review Comment: as it has an Avro dep and is in the core module.
Re: [PR] Core: Enable column statistics filtering after planning [iceberg]
stevenzwu commented on code in PR #8803: URL: https://github.com/apache/iceberg/pull/8803#discussion_r1370442734 ## api/src/main/java/org/apache/iceberg/ContentFile.java: ## @@ -165,6 +166,20 @@ default Long fileSequenceNumber() { */ F copyWithoutStats(); + /** + * Copies this file with only specific column stats. Manifest readers can reuse file instances; + * use this method to copy data and only copy specific stats when collecting files. If the + * columnsToKeepStats set is empty or null, then all column stats will be kept. + * + * @param columnsToKeepStats the set of the column ids for the columns which stats are kept. + * @return a copy of this data file, with stats lower bounds, upper bounds, value counts, null + * value counts, and nan value counts for only specific columns. + */ + default F copyWithStats(Set columnsToKeepStats) { +throw new UnsupportedOperationException( +this.getClass().getName() + " doesn't implement copyWithSpecificStats"); Review Comment: copyWithSpecificStats needs to be updated to `copyWithStats(Set)` ## core/src/main/java/org/apache/iceberg/BaseFile.java: ## @@ -174,8 +177,10 @@ public PartitionData copy() { * * @param toCopy a generic data file to copy. * @param fullCopy whether to copy all fields or to drop column-level stats + * @param columnsToKeepStats a set of column ids to keep stats. If empty or null then + * every column stat is kept. */ - BaseFile(BaseFile toCopy, boolean fullCopy) { + BaseFile(BaseFile toCopy, boolean fullCopy, Set columnsToKeepStats) { Review Comment: the arg name `fullCopy` is not accurate anymore. maybe rename it to `copyStats`? ## core/src/main/java/org/apache/iceberg/BaseFile.java: ## @@ -504,6 +508,27 @@ private static Map toReadableByteBufferMap(Map Map filterColumnsStats( + Map map, Set columnIds) { +if (columnIds == null || columnIds.isEmpty()) { + return SerializableMap.copyOf(map); +} + +if (map == null) { + return null; +} + +Map filtered = Maps.newHashMapWithExpectedSize(columnIds.size()); Review Comment: I meant `columnIds.stream().map(columnId -> map.get(columnId))...` ## core/src/main/java/org/apache/iceberg/BaseFile.java: ## @@ -504,6 +508,27 @@ private static Map toReadableByteBufferMap(Map Map filterColumnsStats( Review Comment: got it. this is good then ## core/src/main/java/org/apache/iceberg/BaseFile.java: ## @@ -504,6 +508,27 @@ private static Map toReadableByteBufferMap(Map Map filterColumnsStats( + Map map, Set columnIds) { +if (columnIds == null || columnIds.isEmpty()) { + return SerializableMap.copyOf(map); +} + +if (map == null) { Review Comment: ok. I am fine with it. It is either check the map or the columnIds first. not sure about extra null check. ## api/src/main/java/org/apache/iceberg/ContentFile.java: ## @@ -165,6 +166,20 @@ default Long fileSequenceNumber() { */ F copyWithoutStats(); + /** + * Copies this file with only specific column stats. Manifest readers can reuse file instances; + * use this method to copy data and only copy specific stats when collecting files. If the Review Comment: should `If the columnsToKeepStats set is empty ...` be put in the `@param` below? 
## core/src/main/java/org/apache/iceberg/BaseFile.java: ## @@ -504,6 +508,27 @@ private static Map toReadableByteBufferMap(Map Map filterColumnsStats( + Map map, Set columnIds) { +if (columnIds == null || columnIds.isEmpty()) { + return SerializableMap.copyOf(map); +} + +if (map == null) { + return null; +} + +Map filtered = Maps.newHashMapWithExpectedSize(columnIds.size()); +for (Integer columnId : columnIds) { + TypeT value = map.get(columnId); + if (value != null) { Review Comment: If there are null values in lower_bound or upper_bound, why do we want to filter them out? shouldn't we keep the same behavior when copying stats for selected columns. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
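For illustration, here is a minimal, self-contained sketch of the column-stats filtering idea discussed above. It is not the PR's actual code: it uses plain `java.util` collections rather than Iceberg's internal map helpers, and it keeps entries whose value is null, which is the behavior questioned in the last comment.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class ColumnStatsFilterSketch {

  // Keep stats only for the requested column ids. A null or empty id set means
  // "keep everything"; entries are retained even when their value is null so the
  // copy behaves like the original map for the selected columns.
  static <T> Map<Integer, T> filterColumnsStats(Map<Integer, T> stats, Set<Integer> columnIds) {
    if (columnIds == null || columnIds.isEmpty()) {
      return stats == null ? null : new HashMap<>(stats);
    }

    if (stats == null) {
      return null;
    }

    Map<Integer, T> filtered = new HashMap<>();
    for (Integer columnId : columnIds) {
      if (stats.containsKey(columnId)) {
        filtered.put(columnId, stats.get(columnId));
      }
    }

    return Collections.unmodifiableMap(filtered);
  }
}
```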
Re: [I] operations fail after upgrading to spark 3.4 [iceberg]
huaxingao commented on issue #8904: URL: https://github.com/apache/iceberg/issues/8904#issuecomment-1777589163 cc @aokolnychyi @RussellSpitzer It seems we need to turn `useCommitCoordinator` off. Here is the [discussion](https://github.com/apache/spark/pull/36564#issuecomment-1774214202). Do you have any concerns about switching this off? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
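For context on the knob being discussed: Spark's DataSource V2 `BatchWrite` interface exposes `useCommitCoordinator()`, which defaults to true. The sketch below shows, in outline only, what opting out looks like on the connector side; it is not Iceberg's actual `SparkWrite` code.

```java
import org.apache.spark.sql.connector.write.BatchWrite;

// Sketch of a DSv2 write that opts out of Spark's output commit coordinator.
// The remaining BatchWrite methods (createBatchWriterFactory, commit, abort)
// are left abstract here.
abstract class NoCoordinatorBatchWrite implements BatchWrite {

  @Override
  public boolean useCommitCoordinator() {
    // false = Spark will not ask the OutputCommitCoordinator for permission
    // before each task commit, so the sink must tolerate duplicate task attempts.
    return false;
  }
}
```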
Re: [PR] API: add StructTransform base class for PartitionKey and SortKey. add SortOrderComparators [iceberg]
stevenzwu commented on code in PR #7798: URL: https://github.com/apache/iceberg/pull/7798#discussion_r1370524541 ## api/src/main/java/org/apache/iceberg/StructTransform.java: ## @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import java.io.Serializable; +import java.lang.reflect.Array; +import java.util.Arrays; +import java.util.List; +import java.util.function.Function; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.transforms.Transform; +import org.apache.iceberg.util.SerializableFunction; + +/** + * A struct of flattened transformed values. + * + * Instances of this class can produce transformed values from a row passed to {@link + * #wrap(StructLike)}. + */ +class StructTransform implements StructLike, Serializable { + + private final int size; + private final Accessor[] accessors; + + @SuppressWarnings("rawtypes") + private final SerializableFunction[] transforms; + + private final Object[] transformedTuple; + + StructTransform(Schema schema, List fieldTransforms) { +Preconditions.checkArgument(fieldTransforms != null, "Invalid field transform list: null"); + +this.size = fieldTransforms.size(); +this.accessors = (Accessor[]) Array.newInstance(Accessor.class, size); +this.transforms = new SerializableFunction[size]; + +for (int i = 0; i < size; ++i) { + int sourceFieldId = fieldTransforms.get(i).sourceFieldId(); + Transform transform = fieldTransforms.get(i).transform(); + Accessor accessor = schema.accessorForField(sourceFieldId); + Preconditions.checkArgument( + accessor != null, "Cannot build accessor for field: " + schema.findField(sourceFieldId)); + this.accessors[i] = accessor; + this.transforms[i] = transform.bind(accessor.type()); +} + +this.transformedTuple = new Object[size]; + } + + StructTransform(StructTransform toCopy) { +this.size = toCopy.size; +this.accessors = toCopy.accessors; +this.transforms = toCopy.transforms; + +this.transformedTuple = new Object[size]; +System.arraycopy(toCopy.transformedTuple, 0, this.transformedTuple, 0, size); + } + + public void wrap(StructLike row) { +for (int i = 0; i < transformedTuple.length; i += 1) { + Function transform = transforms[i]; + transformedTuple[i] = transform.apply(accessors[i].get(row)); +} + } + + @Override + public int size() { +return size; + } + + @Override + public T get(int pos, Class javaClass) { +return javaClass.cast(transformedTuple[pos]); + } + + @Override + public void set(int pos, T value) { +transformedTuple[pos] = value; + } + + @Override + public String toString() { +StringBuilder sb = new StringBuilder(); +sb.append("["); +for (int i = 0; i < transformedTuple.length; i += 1) { + if (i > 0) { +sb.append(", "); + } + 
sb.append(transformedTuple[i]); +} +sb.append("]"); +return sb.toString(); + } + + @Override + public boolean equals(Object o) { +if (this == o) { + return true; +} else if (!(o instanceof StructTransform)) { + return false; +} + +StructTransform that = (StructTransform) o; +return Arrays.equals(transformedTuple, that.transformedTuple); + } + + @Override + public int hashCode() { +return Arrays.hashCode(transformedTuple); + } + + /** + * Simple POJO for source field id and transform function. {@code Pair} class is not usable here + * in API module, as it has Avro dep and lives in core module. Review Comment: thx. will update to `as it has an Avro dep and is in the core module` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.o
Re: [PR] API: add StructTransform base class for PartitionKey and SortKey. add SortOrderComparators [iceberg]
stevenzwu commented on code in PR #7798: URL: https://github.com/apache/iceberg/pull/7798#discussion_r1370531354 ## api/src/main/java/org/apache/iceberg/SortKey.java: ## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import java.util.List; +import java.util.stream.Collectors; + +/** + * A struct of flattened sort field values. + * + * Instances of this class can produce sort values from a row passed to {@link + * #wrap(StructLike)}. + */ +public class SortKey extends StructTransform { + private final Schema schema; + private final SortOrder sortOrder; + + public SortKey(Schema schema, SortOrder sortOrder) { +super(schema, fieldTransform(sortOrder)); +this.schema = schema; +this.sortOrder = sortOrder; + } + + private SortKey(SortKey toCopy) { +// only need deep copy inside StructTransform +super(toCopy); +this.schema = toCopy.schema; +this.sortOrder = toCopy.sortOrder; + } + + public SortKey copy() { Review Comment: You are right it is not deep copy of everything. but it is a still `copy/clone` implementation of the `SortKey` that we deemed correct. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
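To make the copy semantics concrete, here is a tiny illustrative example (not Iceberg code) of why a reusable key wrapper like `SortKey` needs a `copy()` before being buffered, even though the copy only duplicates the extracted values and shares the immutable accessors and transforms.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: "ReusableKey" stands in for SortKey, which is re-populated
// via wrap(row) for every incoming record.
class ReusableKey {
  private Object value;

  void wrap(Object row) {          // analogous to SortKey.wrap(StructLike)
    this.value = row;
  }

  ReusableKey copy() {             // analogous to SortKey.copy()
    ReusableKey copied = new ReusableKey();
    copied.value = this.value;     // copying the extracted value is enough
    return copied;
  }

  @Override
  public String toString() {
    return String.valueOf(value);
  }

  public static void main(String[] args) {
    ReusableKey key = new ReusableKey();
    List<ReusableKey> buffered = new ArrayList<>();
    for (String row : new String[] {"a", "b", "c"}) {
      key.wrap(row);
      buffered.add(key.copy()); // without copy(), every buffered entry would see "c"
    }
    System.out.println(buffered); // [a, b, c]
  }
}
```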
Re: [PR] Nessie: reimplement namespace operations [iceberg]
adutra commented on code in PR #8857: URL: https://github.com/apache/iceberg/pull/8857#discussion_r1370572856 ## nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java: ## @@ -181,133 +185,223 @@ public IcebergTable table(TableIdentifier tableIdentifier) { } public void createNamespace(Namespace namespace, Map metadata) { +getRef().checkMutable(); + +if (namespace.isEmpty()) { + throw new IllegalArgumentException("Creating empty namespaces is not supported"); +} + +ContentKey key = ContentKey.of(namespace.levels()); +org.projectnessie.model.Namespace content = +org.projectnessie.model.Namespace.of(key.getElements(), metadata); + try { - getRef().checkMutable(); - withReference( - getApi() - .createNamespace() - .namespace(org.projectnessie.model.Namespace.of(namespace.levels())) - .properties(metadata)) - .create(); - refresh(); -} catch (NessieNamespaceAlreadyExistsException e) { - throw new AlreadyExistsException(e, "Namespace already exists: %s", namespace); + + Map contentMap = + api.getContent().reference(getReference()).key(key).get(); + Content existing = contentMap.get(key); + if (existing != null) { +throw namespaceAlreadyExists(key, existing, null); + } + + try { + +commitRetry("create namespace " + key, Operation.Put.of(key, content)); + + } catch (NessieReferenceConflictException e) { + +NessieConflictHandler.handleSingle( +e, +(conflictType, contentKey) -> { + switch (conflictType) { +case KEY_EXISTS: + Content conflicting = + withReference(api.getContent()).key(contentKey).get().get(contentKey); + throw namespaceAlreadyExists(contentKey, conflicting, e); +case NAMESPACE_ABSENT: + throw new NoSuchNamespaceException( + e, + "Cannot create Namespace '%s': parent namespace '%s' does not exist", + namespace, + contentKey); +default: + return false; + } +}); +throw new RuntimeException( +String.format("Cannot create Namespace '%s': %s", namespace, e.getMessage())); + } + } catch (NessieNotFoundException e) { throw new RuntimeException( String.format( - "Cannot create Namespace '%s': " + "ref '%s' is no longer valid.", + "Cannot create Namespace '%s': ref '%s' is no longer valid.", namespace, getRef().getName()), e); +} catch (BaseNessieClientServerException e) { + throw new RuntimeException( + String.format("Cannot create Namespace '%s': %s", namespace, e.getMessage()), e); } } public List listNamespaces(Namespace namespace) throws NoSuchNamespaceException { + try { - GetNamespacesResponse response = - withReference( - getApi() - .getMultipleNamespaces() - .namespace(org.projectnessie.model.Namespace.of(namespace.levels( - .get(); - return response.getNamespaces().stream() - .map(ns -> Namespace.of(ns.getElements().toArray(new String[0]))) - .filter(ns -> ns.length() == namespace.length() + 1) + + org.projectnessie.model.Namespace root = + org.projectnessie.model.Namespace.of(namespace.levels()); + + String filter = + namespace.isEmpty() + ? 
"size(entry.keyElements) == 1" + : String.format( + "size(entry.keyElements) == %d && entry.encodedKey.startsWith('%s.')", + root.getElementCount() + 1, root.name()); + + List entries = + withReference(api.getEntries()).filter(filter).stream() + .filter(e -> Content.Type.NAMESPACE.equals(e.getType())) + .map(EntriesResponse.Entry::getName) + .collect(Collectors.toList()); + + if (entries.isEmpty()) { +return Collections.emptyList(); + } + + GetContentBuilder getContent = withReference(api.getContent()); + entries.forEach(getContent::key); + + return getContent.get().values().stream() + .map(v -> v.unwrap(org.projectnessie.model.Namespace.class)) + .filter(Optional::isPresent) + .map(Optional::get) + .map(v -> Namespace.of(v.getElements().toArray(new String[0]))) + .filter(v -> v.length() == namespace.length() + 1) // only direct children .collect(Collectors.toList()); -} catch (NessieReferenceNotFoundException e) { + +} catch (NessieNotFoundException e) { throw new RuntimeException( String.format( - "Cannot list Namespaces
Re: [PR] Nessie: reimplement namespace operations [iceberg]
dimas-b commented on code in PR #8857: URL: https://github.com/apache/iceberg/pull/8857#discussion_r1370594082 ## nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java: ## @@ -181,133 +185,223 @@ public IcebergTable table(TableIdentifier tableIdentifier) { } public void createNamespace(Namespace namespace, Map metadata) { +getRef().checkMutable(); + +if (namespace.isEmpty()) { + throw new IllegalArgumentException("Creating empty namespaces is not supported"); +} + +ContentKey key = ContentKey.of(namespace.levels()); +org.projectnessie.model.Namespace content = +org.projectnessie.model.Namespace.of(key.getElements(), metadata); + try { - getRef().checkMutable(); - withReference( - getApi() - .createNamespace() - .namespace(org.projectnessie.model.Namespace.of(namespace.levels())) - .properties(metadata)) - .create(); - refresh(); -} catch (NessieNamespaceAlreadyExistsException e) { - throw new AlreadyExistsException(e, "Namespace already exists: %s", namespace); + + Map contentMap = + api.getContent().reference(getReference()).key(key).get(); + Content existing = contentMap.get(key); + if (existing != null) { +throw namespaceAlreadyExists(key, existing, null); + } + + try { + +commitRetry("create namespace " + key, Operation.Put.of(key, content)); + + } catch (NessieReferenceConflictException e) { + +NessieConflictHandler.handleSingle( +e, +(conflictType, contentKey) -> { + switch (conflictType) { +case KEY_EXISTS: + Content conflicting = + withReference(api.getContent()).key(contentKey).get().get(contentKey); + throw namespaceAlreadyExists(contentKey, conflicting, e); +case NAMESPACE_ABSENT: + throw new NoSuchNamespaceException( + e, + "Cannot create Namespace '%s': parent namespace '%s' does not exist", + namespace, + contentKey); +default: + return false; + } +}); +throw new RuntimeException( +String.format("Cannot create Namespace '%s': %s", namespace, e.getMessage())); + } + } catch (NessieNotFoundException e) { throw new RuntimeException( String.format( - "Cannot create Namespace '%s': " + "ref '%s' is no longer valid.", + "Cannot create Namespace '%s': ref '%s' is no longer valid.", namespace, getRef().getName()), e); +} catch (BaseNessieClientServerException e) { + throw new RuntimeException( + String.format("Cannot create Namespace '%s': %s", namespace, e.getMessage()), e); } } public List listNamespaces(Namespace namespace) throws NoSuchNamespaceException { + try { - GetNamespacesResponse response = - withReference( - getApi() - .getMultipleNamespaces() - .namespace(org.projectnessie.model.Namespace.of(namespace.levels( - .get(); - return response.getNamespaces().stream() - .map(ns -> Namespace.of(ns.getElements().toArray(new String[0]))) - .filter(ns -> ns.length() == namespace.length() + 1) + + org.projectnessie.model.Namespace root = + org.projectnessie.model.Namespace.of(namespace.levels()); + + String filter = + namespace.isEmpty() + ? 
"size(entry.keyElements) == 1" + : String.format( + "size(entry.keyElements) == %d && entry.encodedKey.startsWith('%s.')", + root.getElementCount() + 1, root.name()); + + List entries = + withReference(api.getEntries()).filter(filter).stream() + .filter(e -> Content.Type.NAMESPACE.equals(e.getType())) + .map(EntriesResponse.Entry::getName) + .collect(Collectors.toList()); + + if (entries.isEmpty()) { +return Collections.emptyList(); + } + + GetContentBuilder getContent = withReference(api.getContent()); + entries.forEach(getContent::key); + + return getContent.get().values().stream() + .map(v -> v.unwrap(org.projectnessie.model.Namespace.class)) + .filter(Optional::isPresent) + .map(Optional::get) + .map(v -> Namespace.of(v.getElements().toArray(new String[0]))) + .filter(v -> v.length() == namespace.length() + 1) // only direct children .collect(Collectors.toList()); -} catch (NessieReferenceNotFoundException e) { + +} catch (NessieNotFoundException e) { throw new RuntimeException( String.format( - "Cannot list Namespaces
Re: [I] Schema issue between Arrow and PyIceberg [iceberg]
Fokko commented on issue #8913: URL: https://github.com/apache/iceberg/issues/8913#issuecomment-164930 Thanks @asheeshgarg for raising this. The PyIceberg repository has been moved to https://github.com/apache/iceberg-python. To get the quickest answers, it is best to raise the question over there. We recently merged a PR that fixes this when reading Avro fields: https://github.com/apache/iceberg-python/pull/83 Could you check whether this issue still persists on the main branch? You can easily install it using: ```sh pip install "git+https://github.com/apache/iceberg-python.git#egg=pyiceberg[arrow]" ``` If you run into anything, could you also share the error? Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Nessie: reimplement namespace operations [iceberg]
adutra commented on code in PR #8857: URL: https://github.com/apache/iceberg/pull/8857#discussion_r1370653922 ## nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java: ## @@ -181,133 +185,223 @@ public IcebergTable table(TableIdentifier tableIdentifier) { } public void createNamespace(Namespace namespace, Map metadata) { +getRef().checkMutable(); + +if (namespace.isEmpty()) { + throw new IllegalArgumentException("Creating empty namespaces is not supported"); +} + +ContentKey key = ContentKey.of(namespace.levels()); +org.projectnessie.model.Namespace content = +org.projectnessie.model.Namespace.of(key.getElements(), metadata); + try { - getRef().checkMutable(); - withReference( - getApi() - .createNamespace() - .namespace(org.projectnessie.model.Namespace.of(namespace.levels())) - .properties(metadata)) - .create(); - refresh(); -} catch (NessieNamespaceAlreadyExistsException e) { - throw new AlreadyExistsException(e, "Namespace already exists: %s", namespace); + + Map contentMap = + api.getContent().reference(getReference()).key(key).get(); + Content existing = contentMap.get(key); + if (existing != null) { +throw namespaceAlreadyExists(key, existing, null); + } + + try { + +commitRetry("create namespace " + key, Operation.Put.of(key, content)); + + } catch (NessieReferenceConflictException e) { + +NessieConflictHandler.handleSingle( +e, +(conflictType, contentKey) -> { + switch (conflictType) { +case KEY_EXISTS: + Content conflicting = + withReference(api.getContent()).key(contentKey).get().get(contentKey); + throw namespaceAlreadyExists(contentKey, conflicting, e); +case NAMESPACE_ABSENT: + throw new NoSuchNamespaceException( + e, + "Cannot create Namespace '%s': parent namespace '%s' does not exist", + namespace, + contentKey); +default: + return false; + } +}); +throw new RuntimeException( +String.format("Cannot create Namespace '%s': %s", namespace, e.getMessage())); + } + } catch (NessieNotFoundException e) { throw new RuntimeException( String.format( - "Cannot create Namespace '%s': " + "ref '%s' is no longer valid.", + "Cannot create Namespace '%s': ref '%s' is no longer valid.", namespace, getRef().getName()), e); +} catch (BaseNessieClientServerException e) { + throw new RuntimeException( + String.format("Cannot create Namespace '%s': %s", namespace, e.getMessage()), e); } } public List listNamespaces(Namespace namespace) throws NoSuchNamespaceException { + try { - GetNamespacesResponse response = - withReference( - getApi() - .getMultipleNamespaces() - .namespace(org.projectnessie.model.Namespace.of(namespace.levels( - .get(); - return response.getNamespaces().stream() - .map(ns -> Namespace.of(ns.getElements().toArray(new String[0]))) - .filter(ns -> ns.length() == namespace.length() + 1) + + org.projectnessie.model.Namespace root = + org.projectnessie.model.Namespace.of(namespace.levels()); + + String filter = + namespace.isEmpty() + ? 
"size(entry.keyElements) == 1" + : String.format( + "size(entry.keyElements) == %d && entry.encodedKey.startsWith('%s.')", + root.getElementCount() + 1, root.name()); + + List entries = + withReference(api.getEntries()).filter(filter).stream() + .filter(e -> Content.Type.NAMESPACE.equals(e.getType())) + .map(EntriesResponse.Entry::getName) + .collect(Collectors.toList()); + + if (entries.isEmpty()) { +return Collections.emptyList(); + } + + GetContentBuilder getContent = withReference(api.getContent()); + entries.forEach(getContent::key); + + return getContent.get().values().stream() + .map(v -> v.unwrap(org.projectnessie.model.Namespace.class)) + .filter(Optional::isPresent) + .map(Optional::get) + .map(v -> Namespace.of(v.getElements().toArray(new String[0]))) + .filter(v -> v.length() == namespace.length() + 1) // only direct children .collect(Collectors.toList()); -} catch (NessieReferenceNotFoundException e) { + +} catch (NessieNotFoundException e) { throw new RuntimeException( String.format( - "Cannot list Namespaces
Re: [I] Adding new columns (mergeSchema) [iceberg]
RussellSpitzer commented on issue #8908: URL: https://github.com/apache/iceberg/issues/8908#issuecomment-1777826448 https://iceberg.apache.org/docs/latest/spark-writes/#schema-merge Please make sure you are setting all the appropriate properties -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
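For anyone landing here from search, the linked doc boils down to two settings. The sketch below shows them with placeholder catalog and table names; adapt it to your environment.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class MergeSchemaExample {
  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.builder().getOrCreate();

    // 1. Let the table accept writes whose schema differs from the table schema.
    spark.sql(
        "ALTER TABLE my_catalog.db.sample "
            + "SET TBLPROPERTIES ('write.spark.accept-any-schema'='true')");

    // 2. Ask the writer to merge new columns into the table schema on append.
    Dataset<Row> data = spark.table("my_catalog.db.staged_updates"); // placeholder source
    data.writeTo("my_catalog.db.sample").option("mergeSchema", "true").append();
  }
}
```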
Re: [PR] Spec: add nanosecond timestamp types [iceberg]
jacobmarble commented on PR #8683: URL: https://github.com/apache/iceberg/pull/8683#issuecomment-1777840610 @rdblue @Fokko what concerns remain regarding this change? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Open-API: Error response in the spec don't align with the expected model. [iceberg]
geruh commented on code in PR #8914: URL: https://github.com/apache/iceberg/pull/8914#discussion_r1370927351 ## open-api/rest-catalog-open-api.yaml: ## @@ -2508,19 +2517,6 @@ components: } } -IcebergErrorResponse: Review Comment: Didn't mean to delete it, but it was unused; I can add it back in a revision. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [I] java.lang.IllegalArgumentException: requirement failed while read migrated parquet table [iceberg]
Omega359 commented on issue #8863: URL: https://github.com/apache/iceberg/issues/8863#issuecomment-1778246891 I've just encountered this exception as well but the circumstances are somewhat different. One process is writing out an iceberg table primarily via appends with the occasional delete. Upon completion a new cluster is spun up that reads that table. This issue presented itself during that process: ``` 23/10/24 22:25:51 ERROR BaseReader: Error reading file(s): s3://vdcint-transaction-dev-txn/AR_IDw/dev/transaction_2023_10_ndleq8xl/source_alias=MC3/transaction_date_year=2016/00081-10437993-40ecbe40-af70-41a4-9fde-61bc6a6abeb2-1.parquet java.lang.IllegalArgumentException: requirement failed: length (-135733377) cannot be smaller than -1 at scala.Predef$.require(Predef.scala:281) ~[scala-library-2.12.15.jar:?] at org.apache.spark.rdd.InputFileBlockHolder$.set(InputFileBlockHolder.scala:79) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1] at org.apache.spark.rdd.InputFileBlockHolder.set(InputFileBlockHolder.scala) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1] at org.apache.iceberg.spark.source.BatchDataReader.open(BatchDataReader.java:89) ~[app.jar:?] at org.apache.iceberg.spark.source.BatchDataReader.open(BatchDataReader.java:41) ~[app.jar:?] at org.apache.iceberg.spark.source.BaseReader.next(BaseReader.java:141) ~[app.jar:?] at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:120) ~[spark-sql_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1] at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:158) ~[spark-sql_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1] at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1(DataSourceRDD.scala:63) ~[spark-sql_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1] at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1$adapted(DataSourceRDD.scala:63) ~[spark-sql_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1] at scala.Option.exists(Option.scala:376) ~[scala-library-2.12.15.jar:?] at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:63) ~[spark-sql_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1] at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1] at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) ~[scala-library-2.12.15.jar:?] at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.columnartorow_nextBatch_0$(Unknown Source) ~[?:?] at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.hashAgg_doAggregateWithoutKey_0$(Unknown Source) ~[?:?] at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source) ~[?:?] at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:35) ~[spark-sql_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1] at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.hasNext(Unknown Source) ~[?:?] at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:959) ~[spark-sql_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1] at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) ~[scala-library-2.12.15.jar:?] 
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:142) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1] at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1] at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1] at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1] at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1] at org.apache.spark.scheduler.Task.run(Task.scala:141) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1] at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1] at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1] at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?] at java.util.concurrent.ThreadPoolExecutor$Worker.ru
Re: [I] Prohibit rewrites of equality deletes across sequence numbers [iceberg]
github-actions[bot] commented on issue #7452: URL: https://github.com/apache/iceberg/issues/7452#issuecomment-1778259989 This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in the next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [I] Move iceberg table data from one bucket to another using spark [iceberg]
github-actions[bot] commented on issue #7446: URL: https://github.com/apache/iceberg/issues/7446#issuecomment-1778260018 This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in the next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [I] Improve error reporting when streaming snapshot ID is no longer available [iceberg]
github-actions[bot] commented on issue #7340: URL: https://github.com/apache/iceberg/issues/7340#issuecomment-1778260107 This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale' -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [I] Improve error reporting when streaming snapshot ID is no longer available [iceberg]
github-actions[bot] closed issue #7340: Improve error reporting when streaming snapshot ID is no longer available URL: https://github.com/apache/iceberg/issues/7340 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [I] How to improve performance of RewriteManifests procedure? [iceberg]
github-actions[bot] commented on issue #7325: URL: https://github.com/apache/iceberg/issues/7325#issuecomment-1778260143 This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale' -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [I] How to improve performance of RewriteManifests procedure? [iceberg]
github-actions[bot] closed issue #7325: How to improve performance of RewriteManifests procedure? URL: https://github.com/apache/iceberg/issues/7325 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Open-API: Error response in the spec don't align with the expected model. [iceberg]
jackye1995 commented on code in PR #8914: URL: https://github.com/apache/iceberg/pull/8914#discussion_r1370970627 ## open-api/rest-catalog-open-api.yaml: ## @@ -2508,19 +2517,6 @@ components: } } -IcebergErrorResponse: Review Comment: I think the response here is in the `responses` section, whereas the other one is in the `schemas` section, so we cannot remove this -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Open-API: Error response in the spec don't align with the expected model. [iceberg]
jackye1995 commented on PR #8914: URL: https://github.com/apache/iceberg/pull/8914#issuecomment-1778282237 cc @amogh-jahagirdar @nastra this seems like a miss in the spec, unless the OpenAPI error response requires the `error` nesting by default, but I did not find any reference to that. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[I] Flink: Not Writing [iceberg]
a8356555 opened a new issue, #8916: URL: https://github.com/apache/iceberg/issues/8916 ### Apache Iceberg version 1.4.0 ### Query engine Flink ### Please describe the bug š I'm using following Dockerfile as my environment: ```Dockerfile FROM alpine:3.17.0 AS builder # Download required jars WORKDIR /tmp/download/my-jars RUN wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-flink-runtime-1.16/1.4.0/iceberg-flink-runtime-1.16-1.4.0.jar RUN wget https://repo1.maven.org/maven2/software/amazon/awssdk/bundle/2.17.263/bundle-2.17.263.jar RUN wget https://repo1.maven.org/maven2/software/amazon/awssdk/url-connection-client/2.17.263/url-connection-client-2.17.263.jar FROM apache/flink:1.16.2-scala_2.12-java8 AS runtime # Install Python 3.8 & git & PyFlink RUN apt-get update && apt-get install -y software-properties-common && \ add-apt-repository -y ppa:deadsnakes/ppa && \ apt-get remove -y software-properties-common && apt-get autoremove -y && apt-get clean RUN apt-get update && apt-get install -y python3.8 python3-pip python3.8-distutils git && apt-get clean RUN python3.8 -m pip install --upgrade pip RUN python3.8 -m pip install apache-flink==1.16.2 --no-cache-dir Install Hadoop & export Hadoop classpath WORKDIR /tmp/download/my-hadoop RUN wget https://dlcdn.apache.org/hadoop/common/hadoop-3.3.4/hadoop-3.3.4.tar.gz && \ tar xzf hadoop-3.3.4.tar.gz && \ mv hadoop-3.3.4 /opt/hadoop-3.3.4 && \ rm hadoop-3.3.4.tar.gz ENV HADOOP_HOME=/opt/hadoop-3.3.4 ENV HADOOP_CLASSPATH=/opt/hadoop-3.3.4/etc/hadoop:/opt/hadoop-3.3.4/share/hadoop/common/lib/*:/opt/hadoop-3.3.4/share/hadoop/common/*:/opt/hadoop-3.3.4/share/hadoop/hdfs:/opt/hadoop-3.3.4/share/hadoop/hdfs/lib/*:/opt/hadoop-3.3.4/share/hadoop/hdfs/*:/opt/hadoop-3.3.4/share/hadoop/mapreduce/*:/opt/hadoop-3.3.4/share/hadoop/yarn:/opt/hadoop-3.3.4/share/hadoop/yarn/lib/*:/opt/hadoop-3.3.4/share/hadoop/yarn/* Copy jars from builder stage COPY --from=builder /tmp/download/my-jars/. /opt/flink/lib/. 
``` Here is my pyflink code (job.py) ```python import os from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment from pyflink.common import Types ICEBERG_GLUE_WAREHOUSE = os.environ['ICEBERG_GLUE_WAREHOUSE'] ICEBERG_GLUE_DATABASE_NAME_SRC = os.environ['ICEBERG_GLUE_DATABASE_NAME_SRC'] ICEBERG_GLUE_DATABASE_NAME_DST = os.environ['ICEBERG_GLUE_DATABASE_NAME_DST'] ICEBERG_GLUE_TABLE_NAME_SRC = os.environ['ICEBERG_GLUE_TABLE_NAME_SRC'] ICEBERG_GLUE_TABLE_NAME_DST = os.environ['ICEBERG_GLUE_TABLE_NAME_DST'] env = StreamExecutionEnvironment.get_execution_environment() env.disable_operator_chaining() env.set_parallelism(1) env.enable_checkpointing(6) # 60s checkpoint_config = env.get_checkpoint_config() checkpoint_config.set_checkpoint_storage_dir('file:///tmp/checkpoint') stenv = StreamTableEnvironment.create(env) stenv.get_config().get_configuration().set_string('table.local-time-zone', 'UTC') stenv.execute_sql(f''' CREATE TEMPORARY TABLE `mytable` ( `t` TIMESTAMP, `table` STRING, `op`STRING, `before`MAP, `after` MAP, `_kc_tx`TIMESTAMP, `_kc_source`MAP, `_kafka`ROW<`topic` STRING, `partition` INT, `offset` BIGINT, `timestamp` TIMESTAMP> ) WITH ( 'connector' = 'iceberg', 'warehouse' = '{ICEBERG_GLUE_WAREHOUSE}', 'catalog-impl' = 'org.apache.iceberg.aws.glue.GlueCatalog', 'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO', 'catalog-name' = 'mycatalog', 'catalog-database' = '{ICEBERG_GLUE_DATABASE_NAME_SRC}', 'catalog-table' = '{ICEBERG_GLUE_TABLE_NAME_SRC}' ); ''') stenv.execute_sql(f''' CREATE CATALOG `mycatalog` WITH ( 'type' = 'iceberg', 'catalog-impl' = 'org.apache.iceberg.aws.glue.GlueCatalog', 'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO', 'warehouse' = '{ICEBERG_GLUE_WAREHOUSE}' ); ''') stenv.execute_sql(f''' CREATE TABLE IF NOT EXISTS `mycatalog`.`{ICEBERG_GLUE_DATABASE_NAME_DST}`.`{ICEBERG_GLUE_TABLE_NAME_DST}` ( `t` TIMESTAMP, `table` STRING, `op` STRING, PRIMARY KEY (`table`) NOT ENFORCED ) PARTITIONED BY (`table`) WITH ( 'format-version'='2', 'write.upsert.enabled'='true' ); ''') type_info_datom = Types.ROW_NA
[PR] Add docs dir branch [iceberg]
bitsondatadev opened a new pull request, #8917: URL: https://github.com/apache/iceberg/pull/8917 Adding 1.4.0/1.4.1 versioned MkDocs builds ([they both use the same version](https://github.com/apache/iceberg-docs/tree/asf-site/docs)) to an orphaned `docs` branch in the main repository. This will enable the [step to add the docs at build time](https://github.com/apache/iceberg/tree/main/site#building-the-versioned-docs), while avoiding having these docs show up in the search index of local repositories. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Add Snapshot logic and Summary generation [iceberg-python]
HonahX commented on code in PR #61: URL: https://github.com/apache/iceberg-python/pull/61#discussion_r1369660433 ## pyiceberg/table/snapshots.py: ## @@ -116,3 +144,199 @@ class MetadataLogEntry(IcebergBaseModel): class SnapshotLogEntry(IcebergBaseModel): snapshot_id: int = Field(alias="snapshot-id") timestamp_ms: int = Field(alias="timestamp-ms") + + +class SnapshotSummaryCollector: +added_size: int +removed_size: int +added_files: int +removed_files: int +added_eq_delete_files: int +removed_eq_delete_files: int +added_pos_delete_files: int +removed_pos_delete_files: int +added_delete_files: int +removed_delete_files: int +added_records: int +deleted_records: int +added_pos_deletes: int +removed_pos_deletes: int +added_eq_deletes: int +removed_eq_deletes: int + +def __init__(self) -> None: +self.added_size = 0 +self.removed_size = 0 +self.added_files = 0 +self.removed_files = 0 +self.added_eq_delete_files = 0 +self.removed_eq_delete_files = 0 +self.added_pos_delete_files = 0 +self.removed_pos_delete_files = 0 +self.added_delete_files = 0 +self.removed_delete_files = 0 +self.added_records = 0 +self.deleted_records = 0 +self.added_pos_deletes = 0 +self.removed_pos_deletes = 0 +self.added_eq_deletes = 0 +self.removed_eq_deletes = 0 + +def add_file(self, data_file: DataFile) -> None: +if data_file.content == DataFileContent.DATA: +self.added_files += 1 +self.added_records += data_file.record_count +self.added_size += data_file.file_size_in_bytes Review Comment: Do we want to increment the `added_size` when handling POSITION_DELETES and EQUALITY_DELETES? ## pyiceberg/table/snapshots.py: ## @@ -116,3 +144,199 @@ class MetadataLogEntry(IcebergBaseModel): class SnapshotLogEntry(IcebergBaseModel): snapshot_id: int = Field(alias="snapshot-id") timestamp_ms: int = Field(alias="timestamp-ms") + + +class SnapshotSummaryCollector: +added_size: int +removed_size: int +added_files: int +removed_files: int +added_eq_delete_files: int +removed_eq_delete_files: int +added_pos_delete_files: int +removed_pos_delete_files: int +added_delete_files: int +removed_delete_files: int +added_records: int +deleted_records: int +added_pos_deletes: int +removed_pos_deletes: int +added_eq_deletes: int +removed_eq_deletes: int + Review Comment: In Java Implementation we have a flag named `trustSizeAndDeleteCounts`, which is set to false when we add a `DELETES` manifest. Based on my understanding, the purpose of this flag is to let us skip reporting size and delete counts related metrics when we add one or more `DELETES` manifest since we do not know the exact number of rows deleted in the manifest. Do we want to add the flag in this PR or in the future? 
ref: https://github.com/apache/iceberg/pull/1367#discussion_r500492383 ## pyiceberg/table/snapshots.py: ## @@ -116,3 +144,199 @@ class MetadataLogEntry(IcebergBaseModel): class SnapshotLogEntry(IcebergBaseModel): snapshot_id: int = Field(alias="snapshot-id") timestamp_ms: int = Field(alias="timestamp-ms") + + +class SnapshotSummaryCollector: +added_size: int +removed_size: int +added_files: int +removed_files: int +added_eq_delete_files: int +removed_eq_delete_files: int +added_pos_delete_files: int +removed_pos_delete_files: int +added_delete_files: int +removed_delete_files: int +added_records: int +deleted_records: int +added_pos_deletes: int +removed_pos_deletes: int +added_eq_deletes: int +removed_eq_deletes: int + +def __init__(self) -> None: +self.added_size = 0 +self.removed_size = 0 +self.added_files = 0 +self.removed_files = 0 +self.added_eq_delete_files = 0 +self.removed_eq_delete_files = 0 +self.added_pos_delete_files = 0 +self.removed_pos_delete_files = 0 +self.added_delete_files = 0 +self.removed_delete_files = 0 +self.added_records = 0 +self.deleted_records = 0 +self.added_pos_deletes = 0 +self.removed_pos_deletes = 0 +self.added_eq_deletes = 0 +self.removed_eq_deletes = 0 + +def add_file(self, data_file: DataFile) -> None: +if data_file.content == DataFileContent.DATA: +self.added_files += 1 +self.added_records += data_file.record_count +self.added_size += data_file.file_size_in_bytes +elif data_file.content == DataFileContent.POSITION_DELETES: +self.added_delete_files += 1 +self.added_pos_delete_files += 1 +self.added_pos_deletes += data_file.record_count +elif dat
Re: [PR] Add Snapshot logic and Summary generation [iceberg-python]
HonahX commented on code in PR #61: URL: https://github.com/apache/iceberg-python/pull/61#discussion_r137662 ## pyiceberg/table/snapshots.py: ## @@ -116,3 +144,199 @@ class MetadataLogEntry(IcebergBaseModel): class SnapshotLogEntry(IcebergBaseModel): snapshot_id: int = Field(alias="snapshot-id") timestamp_ms: int = Field(alias="timestamp-ms") + + +class SnapshotSummaryCollector: +added_size: int +removed_size: int +added_files: int +removed_files: int +added_eq_delete_files: int +removed_eq_delete_files: int +added_pos_delete_files: int +removed_pos_delete_files: int +added_delete_files: int +removed_delete_files: int +added_records: int +deleted_records: int +added_pos_deletes: int +removed_pos_deletes: int +added_eq_deletes: int +removed_eq_deletes: int + Review Comment: In Java Implementation I saw a flag named `trustSizeAndDeleteCounts`, which is set to false when we add a `DELETES` manifest. Based on my understanding, the purpose of this flag is to let us skip reporting size and delete counts related metrics when we add one or more `DELETES` manifest since we do not know the exact number of rows deleted in the manifest. Do we want to add the flag in this PR or in the future? ref: https://github.com/apache/iceberg/pull/1367#discussion_r500492383 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
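To illustrate the `trustSizeAndDeleteCounts` idea referenced in the comment (without claiming to match the Java `SnapshotSummary` builder exactly): once a whole delete manifest is added or removed without inspecting its entries, per-row delete counts can no longer be trusted, so those summary fields are suppressed. A hypothetical sketch with illustrative property names:

```java
import java.util.HashMap;
import java.util.Map;

class SummarySketch {
  private long addedRecords = 0;
  private long removedPositionDeletes = 0;
  private boolean trustSizeAndDeleteCounts = true;

  void addDataRecords(long count) {
    addedRecords += count;
  }

  void removePositionDeletes(long count) {
    removedPositionDeletes += count;
  }

  // Called when a whole delete manifest is added or removed without reading its entries.
  void sawDeleteManifest() {
    trustSizeAndDeleteCounts = false;
  }

  Map<String, String> build() {
    Map<String, String> summary = new HashMap<>();
    summary.put("added-records", Long.toString(addedRecords));
    if (trustSizeAndDeleteCounts) {
      // only report delete counts when every entry was actually inspected
      summary.put("removed-position-deletes", Long.toString(removedPositionDeletes));
    }
    return summary;
  }
}
```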