ajantha-bhat commented on code in PR #9487:
URL: https://github.com/apache/iceberg/pull/9487#discussion_r1455179512

##########
core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java:
##########
@@ -53,21 +58,19 @@ final class JdbcUtil {
           + " WHERE "
           + CATALOG_NAME
           + " = ? AND "
-          + TABLE_NAMESPACE
-          + " = ? AND "
-          + TABLE_NAME
-          + " = ? AND "
+          + " %s = ? AND " // table or view namespace

Review Comment:
   nit: we can remove the extra space before %s



##########
core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java:
##########
@@ -303,6 +331,263 @@ public static Properties 
filterAndRemovePrefix(Map<String, String> properties, S
     return result;
   }
 
+  /**
+   * Create SQL statement to get a table or view.
+   *
+   * @param table true to get the SQL statement for a table, false for a view.
+   * @return the SQL statement.
+   */
+  public static String getTableOrViewSqlStatement(boolean table) {
+    String tableOrViewTableName = table ? CATALOG_TABLE_NAME : 
CATALOG_VIEW_NAME;
+    String tableOrViewNamespace = table ? TABLE_NAMESPACE : VIEW_NAMESPACE;
+    String tableOrViewName = table ? TABLE_NAME : VIEW_NAME;
+    return String.format(
+        GET_TABLE_OR_VIEW_SQL, tableOrViewTableName, tableOrViewNamespace, 
tableOrViewName);
+  }
+
+  /**
+   * Create SQL statement to update a table or view.
+   *
+   * @param table true to get the SQL statement for a table, false for a view.
+   * @return the SQL statement.
+   */
+  public static String doCommitSqlStatement(boolean table) {
+    String tableOrViewTableName = table ? CATALOG_TABLE_NAME : 
CATALOG_VIEW_NAME;
+    String tableOrViewNamespace = table ? TABLE_NAMESPACE : VIEW_NAMESPACE;
+    String tableOrViewName = table ? TABLE_NAME : VIEW_NAME;
+    return String.format(
+        DO_COMMIT_TABLE_OR_VIEW_SQL, tableOrViewTableName, 
tableOrViewNamespace, tableOrViewName);
+  }
+
+  /**
+   * Create SQL statement to create a table or view.
+   *
+   * @param table true to get the SQL statement for a table, false for a view.
+   * @return the SQL statement.
+   */
+  public static String createCatalogTableOrViewSqlStatement(boolean table) {
+    String tableOrViewTableName = table ? CATALOG_TABLE_NAME : 
CATALOG_VIEW_NAME;
+    String tableOrViewNamespace = table ? TABLE_NAMESPACE : VIEW_NAMESPACE;
+    String tableOrViewName = table ? TABLE_NAME : VIEW_NAME;
+    return String.format(
+        CREATE_CATALOG_TABLE_OR_VIEW,
+        tableOrViewTableName,
+        tableOrViewNamespace,
+        tableOrViewName,
+        tableOrViewNamespace,
+        tableOrViewName);
+  }
+
+  /**
+   * Create SQL statement to list tables or views.
+   *
+   * @param table true to get the SQL statement for a table, false for a view.
+   * @return the SQL statement.
+   */
+  public static String listTablesOrViewsSqlStatement(boolean table) {
+    String tableOrViewTableName = table ? CATALOG_TABLE_NAME : 
CATALOG_VIEW_NAME;
+    String tableOrViewNamespace = table ? TABLE_NAMESPACE : VIEW_NAMESPACE;
+    return String.format(LIST_TABLES_OR_VIEWS_SQL, tableOrViewTableName, 
tableOrViewNamespace);
+  }
+
+  /**
+   * Create SQL statement to rename a table or view.
+   *
+   * @param table true to get the SQL statement for a table, false for a view.
+   * @return the SQL statement.
+   */
+  public static String renameTableOrViewSqlStatement(boolean table) {
+    String tableOrViewTableName = table ? CATALOG_TABLE_NAME : 
CATALOG_VIEW_NAME;
+    String tableOrViewNamespace = table ? TABLE_NAMESPACE : VIEW_NAMESPACE;
+    String tableOrViewName = table ? TABLE_NAME : VIEW_NAME;
+    return String.format(
+        RENAME_TABLE_OR_VIEW_SQL,
+        tableOrViewTableName,
+        tableOrViewNamespace,
+        tableOrViewName,
+        tableOrViewNamespace,
+        tableOrViewName);
+  }
+
+  /**
+   * Create SQL statement to delete a table or view.
+   *
+   * @param table true to get the SQL statement for a table, false for a view.
+   * @return the SQL statement.
+   */
+  public static String dropTableOrViewSqlStatement(boolean table) {
+    String tableOrViewTableName = table ? CATALOG_TABLE_NAME : 
CATALOG_VIEW_NAME;
+    String tableOrViewNamespace = table ? TABLE_NAMESPACE : VIEW_NAMESPACE;
+    String tableOrViewName = table ? TABLE_NAME : VIEW_NAME;
+    return String.format(
+        DROP_TABLE_OR_VIEW_SQL, tableOrViewTableName, tableOrViewNamespace, 
tableOrViewName);
+  }
+
+  /**
+   * Create SQL statement to create a table or view.
+   *
+   * @param table true to get the SQL statement for a table, false for a view.
+   * @return the SQL statement.
+   */
+  public static String doCommitCreateTableOrViewSqlStatement(boolean table) {
+    String tableOrViewTableName = table ? CATALOG_TABLE_NAME : 
CATALOG_VIEW_NAME;
+    String tableOrViewNamespace = table ? TABLE_NAMESPACE : VIEW_NAMESPACE;
+    String tableOrViewName = table ? TABLE_NAME : VIEW_NAME;
+    return String.format(
+        DO_COMMIT_CREATE_TABLE_OR_VIEW_SQL,
+        tableOrViewTableName,
+        tableOrViewNamespace,
+        tableOrViewName);
+  }
+
+  /**
+   * Update a table or view.
+   *
+   * @param table true to update a table, false to update a view.
+   * @param connections the {@link JdbcClientPool} to connect to the database.
+   * @param catalogName the catalog name.
+   * @param tableOrViewIdentifier the table or view identifier.
+   * @param newMetadataLocation the new metadata location.
+   * @param oldMetadataLocation the old metadata location.
+   * @return the number of updated record.
+   * @throws SQLException if a SQL error happens on the database.
+   * @throws InterruptedException interrupt the SQL query execution.
+   */
+  public static int updateTableOrView(
+      boolean table,
+      JdbcClientPool connections,
+      String catalogName,
+      TableIdentifier tableOrViewIdentifier,
+      String newMetadataLocation,
+      String oldMetadataLocation)
+      throws SQLException, InterruptedException {
+    return connections.run(
+        conn -> {
+          try (PreparedStatement sql = 
conn.prepareStatement(doCommitSqlStatement(table))) {
+            // UPDATE
+            sql.setString(1, newMetadataLocation);
+            sql.setString(2, oldMetadataLocation);
+            // WHERE
+            sql.setString(3, catalogName);
+            sql.setString(4, 
namespaceToString(tableOrViewIdentifier.namespace()));
+            sql.setString(5, tableOrViewIdentifier.name());
+            sql.setString(6, oldMetadataLocation);
+            return sql.executeUpdate();
+          }
+        });
+  }
+
+  /**
+   * Get a table or view.
+   *
+   * @param table true of get a table, false to get a view.
+   * @param connections the {@link JdbcClientPool} to connect to the database.
+   * @param catalogName the catalog name.
+   * @param tableOrViewIdentifier the table or view identifier.
+   * @return the table or view.
+   * @throws SQLException if a SQL error happens on the database.
+   * @throws InterruptedException interrupt the SQL query execution.
+   */
+  public static Map<String, String> getTableOrView(
+      boolean table,
+      JdbcClientPool connections,
+      String catalogName,
+      TableIdentifier tableOrViewIdentifier)
+      throws SQLException, InterruptedException {
+    return connections.run(
+        conn -> {
+          Map<String, String> tableOrView = Maps.newHashMap();
+
+          String getTableOrViewSqlStatement = 
JdbcUtil.getTableOrViewSqlStatement(table);
+          try (PreparedStatement sql = 
conn.prepareStatement(getTableOrViewSqlStatement)) {
+            sql.setString(1, catalogName);
+            sql.setString(2, 
namespaceToString(tableOrViewIdentifier.namespace()));
+            sql.setString(3, tableOrViewIdentifier.name());
+            ResultSet rs = sql.executeQuery();
+
+            if (rs.next()) {
+              tableOrView.put(JdbcUtil.CATALOG_NAME, 
rs.getString(JdbcUtil.CATALOG_NAME));
+              tableOrView.put(
+                  (table ? JdbcUtil.TABLE_NAMESPACE : JdbcUtil.VIEW_NAMESPACE),
+                  rs.getString((table ? JdbcUtil.TABLE_NAMESPACE : 
JdbcUtil.VIEW_NAMESPACE)));
+              tableOrView.put(
+                  (table ? JdbcUtil.TABLE_NAME : JdbcUtil.VIEW_NAME),
+                  rs.getString((table ? JdbcUtil.TABLE_NAME : 
JdbcUtil.VIEW_NAME)));
+              tableOrView.put(
+                  BaseMetastoreTableOperations.METADATA_LOCATION_PROP,
+                  
rs.getString(BaseMetastoreTableOperations.METADATA_LOCATION_PROP));
+              tableOrView.put(
+                  BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP,
+                  
rs.getString(BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP));
+            }
+
+            rs.close();
+          }
+
+          return tableOrView;
+        });
+  }
+
+  /**
+   * Insert a table or view in the catalog.
+   *
+   * @param table true to insert a table, false to insert a view.
+   * @param connections the {@link JdbcClientPool} to connect to the database.
+   * @param catalogName the catalog name.
+   * @param namespace the namespace.
+   * @param tableOrViewIdentifier the table or view identifier.
+   * @param newMetadataLocation the new metadata location.
+   * @return the number of updated records.
+   * @throws SQLException if a SQL error happens on the database.
+   * @throws InterruptedException interrupt the SQL query execution.
+   */
+  public static int doCommitCreateTableOrView(
+      boolean table,
+      JdbcClientPool connections,
+      String catalogName,
+      Namespace namespace,
+      TableIdentifier tableOrViewIdentifier,
+      String newMetadataLocation)
+      throws SQLException, InterruptedException {
+    return connections.run(
+        conn -> {
+          try (PreparedStatement sql =
+              
conn.prepareStatement(doCommitCreateTableOrViewSqlStatement(table))) {
+            sql.setString(1, catalogName);
+            sql.setString(2, namespaceToString(namespace));
+            sql.setString(3, tableOrViewIdentifier.name());
+            sql.setString(4, newMetadataLocation);
+            return sql.executeUpdate();
+          }
+        });
+  }
+
+  static boolean viewExists(
+      String catalogName, JdbcClientPool connections, TableIdentifier 
viewIdentifier) {
+    if (exists(
+        connections,
+        JdbcUtil.getTableOrViewSqlStatement(false),
+        catalogName,
+        namespaceToString(viewIdentifier.namespace()),
+        viewIdentifier.name())) {
+      return true;
+    }
+    return false;

Review Comment:
   nit: Iceberg follows a code style of having an empty line after a block 
(if, for loop, etc.).



##########
core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java:
##########
@@ -303,6 +331,263 @@ public static Properties 
filterAndRemovePrefix(Map<String, String> properties, S
     return result;
   }
 
+  /**
+   * Create SQL statement to get a table or view.
+   *
+   * @param table true to get the SQL statement for a table, false for a view.
+   * @return the SQL statement.
+   */
+  public static String getTableOrViewSqlStatement(boolean table) {
+    String tableOrViewTableName = table ? CATALOG_TABLE_NAME : 
CATALOG_VIEW_NAME;
+    String tableOrViewNamespace = table ? TABLE_NAMESPACE : VIEW_NAMESPACE;
+    String tableOrViewName = table ? TABLE_NAME : VIEW_NAME;
+    return String.format(
+        GET_TABLE_OR_VIEW_SQL, tableOrViewTableName, tableOrViewNamespace, 
tableOrViewName);
+  }
+
+  /**
+   * Create SQL statement to update a table or view.
+   *
+   * @param table true to get the SQL statement for a table, false for a view.
+   * @return the SQL statement.
+   */
+  public static String doCommitSqlStatement(boolean table) {
+    String tableOrViewTableName = table ? CATALOG_TABLE_NAME : 
CATALOG_VIEW_NAME;
+    String tableOrViewNamespace = table ? TABLE_NAMESPACE : VIEW_NAMESPACE;
+    String tableOrViewName = table ? TABLE_NAME : VIEW_NAME;
+    return String.format(
+        DO_COMMIT_TABLE_OR_VIEW_SQL, tableOrViewTableName, 
tableOrViewNamespace, tableOrViewName);
+  }
+
+  /**
+   * Create SQL statement to create a table or view.
+   *
+   * @param table true to get the SQL statement for a table, false for a view.
+   * @return the SQL statement.
+   */
+  public static String createCatalogTableOrViewSqlStatement(boolean table) {
+    String tableOrViewTableName = table ? CATALOG_TABLE_NAME : 
CATALOG_VIEW_NAME;
+    String tableOrViewNamespace = table ? TABLE_NAMESPACE : VIEW_NAMESPACE;
+    String tableOrViewName = table ? TABLE_NAME : VIEW_NAME;
+    return String.format(
+        CREATE_CATALOG_TABLE_OR_VIEW,
+        tableOrViewTableName,
+        tableOrViewNamespace,
+        tableOrViewName,
+        tableOrViewNamespace,
+        tableOrViewName);
+  }
+
+  /**
+   * Create SQL statement to list tables or views.
+   *
+   * @param table true to get the SQL statement for a table, false for a view.
+   * @return the SQL statement.
+   */
+  public static String listTablesOrViewsSqlStatement(boolean table) {
+    String tableOrViewTableName = table ? CATALOG_TABLE_NAME : 
CATALOG_VIEW_NAME;
+    String tableOrViewNamespace = table ? TABLE_NAMESPACE : VIEW_NAMESPACE;
+    return String.format(LIST_TABLES_OR_VIEWS_SQL, tableOrViewTableName, 
tableOrViewNamespace);
+  }
+
+  /**
+   * Create SQL statement to rename a table or view.
+   *
+   * @param table true to get the SQL statement for a table, false for a view.
+   * @return the SQL statement.
+   */
+  public static String renameTableOrViewSqlStatement(boolean table) {
+    String tableOrViewTableName = table ? CATALOG_TABLE_NAME : 
CATALOG_VIEW_NAME;
+    String tableOrViewNamespace = table ? TABLE_NAMESPACE : VIEW_NAMESPACE;
+    String tableOrViewName = table ? TABLE_NAME : VIEW_NAME;
+    return String.format(
+        RENAME_TABLE_OR_VIEW_SQL,
+        tableOrViewTableName,
+        tableOrViewNamespace,
+        tableOrViewName,
+        tableOrViewNamespace,
+        tableOrViewName);
+  }
+
+  /**
+   * Create SQL statement to delete a table or view.
+   *
+   * @param table true to get the SQL statement for a table, false for a view.
+   * @return the SQL statement.
+   */
+  public static String dropTableOrViewSqlStatement(boolean table) {
+    String tableOrViewTableName = table ? CATALOG_TABLE_NAME : 
CATALOG_VIEW_NAME;
+    String tableOrViewNamespace = table ? TABLE_NAMESPACE : VIEW_NAMESPACE;
+    String tableOrViewName = table ? TABLE_NAME : VIEW_NAME;
+    return String.format(
+        DROP_TABLE_OR_VIEW_SQL, tableOrViewTableName, tableOrViewNamespace, 
tableOrViewName);
+  }
+
+  /**
+   * Create SQL statement to create a table or view.
+   *
+   * @param table true to get the SQL statement for a table, false for a view.
+   * @return the SQL statement.
+   */
+  public static String doCommitCreateTableOrViewSqlStatement(boolean table) {
+    String tableOrViewTableName = table ? CATALOG_TABLE_NAME : 
CATALOG_VIEW_NAME;
+    String tableOrViewNamespace = table ? TABLE_NAMESPACE : VIEW_NAMESPACE;
+    String tableOrViewName = table ? TABLE_NAME : VIEW_NAME;
+    return String.format(
+        DO_COMMIT_CREATE_TABLE_OR_VIEW_SQL,
+        tableOrViewTableName,
+        tableOrViewNamespace,
+        tableOrViewName);
+  }
+
+  /**
+   * Update a table or view.
+   *
+   * @param table true to update a table, false to update a view.
+   * @param connections the {@link JdbcClientPool} to connect to the database.
+   * @param catalogName the catalog name.
+   * @param tableOrViewIdentifier the table or view identifier.
+   * @param newMetadataLocation the new metadata location.
+   * @param oldMetadataLocation the old metadata location.
+   * @return the number of updated record.
+   * @throws SQLException if a SQL error happens on the database.
+   * @throws InterruptedException interrupt the SQL query execution.
+   */
+  public static int updateTableOrView(

Review Comment:
   These public interfaces can also be `updateTable` and `updateView`, and 
internally they can call a private method `updateContent` with the boolean flag.



##########
core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java:
##########
@@ -157,8 +159,10 @@ private void initializeCatalogTables() throws 
InterruptedException, SQLException
             return true;
           }
 
-          LOG.debug("Creating table {} to store iceberg catalog", 
JdbcUtil.CATALOG_TABLE_NAME);
-          return 
conn.prepareStatement(JdbcUtil.CREATE_CATALOG_TABLE).execute();
+          LOG.debug(
+              "Creating table {} to store iceberg catalog tables", 
JdbcUtil.CATALOG_TABLE_NAME);
+          return 
conn.prepareStatement(JdbcUtil.createCatalogTableOrViewSqlStatement(true))

Review Comment:
   I think the public interfaces can still be `JdbcUtil.createCatalogTableSql` 
and `JdbcUtil.createCatalogViewSql`. Internally they can still use the common 
SQL by filling in the required variables.
   
   Passing a boolean is a little confusing, as we need to read the signature to 
understand whether true means table or view.



##########
core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java:
##########
@@ -303,6 +331,263 @@ public static Properties 
filterAndRemovePrefix(Map<String, String> properties, S
     return result;
   }
 
+  /**
+   * Create SQL statement to get a table or view.
+   *
+   * @param table true to get the SQL statement for a table, false for a view.
+   * @return the SQL statement.
+   */
+  public static String getTableOrViewSqlStatement(boolean table) {
+    String tableOrViewTableName = table ? CATALOG_TABLE_NAME : 
CATALOG_VIEW_NAME;
+    String tableOrViewNamespace = table ? TABLE_NAMESPACE : VIEW_NAMESPACE;
+    String tableOrViewName = table ? TABLE_NAME : VIEW_NAME;
+    return String.format(
+        GET_TABLE_OR_VIEW_SQL, tableOrViewTableName, tableOrViewNamespace, 
tableOrViewName);
+  }
+
+  /**
+   * Create SQL statement to update a table or view.
+   *
+   * @param table true to get the SQL statement for a table, false for a view.
+   * @return the SQL statement.
+   */
+  public static String doCommitSqlStatement(boolean table) {
+    String tableOrViewTableName = table ? CATALOG_TABLE_NAME : 
CATALOG_VIEW_NAME;
+    String tableOrViewNamespace = table ? TABLE_NAMESPACE : VIEW_NAMESPACE;
+    String tableOrViewName = table ? TABLE_NAME : VIEW_NAME;
+    return String.format(
+        DO_COMMIT_TABLE_OR_VIEW_SQL, tableOrViewTableName, 
tableOrViewNamespace, tableOrViewName);
+  }
+
+  /**
+   * Create SQL statement to create a table or view.
+   *
+   * @param table true to get the SQL statement for a table, false for a view.
+   * @return the SQL statement.
+   */
+  public static String createCatalogTableOrViewSqlStatement(boolean table) {
+    String tableOrViewTableName = table ? CATALOG_TABLE_NAME : 
CATALOG_VIEW_NAME;
+    String tableOrViewNamespace = table ? TABLE_NAMESPACE : VIEW_NAMESPACE;
+    String tableOrViewName = table ? TABLE_NAME : VIEW_NAME;
+    return String.format(
+        CREATE_CATALOG_TABLE_OR_VIEW,
+        tableOrViewTableName,
+        tableOrViewNamespace,
+        tableOrViewName,
+        tableOrViewNamespace,
+        tableOrViewName);
+  }
+
+  /**
+   * Create SQL statement to list tables or views.
+   *
+   * @param table true to get the SQL statement for a table, false for a view.
+   * @return the SQL statement.
+   */
+  public static String listTablesOrViewsSqlStatement(boolean table) {
+    String tableOrViewTableName = table ? CATALOG_TABLE_NAME : 
CATALOG_VIEW_NAME;
+    String tableOrViewNamespace = table ? TABLE_NAMESPACE : VIEW_NAMESPACE;
+    return String.format(LIST_TABLES_OR_VIEWS_SQL, tableOrViewTableName, 
tableOrViewNamespace);
+  }
+
+  /**
+   * Create SQL statement to rename a table or view.
+   *
+   * @param table true to get the SQL statement for a table, false for a view.
+   * @return the SQL statement.
+   */
+  public static String renameTableOrViewSqlStatement(boolean table) {
+    String tableOrViewTableName = table ? CATALOG_TABLE_NAME : 
CATALOG_VIEW_NAME;
+    String tableOrViewNamespace = table ? TABLE_NAMESPACE : VIEW_NAMESPACE;
+    String tableOrViewName = table ? TABLE_NAME : VIEW_NAME;
+    return String.format(
+        RENAME_TABLE_OR_VIEW_SQL,
+        tableOrViewTableName,
+        tableOrViewNamespace,
+        tableOrViewName,
+        tableOrViewNamespace,
+        tableOrViewName);
+  }
+
+  /**
+   * Create SQL statement to delete a table or view.
+   *
+   * @param table true to get the SQL statement for a table, false for a view.
+   * @return the SQL statement.
+   */
+  public static String dropTableOrViewSqlStatement(boolean table) {
+    String tableOrViewTableName = table ? CATALOG_TABLE_NAME : 
CATALOG_VIEW_NAME;
+    String tableOrViewNamespace = table ? TABLE_NAMESPACE : VIEW_NAMESPACE;
+    String tableOrViewName = table ? TABLE_NAME : VIEW_NAME;
+    return String.format(
+        DROP_TABLE_OR_VIEW_SQL, tableOrViewTableName, tableOrViewNamespace, 
tableOrViewName);
+  }
+
+  /**
+   * Create SQL statement to create a table or view.
+   *
+   * @param table true to get the SQL statement for a table, false for a view.
+   * @return the SQL statement.
+   */
+  public static String doCommitCreateTableOrViewSqlStatement(boolean table) {
+    String tableOrViewTableName = table ? CATALOG_TABLE_NAME : 
CATALOG_VIEW_NAME;
+    String tableOrViewNamespace = table ? TABLE_NAMESPACE : VIEW_NAMESPACE;
+    String tableOrViewName = table ? TABLE_NAME : VIEW_NAME;
+    return String.format(
+        DO_COMMIT_CREATE_TABLE_OR_VIEW_SQL,
+        tableOrViewTableName,
+        tableOrViewNamespace,
+        tableOrViewName);
+  }
+
+  /**
+   * Update a table or view.
+   *
+   * @param table true to update a table, false to update a view.
+   * @param connections the {@link JdbcClientPool} to connect to the database.
+   * @param catalogName the catalog name.
+   * @param tableOrViewIdentifier the table or view identifier.
+   * @param newMetadataLocation the new metadata location.
+   * @param oldMetadataLocation the old metadata location.
+   * @return the number of updated record.
+   * @throws SQLException if a SQL error happens on the database.
+   * @throws InterruptedException interrupt the SQL query execution.
+   */
+  public static int updateTableOrView(

Review Comment:
   Similarly for the methods below.



##########
core/src/main/java/org/apache/iceberg/jdbc/JdbcViewOperations.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.jdbc;
+
+import java.sql.SQLException;
+import java.sql.SQLIntegrityConstraintViolationException;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchViewException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.view.BaseViewOperations;
+import org.apache.iceberg.view.ViewMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** JDBC implementation of Iceberg ViewOperations. */
+public class JdbcViewOperations extends BaseViewOperations {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(JdbcViewOperations.class);
+  private final String catalogName;
+  private final TableIdentifier viewIdentifier;
+  private final FileIO fileIO;
+  private final JdbcClientPool connections;
+  private final Map<String, String> catalogProperties;
+
+  protected JdbcViewOperations(
+      JdbcClientPool dbConnPool,
+      FileIO fileIO,
+      String catalogName,
+      TableIdentifier viewIdentifier,
+      Map<String, String> catalogProperties) {
+    this.catalogName = catalogName;
+    this.viewIdentifier = viewIdentifier;
+    this.fileIO = fileIO;
+    this.connections = dbConnPool;
+    this.catalogProperties = catalogProperties;
+  }
+
+  @Override
+  protected void doRefresh() {
+    Map<String, String> view;
+
+    try {
+      view = JdbcUtil.getTableOrView(false, connections, catalogName, 
viewIdentifier);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new UncheckedInterruptedException(e, "Interrupted during refresh");
+    } catch (SQLException e) {
+      // SQL exception happened when getting view from catalog
+      throw new UncheckedSQLException(
+          e, "Failed to get view %s from catalog %s", viewIdentifier, 
catalogName);
+    }
+
+    if (view.isEmpty()) {
+      if (currentMetadataLocation() != null) {
+        throw new NoSuchViewException("View does not exist: %s", 
viewIdentifier);
+      } else {
+        this.disableRefresh();
+        return;
+      }
+    }
+
+    String newMetadataLocation = 
view.get(JdbcTableOperations.METADATA_LOCATION_PROP);
+    Preconditions.checkState(
+        newMetadataLocation != null, "Invalid view %s: metadata location is 
null", viewIdentifier);
+    refreshFromMetadataLocation(newMetadataLocation);
+  }
+
+  @Override
+  protected void doCommit(ViewMetadata base, ViewMetadata metadata) {
+    boolean newView = base == null;
+    String newMetadataLocation = writeNewMetadataIfRequired(metadata);
+    try {
+      Map<String, String> view =
+          JdbcUtil.getTableOrView(false, connections, catalogName, 
viewIdentifier);
+
+      if (base != null) {
+        validateMetadataLocation(view, base);
+        String oldMetadataLocation = base.metadataFileLocation();
+        // Start atomic update
+        LOG.debug("Committing existing view: {}", viewName());
+        updateView(newMetadataLocation, oldMetadataLocation);
+      } else {
+        // table not exists create it

Review Comment:
   ```suggestion
           // view does not exist. Create it.
   ```



##########
core/src/test/java/org/apache/iceberg/jdbc/TestJdbcViewCatalog.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.jdbc;
+
+import java.util.Map;
+import java.util.UUID;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.view.ViewCatalogTests;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.io.TempDir;
+
+public class TestJdbcViewCatalog extends ViewCatalogTests<JdbcCatalog> {
+
+  static Configuration conf = new Configuration();
+  private static JdbcCatalog catalog;
+  private static String warehouseLocation;
+
+  @TempDir java.nio.file.Path tableDir;
+
+  @BeforeEach
+  public void before() {
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(
+        CatalogProperties.URI,
+        "jdbc:sqlite:file::memory:?ic" + 
UUID.randomUUID().toString().replace("-", ""));
+    properties.put(JdbcCatalog.PROPERTY_PREFIX + "username", "user");
+    properties.put(JdbcCatalog.PROPERTY_PREFIX + "password", "password");
+    warehouseLocation = this.tableDir.toAbsolutePath().toString();
+    properties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation);
+
+    catalog = new JdbcCatalog();
+    catalog.setConf(conf);
+    catalog.initialize("testCatalog", properties);
+  }
+
+  @Override
+  protected JdbcCatalog catalog() {
+    return catalog;
+  }
+
+  @Override
+  protected Catalog tableCatalog() {
+    return catalog;
+  }
+
+  @Override
+  protected boolean requiresNamespaceCreate() {

Review Comment:
   I think this can be false (no need for an override). Check `TestJdbcCatalog`.



##########
core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java:
##########
@@ -139,7 +134,22 @@ final class JdbcUtil {
           + " LIKE ? ESCAPE '\\' "
           + " ) "
           + " LIMIT 1";
-  static final String LIST_NAMESPACES_SQL =
+  static final String GET_VIEW_NAMESPACE_SQL =

Review Comment:
   Why don't we have `GET_TABLE_OR_VIEW_NAMESPACE_SQL` here and in the places 
below?



##########
core/src/main/java/org/apache/iceberg/jdbc/JdbcViewOperations.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.jdbc;
+
+import java.sql.SQLException;
+import java.sql.SQLIntegrityConstraintViolationException;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchViewException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.view.BaseViewOperations;
+import org.apache.iceberg.view.ViewMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** JDBC implementation of Iceberg ViewOperations. */
+public class JdbcViewOperations extends BaseViewOperations {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(JdbcViewOperations.class);
+  private final String catalogName;
+  private final TableIdentifier viewIdentifier;
+  private final FileIO fileIO;
+  private final JdbcClientPool connections;
+  private final Map<String, String> catalogProperties;
+
+  protected JdbcViewOperations(
+      JdbcClientPool dbConnPool,
+      FileIO fileIO,
+      String catalogName,
+      TableIdentifier viewIdentifier,
+      Map<String, String> catalogProperties) {
+    this.catalogName = catalogName;
+    this.viewIdentifier = viewIdentifier;
+    this.fileIO = fileIO;
+    this.connections = dbConnPool;
+    this.catalogProperties = catalogProperties;
+  }
+
+  @Override
+  protected void doRefresh() {
+    Map<String, String> view;
+
+    try {
+      view = JdbcUtil.getTableOrView(false, connections, catalogName, 
viewIdentifier);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new UncheckedInterruptedException(e, "Interrupted during refresh");
+    } catch (SQLException e) {
+      // SQL exception happened when getting view from catalog
+      throw new UncheckedSQLException(
+          e, "Failed to get view %s from catalog %s", viewIdentifier, 
catalogName);
+    }
+
+    if (view.isEmpty()) {
+      if (currentMetadataLocation() != null) {
+        throw new NoSuchViewException("View does not exist: %s", 
viewIdentifier);
+      } else {
+        this.disableRefresh();
+        return;
+      }
+    }
+
+    String newMetadataLocation = 
view.get(JdbcTableOperations.METADATA_LOCATION_PROP);
+    Preconditions.checkState(
+        newMetadataLocation != null, "Invalid view %s: metadata location is 
null", viewIdentifier);
+    refreshFromMetadataLocation(newMetadataLocation);
+  }
+
+  @Override
+  protected void doCommit(ViewMetadata base, ViewMetadata metadata) {
+    boolean newView = base == null;
+    String newMetadataLocation = writeNewMetadataIfRequired(metadata);
+    try {
+      Map<String, String> view =
+          JdbcUtil.getTableOrView(false, connections, catalogName, 
viewIdentifier);
+
+      if (base != null) {
+        validateMetadataLocation(view, base);
+        String oldMetadataLocation = base.metadataFileLocation();
+        // Start atomic update
+        LOG.debug("Committing existing view: {}", viewName());
+        updateView(newMetadataLocation, oldMetadataLocation);
+      } else {
+        // table not exists create it
+        LOG.debug("Committing new view: {}", viewName());
+        createView(newMetadataLocation);
+      }
+
+    } catch (SQLIntegrityConstraintViolationException e) {
+

Review Comment:
   nit: the blank line here is redundant.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to