JonasJ-ap commented on code in PR #6449:
URL: https://github.com/apache/iceberg/pull/6449#discussion_r1082104013


##########
delta-lake/src/main/java/org/apache/iceberg/delta/BaseSnapshotDeltaLakeTableAction.java:
##########
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.delta;
+
+import io.delta.standalone.DeltaLog;
+import io.delta.standalone.VersionLog;
+import io.delta.standalone.actions.Action;
+import io.delta.standalone.actions.AddFile;
+import io.delta.standalone.actions.RemoveFile;
+import java.io.File;
+import java.net.URI;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.DeleteFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.OverwriteFiles;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mapping.MappingUtil;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.orc.OrcMetrics;
+import org.apache.iceberg.parquet.ParquetUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Takes a Delta Lake table's location and attempts to create an Iceberg table 
snapshot in an
+ * optional user-specified location (default to the Delta Lake table's 
location) with a different
+ * identifier.
+ */
+class BaseSnapshotDeltaLakeTableAction implements SnapshotDeltaLakeTable {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(BaseSnapshotDeltaLakeTableAction.class);
+
+  private static final String SNAPSHOT_SOURCE_PROP = "snapshot_source";
+  private static final String DELTA_SOURCE_VALUE = "delta";
+  private static final String ORIGINAL_LOCATION_PROP = "original_location";
+  private static final String PARQUET_SUFFIX = ".parquet";
+  private static final String AVRO_SUFFIX = ".avro";
+  private static final String ORC_SUFFIX = ".orc";
+  private final ImmutableMap.Builder<String, String> 
additionalPropertiesBuilder =
+      ImmutableMap.builder();
+  private DeltaLog deltaLog;
+  private Catalog icebergCatalog;
+  private final String deltaTableLocation;
+  private TableIdentifier newTableIdentifier;
+  private String newTableLocation;
+  private HadoopFileIO deltaLakeFileIO;
+
+  /**
+   * Snapshot a delta lake table to be an iceberg table. The action will read 
the delta lake table's
+   * log through the table's path, create a new iceberg table using the given 
icebergCatalog and
+   * newTableIdentifier, and commit all changes in one iceberg transaction.
+   *
+   * <p>The new table will only be created if the snapshot is successful.
+   *
+   * @param deltaTableLocation the delta lake table's path
+   */
+  BaseSnapshotDeltaLakeTableAction(String deltaTableLocation) {
+    this.deltaTableLocation = deltaTableLocation;
+    this.newTableLocation = deltaTableLocation;
+  }
+
+  @Override
+  public SnapshotDeltaLakeTable tableProperties(Map<String, String> 
properties) {
+    additionalPropertiesBuilder.putAll(properties);
+    return this;
+  }
+
+  @Override
+  public SnapshotDeltaLakeTable tableProperty(String name, String value) {
+    additionalPropertiesBuilder.put(name, value);
+    return this;
+  }
+
+  @Override
+  public SnapshotDeltaLakeTable tableLocation(String location) {
+    this.newTableLocation = location;
+    return this;
+  }
+
+  @Override
+  public SnapshotDeltaLakeTable as(TableIdentifier identifier) {
+    this.newTableIdentifier = identifier;
+    return this;
+  }
+
+  @Override
+  public SnapshotDeltaLakeTable icebergCatalog(Catalog catalog) {
+    this.icebergCatalog = catalog;
+    return this;
+  }
+
+  @Override
+  public SnapshotDeltaLakeTable deltaLakeConfiguration(Configuration conf) {
+    this.deltaLog = DeltaLog.forTable(conf, deltaTableLocation);
+    this.deltaLakeFileIO = new HadoopFileIO(conf);
+    return this;
+  }
+
+  @Override
+  public Result execute() {
+    Preconditions.checkArgument(
+        icebergCatalog != null && newTableIdentifier != null,
+        "Iceberg catalog and identifier cannot be null. Make sure to configure 
the action with a valid Iceberg catalog and identifier.");
+    Preconditions.checkArgument(
+        deltaLog != null && deltaLakeFileIO != null,
+        "Make sure to configure the action with a valid 
deltaLakeConfiguration");
+    io.delta.standalone.Snapshot updatedSnapshot = deltaLog.update();
+    Schema schema = 
convertDeltaLakeSchema(updatedSnapshot.getMetadata().getSchema());
+    PartitionSpec partitionSpec = getPartitionSpecFromDeltaSnapshot(schema);
+    Transaction icebergTransaction =
+        icebergCatalog.newCreateTableTransaction(
+            newTableIdentifier,
+            schema,
+            partitionSpec,
+            newTableLocation,
+            destTableProperties(updatedSnapshot, deltaTableLocation));
+    icebergTransaction
+        .table()
+        .updateProperties()
+        .set(
+            TableProperties.DEFAULT_NAME_MAPPING,
+            
NameMappingParser.toJson(MappingUtil.create(icebergTransaction.table().schema())))
+        .commit();
+    Iterator<VersionLog> versionLogIterator =

Review Comment:
   Thank you for your suggestions. I would like to discuss my thoughts on 
the current design:
   
   In this implementation, my intention is to make all snapshots of the delta 
lake table available in the newly created iceberg table so that users can time 
travel to any snapshot they want after the snapshot/migration. Hence, I 
iterate from the beginning and commit each version log to the iceberg table 
as a separate commit. 
   
   With 
[`DeltaLog.snapshot()`](https://delta-io.github.io/connectors/latest/delta-standalone/api/java/io/delta/standalone/DeltaLog.html#snapshot--),
 we can access the latest checkpoint and build the new iceberg table from there. 
However, this would make users lose access to all snapshots before the last 
checkpoint in the new iceberg table. 
   
   There is another option: With 
[`DeltaLog.update()`](https://delta-io.github.io/connectors/latest/delta-standalone/api/java/io/delta/standalone/DeltaLog.html#update--),
 we can access the latest snapshot (built from the latest checkpoint) and 
commit only that snapshot to the new iceberg table. This approach preserves 
the fewest logs but is the fastest.
   
   In conclusion, I think the tradeoff here is between the action's time cost 
and the number of logs converted to the new iceberg table. I feel that the 
latter two approaches could be offered as a user-configurable option (perhaps 
with a name like `shallowCopy`?) so that users can choose not to convert 
previous snapshots if they do not need them. I think this option can be added later. 
   
   Do you think it is OK to keep the current implementation as the initial PR 
for this module? Please correct me if I have misunderstood something.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to