flyrain commented on code in PR #1303:
URL: https://github.com/apache/polaris/pull/1303#discussion_r2038958051


##########
plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/SparkCatalog.java:
##########
@@ -42,42 +48,118 @@
 import org.apache.spark.sql.connector.expressions.Transform;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+/**
+ * SparkCatalog Implementation that is able to interact with both Iceberg 
SparkCatalog and Polaris
+ * SparkCatalog. All namespaces and view related operations continue goes 
through the Iceberg
+ * SparkCatalog. For table operations, depends on the table format, the 
operation can be achieved
+ * with interaction with both Iceberg and Polaris SparkCatalog.
+ */
 public class SparkCatalog
     implements StagingTableCatalog,
         TableCatalog,
         SupportsNamespaces,
         ViewCatalog,
         SupportsReplaceView {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SparkCatalog.class);
 
-  private static final Set<String> DEFAULT_NS_KEYS = 
ImmutableSet.of(TableCatalog.PROP_OWNER);
-  private String catalogName = null;
-  private org.apache.iceberg.spark.SparkCatalog icebergsSparkCatalog = null;
-
-  // TODO: Add Polaris Specific REST Catalog
+  @VisibleForTesting protected String catalogName = null;
+  @VisibleForTesting protected org.apache.iceberg.spark.SparkCatalog 
icebergsSparkCatalog = null;
+  @VisibleForTesting protected PolarisSparkCatalog polarisSparkCatalog = null;
+  @VisibleForTesting protected DeltaHelper deltaHelper = null;
 
   @Override
   public String name() {
     return catalogName;
   }
 
+  /**
+   * Check whether invalid catalog configuration is provided, and return an 
option map with catalog
+   * type configured correctly. This function mainly validates two parts: 1) 
No customized catalog
+   * implementation is provided. 2) No non-rest catalog type is configured.
+   */
+  private CaseInsensitiveStringMap validateAndResolveCatalogOptions(
+      CaseInsensitiveStringMap options) {
+    String catalogType =
+        PropertyUtil.propertyAsString(
+            options, CatalogUtil.ICEBERG_CATALOG_TYPE, 
CatalogUtil.ICEBERG_CATALOG_TYPE_REST);
+    if (catalogType != null && 
!catalogType.equals(CatalogUtil.ICEBERG_CATALOG_TYPE_REST)) {
+      throw new IllegalStateException(
+          "Only rest catalog type is allowed, but got catalog type: "
+              + catalogType
+              + ". Either configure the type to rest or remove the config");
+    }
+
+    String catalogImpl = options.get(CatalogProperties.CATALOG_IMPL);
+    if (catalogImpl != null) {
+      throw new IllegalStateException(
+          "Customized catalog implementation is not supported and not needed, 
please remove the configuration!");
+    }

Review Comment:
   Minor: consider using `Preconditions.checkArgument()`



##########
plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/utils/DeltaHelper.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark.utils;
+
+import com.esotericsoftware.minlog.Log;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import org.apache.iceberg.common.DynConstructors;
+import org.apache.polaris.spark.PolarisSparkCatalog;
+import org.apache.spark.sql.connector.catalog.DelegatingCatalogExtension;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DeltaHelper {
+  private static final Logger LOG = LoggerFactory.getLogger(DeltaHelper.class);
+
+  public static final String DELTA_CATALOG_IMPL_KEY = "delta-catalog-impl";
+  private static final String DEFAULT_DELTA_CATALOG_CLASS =
+      "org.apache.spark.sql.delta.catalog.DeltaCatalog";
+
+  private TableCatalog deltaCatalog = null;
+  private String deltaCatalogImpl = DEFAULT_DELTA_CATALOG_CLASS;
+
+  public DeltaHelper(CaseInsensitiveStringMap options) {
+    if (options.get(DELTA_CATALOG_IMPL_KEY) != null) {
+      this.deltaCatalogImpl = options.get(DELTA_CATALOG_IMPL_KEY);
+    }
+  }
+
+  public TableCatalog loadDeltaCatalog(PolarisSparkCatalog 
polarisSparkCatalog) {
+    if (this.deltaCatalog != null) {
+      return this.deltaCatalog;
+    }
+
+    DynConstructors.Ctor<TableCatalog> ctor;
+    try {
+      ctor = 
DynConstructors.builder(TableCatalog.class).impl(deltaCatalogImpl).buildChecked();
+    } catch (NoSuchMethodException e) {
+      throw new IllegalArgumentException(
+          String.format("Cannot initialize Delta Catalog %s: %s", 
deltaCatalogImpl, e.getMessage()),
+          e);
+    }
+
+    try {
+      this.deltaCatalog = ctor.newInstance();
+    } catch (ClassCastException e) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Cannot initialize Delta Catalog, %s does not implement Table 
Catalog.",
+              deltaCatalogImpl),
+          e);
+    }
+
+    // set the polaris spark catalog as the delegate catalog of delta catalog
+    ((DelegatingCatalogExtension) 
this.deltaCatalog).setDelegateCatalog(polarisSparkCatalog);
+
+    // We want to behave exactly the same as unity catalog for Delta. However, 
DeltaCatalog
+    // implementation today is hard coded for unity catalog. Following issue 
is used to track
+    // the extension of the usage 
https://github.com/delta-io/delta/issues/4306.
+    // Here, we use reflection to set the isUnityCatalog to true for exactly 
same behavior as
+    // unity catalog for now.
+    try {
+      // isUnityCatalog is a lazy val, access the compute method for the lazy 
val
+      // make sure the method is triggered before the value is set, otherwise, 
the
+      // value will be overwritten later when the method is triggered.
+      String methodGetName = "isUnityCatalog" + "$lzycompute";
+      Method method = 
this.deltaCatalog.getClass().getDeclaredMethod(methodGetName);
+      method.setAccessible(true);
+      // invoke the lazy methods before it is set
+      method.invoke(this.deltaCatalog);
+    } catch (NoSuchMethodException e) {
+      Log.warn("No lazy compute method found for variable isUnityCatalog");

Review Comment:
   ```suggestion
         LOG.warn("No lazy compute method found for variable isUnityCatalog");
   ```



##########
plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/SparkCatalog.java:
##########
@@ -42,42 +48,118 @@
 import org.apache.spark.sql.connector.expressions.Transform;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+/**
+ * SparkCatalog Implementation that is able to interact with both Iceberg 
SparkCatalog and Polaris
+ * SparkCatalog. All namespaces and view related operations continue goes 
through the Iceberg
+ * SparkCatalog. For table operations, depends on the table format, the 
operation can be achieved
+ * with interaction with both Iceberg and Polaris SparkCatalog.
+ */
 public class SparkCatalog
     implements StagingTableCatalog,
         TableCatalog,
         SupportsNamespaces,
         ViewCatalog,
         SupportsReplaceView {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SparkCatalog.class);
 
-  private static final Set<String> DEFAULT_NS_KEYS = 
ImmutableSet.of(TableCatalog.PROP_OWNER);
-  private String catalogName = null;
-  private org.apache.iceberg.spark.SparkCatalog icebergsSparkCatalog = null;
-
-  // TODO: Add Polaris Specific REST Catalog
+  @VisibleForTesting protected String catalogName = null;
+  @VisibleForTesting protected org.apache.iceberg.spark.SparkCatalog 
icebergsSparkCatalog = null;
+  @VisibleForTesting protected PolarisSparkCatalog polarisSparkCatalog = null;
+  @VisibleForTesting protected DeltaHelper deltaHelper = null;
 
   @Override
   public String name() {
     return catalogName;
   }
 
+  /**
+   * Check whether invalid catalog configuration is provided, and return an 
option map with catalog
+   * type configured correctly. This function mainly validates two parts: 1) 
No customized catalog
+   * implementation is provided. 2) No non-rest catalog type is configured.
+   */
+  private CaseInsensitiveStringMap validateAndResolveCatalogOptions(
+      CaseInsensitiveStringMap options) {
+    String catalogType =
+        PropertyUtil.propertyAsString(
+            options, CatalogUtil.ICEBERG_CATALOG_TYPE, 
CatalogUtil.ICEBERG_CATALOG_TYPE_REST);
+    if (catalogType != null && 
!catalogType.equals(CatalogUtil.ICEBERG_CATALOG_TYPE_REST)) {
+      throw new IllegalStateException(
+          "Only rest catalog type is allowed, but got catalog type: "
+              + catalogType
+              + ". Either configure the type to rest or remove the config");
+    }
+
+    String catalogImpl = options.get(CatalogProperties.CATALOG_IMPL);
+    if (catalogImpl != null) {
+      throw new IllegalStateException(
+          "Customized catalog implementation is not supported and not needed, 
please remove the configuration!");
+    }
+
+    Map<String, String> resolvedOptions = Maps.newHashMap();
+    resolvedOptions.putAll(options);
+    if (catalogType == null) {
+      // if no catalog type is provided, set the catalog type to rest to 
ensure iceberg
+      // spark Catalog can be started correctly.
+      resolvedOptions.put(CatalogUtil.ICEBERG_CATALOG_TYPE, 
CatalogUtil.ICEBERG_CATALOG_TYPE_REST);
+    }
+
+    return new CaseInsensitiveStringMap(resolvedOptions);
+  }
+
+  /**
+   * Initialize REST Catalog for Iceberg and Polaris, this is the only catalog 
type supported by
+   * Polaris at this moment.
+   */
+  private void initRESTCatalog(String name, CaseInsensitiveStringMap options) {
+    CaseInsensitiveStringMap resolvedOptions = 
validateAndResolveCatalogOptions(options);
+
+    // initialize the icebergSparkCatalog
+    this.icebergsSparkCatalog = new org.apache.iceberg.spark.SparkCatalog();
+    this.icebergsSparkCatalog.initialize(name, resolvedOptions);
+
+    // initialize the polaris spark catalog
+    OAuth2Util.AuthSession catalogAuth =
+        PolarisCatalogUtils.getAuthSession(this.icebergsSparkCatalog);

Review Comment:
   Nit: consider using `var` so the declaration fits on one line.



##########
plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/SparkCatalog.java:
##########
@@ -42,42 +48,118 @@
 import org.apache.spark.sql.connector.expressions.Transform;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+/**
+ * SparkCatalog Implementation that is able to interact with both Iceberg 
SparkCatalog and Polaris
+ * SparkCatalog. All namespaces and view related operations continue goes 
through the Iceberg
+ * SparkCatalog. For table operations, depends on the table format, the 
operation can be achieved
+ * with interaction with both Iceberg and Polaris SparkCatalog.
+ */
 public class SparkCatalog
     implements StagingTableCatalog,
         TableCatalog,
         SupportsNamespaces,
         ViewCatalog,
         SupportsReplaceView {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SparkCatalog.class);
 
-  private static final Set<String> DEFAULT_NS_KEYS = 
ImmutableSet.of(TableCatalog.PROP_OWNER);
-  private String catalogName = null;
-  private org.apache.iceberg.spark.SparkCatalog icebergsSparkCatalog = null;
-
-  // TODO: Add Polaris Specific REST Catalog
+  @VisibleForTesting protected String catalogName = null;
+  @VisibleForTesting protected org.apache.iceberg.spark.SparkCatalog 
icebergsSparkCatalog = null;
+  @VisibleForTesting protected PolarisSparkCatalog polarisSparkCatalog = null;
+  @VisibleForTesting protected DeltaHelper deltaHelper = null;
 
   @Override
   public String name() {
     return catalogName;
   }
 
+  /**
+   * Check whether invalid catalog configuration is provided, and return an 
option map with catalog
+   * type configured correctly. This function mainly validates two parts: 1) 
No customized catalog
+   * implementation is provided. 2) No non-rest catalog type is configured.
+   */
+  private CaseInsensitiveStringMap validateAndResolveCatalogOptions(
+      CaseInsensitiveStringMap options) {
+    String catalogType =
+        PropertyUtil.propertyAsString(
+            options, CatalogUtil.ICEBERG_CATALOG_TYPE, 
CatalogUtil.ICEBERG_CATALOG_TYPE_REST);
+    if (catalogType != null && 
!catalogType.equals(CatalogUtil.ICEBERG_CATALOG_TYPE_REST)) {
+      throw new IllegalStateException(
+          "Only rest catalog type is allowed, but got catalog type: "
+              + catalogType
+              + ". Either configure the type to rest or remove the config");
+    }

Review Comment:
   Minor: consider using `Preconditions.checkArgument()`
   ```suggestion
    Preconditions.checkArgument(
        catalogType == null || catalogType.equals(CatalogUtil.ICEBERG_CATALOG_TYPE_REST),
        "Only rest catalog type is allowed, but got catalog type: "
            + catalogType
            + ". Either configure the type to rest or remove the config");
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to