nastra commented on code in PR #6428:
URL: https://github.com/apache/iceberg/pull/6428#discussion_r1056076021


##########
snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeCatalog.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.snowflake;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.hadoop.Configurable;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.jdbc.JdbcClientPool;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SnowflakeCatalog extends BaseMetastoreCatalog
+    implements Closeable, SupportsNamespaces, Configurable<Object> {
+  public static final String DEFAULT_CATALOG_NAME = "snowflake_catalog";
+  public static final String DEFAULT_FILE_IO_IMPL = 
"org.apache.iceberg.io.ResolvingFileIO";

Review Comment:
   It looks like `DEFAULT_CATALOG_NAME` and `DEFAULT_FILE_IO_IMPL` are only used 
inside `SnowflakeCatalog`, so both could be made `private`.



##########
snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeTableOperations.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.snowflake;
+
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.io.FileIO;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class SnowflakeTableOperations extends BaseMetastoreTableOperations {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(SnowflakeTableOperations.class);
+
+  private final FileIO fileIO;
+  private final TableIdentifier tableIdentifier;
+  private final SnowflakeIdentifier snowflakeIdentifierForTable;
+  private final String fullTableName;
+
+  private final SnowflakeClient snowflakeClient;
+
+  protected SnowflakeTableOperations(
+      SnowflakeClient snowflakeClient,
+      FileIO fileIO,
+      String catalogName,
+      TableIdentifier tableIdentifier) {
+    this.snowflakeClient = snowflakeClient;
+    this.fileIO = fileIO;
+    this.tableIdentifier = tableIdentifier;
+    this.snowflakeIdentifierForTable = 
NamespaceHelpers.toSnowflakeIdentifier(tableIdentifier);
+    this.fullTableName = String.format("%s.%s", catalogName, 
tableIdentifier.toString());

Review Comment:
   ```suggestion
       this.fullTableName = String.format("%s.%s", catalogName, 
tableIdentifier);
   ```



##########
snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeCatalog.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.snowflake;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.hadoop.Configurable;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.jdbc.JdbcClientPool;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SnowflakeCatalog extends BaseMetastoreCatalog
+    implements Closeable, SupportsNamespaces, Configurable<Object> {
+  public static final String DEFAULT_CATALOG_NAME = "snowflake_catalog";
+  public static final String DEFAULT_FILE_IO_IMPL = 
"org.apache.iceberg.io.ResolvingFileIO";
+
+  static class FileIOFactory {
+    public FileIO newFileIO(String impl, Map<String, String> properties, 
Object hadoopConf) {
+      return CatalogUtil.loadFileIO(impl, properties, hadoopConf);
+    }
+  }
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(SnowflakeCatalog.class);
+
+  private CloseableGroup closeableGroup;
+  private Object conf;
+  private String catalogName;
+  private Map<String, String> catalogProperties;
+  private FileIOFactory fileIOFactory;
+  private SnowflakeClient snowflakeClient;
+
+  public SnowflakeCatalog() {}
+
+  @Override
+  public List<TableIdentifier> listTables(Namespace namespace) {
+    SnowflakeIdentifier scope = 
NamespaceHelpers.toSnowflakeIdentifier(namespace);
+    Preconditions.checkArgument(
+        scope.type() == SnowflakeIdentifier.Type.SCHEMA,
+        "listTables must be at SCHEMA level; got %s from namespace %s",
+        scope,
+        namespace);
+
+    List<SnowflakeIdentifier> sfTables = 
snowflakeClient.listIcebergTables(scope);
+
+    return sfTables.stream()
+        .map(NamespaceHelpers::toIcebergTableIdentifier)
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public boolean dropTable(TableIdentifier identifier, boolean purge) {
+    throw new UnsupportedOperationException(
+        "SnowflakeCatalog does not currently support dropTable");
+  }
+
+  @Override
+  public void renameTable(TableIdentifier from, TableIdentifier to) {
+    throw new UnsupportedOperationException(
+        "SnowflakeCatalog does not currently support renameTable");
+  }
+
+  @Override
+  public void initialize(String name, Map<String, String> properties) {
+    String uri = properties.get(CatalogProperties.URI);
+    Preconditions.checkNotNull(uri, "JDBC connection URI is required");

Review Comment:
   Here we do a `Preconditions.checkNotNull(uri, ...)`, while in the other 
`initialize()` method we use `Preconditions.checkArgument(null != ...)`. It would 
be great to align this to use `Preconditions.checkArgument(...)` in both places.



##########
snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeCatalog.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.snowflake;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.hadoop.Configurable;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.jdbc.JdbcClientPool;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SnowflakeCatalog extends BaseMetastoreCatalog
+    implements Closeable, SupportsNamespaces, Configurable<Object> {
+  public static final String DEFAULT_CATALOG_NAME = "snowflake_catalog";
+  public static final String DEFAULT_FILE_IO_IMPL = 
"org.apache.iceberg.io.ResolvingFileIO";
+
+  static class FileIOFactory {
+    public FileIO newFileIO(String impl, Map<String, String> properties, 
Object hadoopConf) {
+      return CatalogUtil.loadFileIO(impl, properties, hadoopConf);
+    }
+  }
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(SnowflakeCatalog.class);
+
+  private CloseableGroup closeableGroup;
+  private Object conf;
+  private String catalogName;
+  private Map<String, String> catalogProperties;
+  private FileIOFactory fileIOFactory;
+  private SnowflakeClient snowflakeClient;
+
+  public SnowflakeCatalog() {}
+
+  @Override
+  public List<TableIdentifier> listTables(Namespace namespace) {
+    SnowflakeIdentifier scope = 
NamespaceHelpers.toSnowflakeIdentifier(namespace);
+    Preconditions.checkArgument(
+        scope.type() == SnowflakeIdentifier.Type.SCHEMA,
+        "listTables must be at SCHEMA level; got %s from namespace %s",
+        scope,
+        namespace);
+
+    List<SnowflakeIdentifier> sfTables = 
snowflakeClient.listIcebergTables(scope);
+
+    return sfTables.stream()
+        .map(NamespaceHelpers::toIcebergTableIdentifier)
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public boolean dropTable(TableIdentifier identifier, boolean purge) {
+    throw new UnsupportedOperationException(
+        "SnowflakeCatalog does not currently support dropTable");
+  }
+
+  @Override
+  public void renameTable(TableIdentifier from, TableIdentifier to) {
+    throw new UnsupportedOperationException(
+        "SnowflakeCatalog does not currently support renameTable");
+  }
+
+  @Override
+  public void initialize(String name, Map<String, String> properties) {
+    String uri = properties.get(CatalogProperties.URI);
+    Preconditions.checkNotNull(uri, "JDBC connection URI is required");
+    try {
+      // We'll ensure the expected JDBC driver implementation class is 
initialized through
+      // reflection regardless of which classloader ends up using this 
JdbcSnowflakeClient, but
+      // we'll only warn if the expected driver fails to load, since users may 
use repackaged or
+      // custom JDBC drivers for Snowflake communication.
+      Class.forName(JdbcSnowflakeClient.EXPECTED_JDBC_IMPL);
+    } catch (ClassNotFoundException cnfe) {
+      LOG.warn(
+          "Failed to load expected JDBC SnowflakeDriver - if queries fail by 
failing"
+              + " to find a suitable driver for jdbc:snowflake:// URIs, you 
must add the Snowflake "
+              + " JDBC driver to your jars/packages",
+          cnfe);
+    }
+    JdbcClientPool connectionPool = new JdbcClientPool(uri, properties);
+
+    initialize(name, new JdbcSnowflakeClient(connectionPool), new 
FileIOFactory(), properties);
+  }
+
+  /**
+   * Initialize using caller-supplied SnowflakeClient and FileIO.
+   *
+   * @param name The name of the catalog, defaults to "snowflake_catalog"
+   * @param snowflakeClient The client encapsulating network communication 
with Snowflake
+   * @param fileIOFactory The {@link FileIOFactory} to use to instantiate a 
new FileIO for each new
+   *     table operation
+   * @param properties The catalog options to use and propagate to dependencies
+   */
+  @SuppressWarnings("checkstyle:HiddenField")
+  void initialize(
+      String name,
+      SnowflakeClient snowflakeClient,
+      FileIOFactory fileIOFactory,
+      Map<String, String> properties) {
+    Preconditions.checkArgument(null != snowflakeClient, "snowflakeClient must 
be non-null");
+    Preconditions.checkArgument(null != fileIOFactory, "fileIOFactory must be 
non-null");
+    this.catalogName = name == null ? DEFAULT_CATALOG_NAME : name;
+    this.snowflakeClient = snowflakeClient;
+    this.fileIOFactory = fileIOFactory;
+    this.catalogProperties = properties;
+    this.closeableGroup = new CloseableGroup();
+    closeableGroup.addCloseable(snowflakeClient);
+    closeableGroup.setSuppressCloseFailure(true);
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (null != closeableGroup) {
+      closeableGroup.close();
+    }
+  }
+
+  @Override
+  public void createNamespace(Namespace namespace, Map<String, String> 
metadata) {
+    throw new UnsupportedOperationException(
+        "SnowflakeCatalog does not currently support createNamespace");
+  }
+
+  @Override
+  public List<Namespace> listNamespaces(Namespace namespace) {
+    SnowflakeIdentifier scope = 
NamespaceHelpers.toSnowflakeIdentifier(namespace);
+    List<SnowflakeIdentifier> results = null;
+    switch (scope.type()) {
+      case ROOT:
+        results = snowflakeClient.listDatabases();
+        break;
+      case DATABASE:
+        results = snowflakeClient.listSchemas(scope);
+        break;
+      default:
+        throw new IllegalArgumentException(
+            String.format(
+                "listNamespaces must be at either ROOT or DATABASE level; got 
%s from namespace %s",
+                scope, namespace));
+    }
+
+    List<Namespace> namespaceList =
+        results.stream()
+            .map(
+                result -> {
+                  Preconditions.checkState(
+                      result.type() == SnowflakeIdentifier.Type.SCHEMA
+                          || result.type() == 
SnowflakeIdentifier.Type.DATABASE,
+                      "Got identifier of type %s from listNamespaces for %s",
+                      result.type(),
+                      namespace);
+                  return NamespaceHelpers.toIcebergNamespace(result);
+                })
+            .collect(Collectors.toList());
+    return namespaceList;
+  }
+
+  @Override
+  public Map<String, String> loadNamespaceMetadata(Namespace namespace)
+      throws NoSuchNamespaceException {
+    SnowflakeIdentifier id = NamespaceHelpers.toSnowflakeIdentifier(namespace);
+    boolean namespaceExists;
+    switch (id.type()) {
+      case DATABASE:
+        namespaceExists = snowflakeClient.databaseExists(id);
+        break;
+      case SCHEMA:
+        namespaceExists = snowflakeClient.schemaExists(id);
+        break;
+      default:
+        throw new IllegalArgumentException(
+            String.format(
+                "loadNamespaceMetadat must be at either DATABASE or SCHEMA 
level; got %s from namespace %s",
+                id, namespace));
+    }
+    if (namespaceExists) {
+      return ImmutableMap.of();
+    } else {
+      throw new NoSuchNamespaceException(
+          "Namespace '%s' with snowflake identifier '%s' doesn't exist", 
namespace, id);
+    }
+  }
+
+  @Override
+  public boolean dropNamespace(Namespace namespace) {
+    throw new UnsupportedOperationException(
+        "SnowflakeCatalog does not currently support dropNamespace");
+  }
+
+  @Override
+  public boolean setProperties(Namespace namespace, Map<String, String> 
properties) {
+    throw new UnsupportedOperationException(
+        "SnowflakeCatalog does not currently support setProperties");
+  }
+
+  @Override
+  public boolean removeProperties(Namespace namespace, Set<String> properties) 
{
+    throw new UnsupportedOperationException(
+        "SnowflakeCatalog does not currently support removeProperties");
+  }
+
+  @Override
+  protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
+    String fileIOImpl = DEFAULT_FILE_IO_IMPL;
+    if (catalogProperties.containsKey(CatalogProperties.FILE_IO_IMPL)) {
+      fileIOImpl = catalogProperties.get(CatalogProperties.FILE_IO_IMPL);
+    }
+    FileIO fileIO = fileIOFactory.newFileIO(fileIOImpl, catalogProperties, 
conf);

Review Comment:
   Could you please elaborate on why we're initializing a FileIO here? Not that it's 
a bad thing to do — I'd just like to understand the context (a clarifying comment 
here might help as well).



##########
snowflake/src/main/java/org/apache/iceberg/snowflake/NamespaceHelpers.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.snowflake;
+
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class NamespaceHelpers {
+  private static final int MAX_NAMESPACE_DEPTH = 2;
+  private static final int NAMESPACE_ROOT_LEVEL = 0;
+  private static final int NAMESPACE_DB_LEVEL = 1;
+  private static final int NAMESPACE_SCHEMA_LEVEL = 2;
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(NamespaceHelpers.class);
+
+  private NamespaceHelpers() {}
+
+  /**
+   * Converts a Namespace into a SnowflakeIdentifier representing ROOT, a 
DATABASE, or a SCHEMA.
+   *
+   * @throws IllegalArgumentException if the namespace is not a supported 
depth.
+   */
+  public static SnowflakeIdentifier toSnowflakeIdentifier(Namespace namespace) 
{

Review Comment:
   Should there be a `TestNamespaceHelpers` class to make sure the 
functionality here is tested?



##########
snowflake/src/main/java/org/apache/iceberg/snowflake/NamespaceHelpers.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.snowflake;
+
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class NamespaceHelpers {
+  private static final int MAX_NAMESPACE_DEPTH = 2;
+  private static final int NAMESPACE_ROOT_LEVEL = 0;
+  private static final int NAMESPACE_DB_LEVEL = 1;
+  private static final int NAMESPACE_SCHEMA_LEVEL = 2;
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(NamespaceHelpers.class);
+
+  private NamespaceHelpers() {}
+
+  /**
+   * Converts a Namespace into a SnowflakeIdentifier representing ROOT, a 
DATABASE, or a SCHEMA.
+   *
+   * @throws IllegalArgumentException if the namespace is not a supported 
depth.
+   */
+  public static SnowflakeIdentifier toSnowflakeIdentifier(Namespace namespace) 
{
+    SnowflakeIdentifier identifier = null;
+    switch (namespace.length()) {
+      case NAMESPACE_ROOT_LEVEL:
+        identifier = SnowflakeIdentifier.ofRoot();
+        break;
+      case NAMESPACE_DB_LEVEL:
+        identifier = 
SnowflakeIdentifier.ofDatabase(namespace.level(NAMESPACE_DB_LEVEL - 1));
+        break;
+      case NAMESPACE_SCHEMA_LEVEL:
+        identifier =
+            SnowflakeIdentifier.ofSchema(
+                namespace.level(NAMESPACE_DB_LEVEL - 1),
+                namespace.level(NAMESPACE_SCHEMA_LEVEL - 1));
+        break;
+      default:
+        throw new IllegalArgumentException(
+            String.format(
+                "Snowflake max namespace level is %d, got namespace '%s'",
+                MAX_NAMESPACE_DEPTH, namespace));
+    }
+    LOG.debug("toSnowflakeIdentifier({}) -> {}", namespace, identifier);
+    return identifier;
+  }
+
+  /**
+   * Converts a TableIdentifier into a SnowflakeIdentifier of type TABLE; the 
identifier must have
+   * exactly the right namespace depth to represent a fully-qualified 
Snowflake table identifier.
+   */
+  public static SnowflakeIdentifier toSnowflakeIdentifier(TableIdentifier 
identifier) {
+    SnowflakeIdentifier namespaceScope = 
toSnowflakeIdentifier(identifier.namespace());
+    Preconditions.checkArgument(
+        namespaceScope.type() == SnowflakeIdentifier.Type.SCHEMA,
+        "Namespace portion of '%s' must be at the SCHEMA level, got 
namespaceScope '%s'",
+        identifier,
+        namespaceScope);
+    SnowflakeIdentifier ret =
+        SnowflakeIdentifier.ofTable(
+            namespaceScope.databaseName(), namespaceScope.schemaName(), 
identifier.name());
+    LOG.debug("toSnowflakeIdentifier({}) -> {}", identifier, ret);
+    return ret;
+  }
+
+  /**
+   * Converts a SnowflakeIdentifier of type ROOT, DATABASE, or SCHEMA into an 
equivalent Iceberg
+   * Namespace; throws IllegalArgumentException if not an appropriate type.
+   */
+  public static Namespace toIcebergNamespace(SnowflakeIdentifier identifier) {
+    Namespace namespace = null;
+    switch (identifier.type()) {
+      case ROOT:
+        namespace = Namespace.of();

Review Comment:
   nit: `Namespace.empty()`



##########
snowflake/src/test/java/org/apache/iceberg/snowflake/SnowflakeCatalogTest.java:
##########
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.snowflake;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableMetadataParser;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InMemoryFileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SnowflakeCatalogTest {
+
+  static final String TEST_CATALOG_NAME = "slushLog";

Review Comment:
   nit: can probably be `private`



##########
snowflake/src/test/java/org/apache/iceberg/snowflake/FakeSnowflakeClient.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.snowflake;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.jdbc.UncheckedSQLException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+public class FakeSnowflakeClient implements SnowflakeClient {

Review Comment:
   This can probably be a follow-up, but would it make sense to eventually run 
the Snowflake client against a real instance? I'm wondering whether it would be 
possible to use Testcontainers with a Snowflake Docker image to do a real 
integration test for this. I think this should be doable, and I could help with 
that.



##########
snowflake/src/test/java/org/apache/iceberg/snowflake/FakeSnowflakeClient.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.snowflake;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.jdbc.UncheckedSQLException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+public class FakeSnowflakeClient implements SnowflakeClient {
+  // In-memory lookup by database/schema/tableName to table metadata.
+  private Map<String, Map<String, Map<String, SnowflakeTableMetadata>>> 
databases =

Review Comment:
   nit: can be final



##########
snowflake/src/test/java/org/apache/iceberg/snowflake/FakeSnowflakeClient.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.snowflake;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.jdbc.UncheckedSQLException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+public class FakeSnowflakeClient implements SnowflakeClient {
+  // In-memory lookup by database/schema/tableName to table metadata.
+  private Map<String, Map<String, Map<String, SnowflakeTableMetadata>>> 
databases =
+      Maps.newTreeMap();
+  private boolean closed = false;
+
+  public FakeSnowflakeClient() {}
+
+  /**
+   * Also adds parent database/schema if they don't already exist. If the 
tableName already exists
+   * under the given database/schema, the value is replaced with the provided 
metadata.
+   */
+  public void addTable(

Review Comment:
   it seems that it would probably make sense to use `SnowflakeIdentifier` as a 
parameter rather than `database` / `schema` / `tableName`



##########
snowflake/src/test/java/org/apache/iceberg/snowflake/JdbcSnowflakeClientTest.java:
##########
@@ -0,0 +1,620 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.snowflake;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+import org.apache.iceberg.ClientPool;
+import org.apache.iceberg.jdbc.JdbcClientPool;
+import org.apache.iceberg.jdbc.UncheckedInterruptedException;
+import org.apache.iceberg.jdbc.UncheckedSQLException;
+import org.assertj.core.api.Assertions;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mock;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+
+@RunWith(MockitoJUnitRunner.class)
+public class JdbcSnowflakeClientTest {
  // Mocks for the whole JDBC stack; wired together in before() so each test can drive behavior
  // purely through mockQueryHarness/mockResultSet.
  @Mock private Connection mockConnection;
  @Mock private JdbcClientPool mockClientPool;
  @Mock private JdbcSnowflakeClient.QueryHarness mockQueryHarness;
  @Mock private ResultSet mockResultSet;

  // Client under test; constructed fresh per test in before().
  private JdbcSnowflakeClient snowflakeClient;
+
+  @Before
+  public void before() throws SQLException, InterruptedException {
+    snowflakeClient = new JdbcSnowflakeClient(mockClientPool);
+    snowflakeClient.setQueryHarness(mockQueryHarness);
+
+    doAnswer(
+            new Answer() {
+              @Override
+              public Object answer(InvocationOnMock invocation) throws 
Throwable {
+                return ((ClientPool.Action) 
invocation.getArguments()[0]).run(mockConnection);
+              }
+            })
+        .when(mockClientPool)
+        .run(any(ClientPool.Action.class));
+    doAnswer(
+            new Answer() {
+              @Override
+              public Object answer(InvocationOnMock invocation) throws 
Throwable {
+                return ((JdbcSnowflakeClient.ResultSetParser) 
invocation.getArguments()[2])
+                    .parse(mockResultSet);
+              }
+            })
+        .when(mockQueryHarness)
+        .query(
+            any(Connection.class),
+            any(String.class),
+            any(JdbcSnowflakeClient.ResultSetParser.class),
+            ArgumentMatchers.<String>any());
+  }
+
+  @Test
+  public void testNullClientPoolInConstructor() {
+    Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+        .isThrownBy(() -> new JdbcSnowflakeClient(null))
+        .withMessageContaining("JdbcClientPool must be non-null");
+  }
+
+  @Test
+  public void testDatabaseExists() throws SQLException {
+    when(mockResultSet.next()).thenReturn(true).thenReturn(false);
+    when(mockResultSet.getString("name")).thenReturn("DB_1");
+
+    
Assertions.assertThat(snowflakeClient.databaseExists(SnowflakeIdentifier.ofDatabase("DB_1")))
+        .isTrue();
+
+    verify(mockQueryHarness)
+        .query(
+            eq(mockConnection),
+            eq("SHOW DATABASES LIKE 'DB_1' IN ACCOUNT"),
+            any(JdbcSnowflakeClient.ResultSetParser.class),
+            eq((String[]) null));

Review Comment:
   nit: unnecessary cast



##########
snowflake/src/main/java/org/apache/iceberg/snowflake/JdbcSnowflakeClient.java:
##########
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.snowflake;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+import org.apache.iceberg.jdbc.JdbcClientPool;
+import org.apache.iceberg.jdbc.UncheckedInterruptedException;
+import org.apache.iceberg.jdbc.UncheckedSQLException;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/**
+ * This implementation of SnowflakeClient builds on top of Snowflake's JDBC 
driver to interact with
+ * Snowflake's Iceberg-aware resource model.
+ */
+class JdbcSnowflakeClient implements SnowflakeClient {
+  public static final String EXPECTED_JDBC_IMPL = 
"net.snowflake.client.jdbc.SnowflakeDriver";

Review Comment:
   nit: can probably be package-private



##########
snowflake/src/main/java/org/apache/iceberg/snowflake/JdbcSnowflakeClient.java:
##########
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.snowflake;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+import org.apache.iceberg.jdbc.JdbcClientPool;
+import org.apache.iceberg.jdbc.UncheckedInterruptedException;
+import org.apache.iceberg.jdbc.UncheckedSQLException;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/**
+ * This implementation of SnowflakeClient builds on top of Snowflake's JDBC 
driver to interact with
+ * Snowflake's Iceberg-aware resource model.
+ */
+class JdbcSnowflakeClient implements SnowflakeClient {
+  public static final String EXPECTED_JDBC_IMPL = 
"net.snowflake.client.jdbc.SnowflakeDriver";
+
  /** Translates a JDBC {@link ResultSet} into a parsed value of type {@code T}. */
  @FunctionalInterface
  interface ResultSetParser<T> {
    T parse(ResultSet rs) throws SQLException;
  }
+
+  /**
+   * This class wraps the basic boilerplate of setting up PreparedStatements 
and applying a
+   * ResultSetParser to translate a ResultSet into parsed objects. Allows 
easily injecting
+   * subclasses for debugging/testing purposes.
+   */
+  static class QueryHarness {
+    public <T> T query(Connection conn, String sql, ResultSetParser<T> parser, 
String... args)
+        throws SQLException {
+      try (PreparedStatement statement = conn.prepareStatement(sql)) {
+        if (args != null) {
+          for (int i = 0; i < args.length; ++i) {
+            statement.setString(i + 1, args[i]);
+          }
+        }
+
+        try (ResultSet rs = statement.executeQuery()) {
+          return parser.parse(rs);
+        }
+      }
+    }
+  }
+
  /**
   * Parses rows representing fully-qualified Snowflake Database identifiers (e.g. from the
   * {@code SHOW DATABASES} query issued by {@link #databaseExists}); each row is expected to
   * expose a "name" column holding the databaseName.
   */
  public static final ResultSetParser<List<SnowflakeIdentifier>> DATABASE_RESULT_SET_HANDLER =
      rs -> {
        List<SnowflakeIdentifier> databases = Lists.newArrayList();
        while (rs.next()) {
          String databaseName = rs.getString("name");
          databases.add(SnowflakeIdentifier.ofDatabase(databaseName));
        }
        return databases;
      };
+
  /**
   * Parses rows representing fully-qualified Snowflake Schema identifiers; each row is expected
   * to expose "database_name" and "name" (the schemaName) columns.
   */
  public static final ResultSetParser<List<SnowflakeIdentifier>> SCHEMA_RESULT_SET_HANDLER =
      rs -> {
        List<SnowflakeIdentifier> schemas = Lists.newArrayList();
        while (rs.next()) {
          String databaseName = rs.getString("database_name");
          String schemaName = rs.getString("name");
          schemas.add(SnowflakeIdentifier.ofSchema(databaseName, schemaName));
        }
        return schemas;
      };
+
  /**
   * Parses rows representing fully-qualified Snowflake Table identifiers; each row is expected to
   * expose "database_name", "schema_name", and "name" (the tableName) columns.
   */
  public static final ResultSetParser<List<SnowflakeIdentifier>> TABLE_RESULT_SET_HANDLER =
      rs -> {
        List<SnowflakeIdentifier> tables = Lists.newArrayList();
        while (rs.next()) {
          String databaseName = rs.getString("database_name");
          String schemaName = rs.getString("schema_name");
          String tableName = rs.getString("name");
          tables.add(SnowflakeIdentifier.ofTable(databaseName, schemaName, tableName));
        }
        return tables;
      };
+
  /**
   * Parses a ResultSet expected to hold at most a single record of Snowflake Iceberg metadata,
   * whose "METADATA" column contains the raw JSON payload; returns null when no row is present.
   */
  public static final ResultSetParser<SnowflakeTableMetadata> TABLE_METADATA_RESULT_SET_HANDLER =
      rs -> {
        if (!rs.next()) {
          // No matching metadata row for the requested table.
          return null;
        }

        String rawJsonVal = rs.getString("METADATA");
        return SnowflakeTableMetadata.parseJson(rawJsonVal);
      };
+
  // Pool of JDBC connections used for all queries issued by this client.
  private final JdbcClientPool connectionPool;
  // Mutable only so tests can inject a mock via setQueryHarness.
  private QueryHarness queryHarness;
+
+  JdbcSnowflakeClient(JdbcClientPool conn) {
+    Preconditions.checkArgument(null != conn, "JdbcClientPool must be 
non-null");
+    connectionPool = conn;
+    queryHarness = new QueryHarness();
+  }
+
  /** Replaces the default {@link QueryHarness}; intended only for tests to inject a mock. */
  @VisibleForTesting
  void setQueryHarness(QueryHarness queryHarness) {
    this.queryHarness = queryHarness;
  }
+
+  /**
+   * For rare cases where PreparedStatements aren't supported for 
user-supplied identifiers intended
+   * for use in special LIKE clauses, we can sanitize by "broadening" the 
identifier with
+   * single-character wildcards and manually post-filter client-side.
+   *
+   * <p>Note: This sanitization approach intentionally "broadens" the scope of 
matching results;
+   * callers must be able to handle this method returning an all-wildcard 
expression; i.e. the
+   * caller must treat the usage of the LIKE clause as only an optional 
optimization, and should
+   * post-filter for correctness as if the LIKE clause wasn't present in the 
query at all.
+   */
+  @VisibleForTesting
+  String sanitizeIdentifierWithWildcardForLikeClause(String identifier) {
+    // Restrict identifiers to the "Unquoted object identifiers" synax 
documented at
+    // https://docs.snowflake.com/en/sql-reference/identifiers-syntax.html
+    //
+    // Use a strict allowlist of characters, replace everything *not* matching 
the character set
+    // with "_", which is used as a single-character wildcard in Snowflake.
+    String sanitized = identifier.replaceAll("[^a-zA-Z0-9_$]", "_");
+    if (sanitized.startsWith("$")) {
+      sanitized = "_" + sanitized.substring(1);
+    }
+    return sanitized;
+  }
+
+  @Override
+  public boolean databaseExists(SnowflakeIdentifier database) {
+    Preconditions.checkArgument(
+        database.type() == SnowflakeIdentifier.Type.DATABASE,
+        "databaseExists requires a DATABASE identifier, got '%s'",
+        database);
+
+    // Due to current limitations in PreparedStatement parameters for the LIKE 
clause in
+    // SHOW DATABASES queries, we'll use a fairly limited allowlist for 
identifier characters,
+    // using wildcards for non-allowed characters, and post-filter for 
matching.
+    final String finalQuery =
+        String.format(
+            "SHOW DATABASES LIKE '%s' IN ACCOUNT",
+            
sanitizeIdentifierWithWildcardForLikeClause(database.databaseName()));
+    List<SnowflakeIdentifier> databases;
+    try {
+      databases =
+          connectionPool.run(
+              conn ->
+                  queryHarness.query(
+                      conn, finalQuery, DATABASE_RESULT_SET_HANDLER, 
(String[]) null));
+    } catch (SQLException e) {
+      throw new UncheckedSQLException(e, "Failed to check if database exists");

Review Comment:
   should this maybe mention the `database` name being checked? Same for the 
exception messages in the other methods in this class



##########
core/src/test/java/org/apache/iceberg/io/InMemoryFileIO.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.io;
+
+import java.util.Map;
+import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+public class InMemoryFileIO implements FileIO {

Review Comment:
   I would probably consider introducing this as a separate PR. That way it can 
be reviewed independently from the Snowflake Catalog PR



##########
snowflake/src/main/java/org/apache/iceberg/snowflake/NamespaceHelpers.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.snowflake;
+
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class NamespaceHelpers {
+  private static final int MAX_NAMESPACE_DEPTH = 2;
+  private static final int NAMESPACE_ROOT_LEVEL = 0;
+  private static final int NAMESPACE_DB_LEVEL = 1;
+  private static final int NAMESPACE_SCHEMA_LEVEL = 2;
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(NamespaceHelpers.class);
+
+  private NamespaceHelpers() {}
+
+  /**
+   * Converts a Namespace into a SnowflakeIdentifier representing ROOT, a 
DATABASE, or a SCHEMA.
+   *
+   * @throws IllegalArgumentException if the namespace is not a supported 
depth.
+   */
+  public static SnowflakeIdentifier toSnowflakeIdentifier(Namespace namespace) 
{
+    SnowflakeIdentifier identifier = null;
+    switch (namespace.length()) {
+      case NAMESPACE_ROOT_LEVEL:
+        identifier = SnowflakeIdentifier.ofRoot();
+        break;
+      case NAMESPACE_DB_LEVEL:
+        identifier = 
SnowflakeIdentifier.ofDatabase(namespace.level(NAMESPACE_DB_LEVEL - 1));
+        break;
+      case NAMESPACE_SCHEMA_LEVEL:
+        identifier =
+            SnowflakeIdentifier.ofSchema(
+                namespace.level(NAMESPACE_DB_LEVEL - 1),
+                namespace.level(NAMESPACE_SCHEMA_LEVEL - 1));
+        break;
+      default:
+        throw new IllegalArgumentException(
+            String.format(
+                "Snowflake max namespace level is %d, got namespace '%s'",
+                MAX_NAMESPACE_DEPTH, namespace));
+    }
+    LOG.debug("toSnowflakeIdentifier({}) -> {}", namespace, identifier);
+    return identifier;
+  }
+
+  /**
+   * Converts a TableIdentifier into a SnowflakeIdentifier of type TABLE; the 
identifier must have
+   * exactly the right namespace depth to represent a fully-qualified 
Snowflake table identifier.
+   */
+  public static SnowflakeIdentifier toSnowflakeIdentifier(TableIdentifier 
identifier) {
+    SnowflakeIdentifier namespaceScope = 
toSnowflakeIdentifier(identifier.namespace());
+    Preconditions.checkArgument(
+        namespaceScope.type() == SnowflakeIdentifier.Type.SCHEMA,
+        "Namespace portion of '%s' must be at the SCHEMA level, got 
namespaceScope '%s'",
+        identifier,
+        namespaceScope);
+    SnowflakeIdentifier ret =
+        SnowflakeIdentifier.ofTable(
+            namespaceScope.databaseName(), namespaceScope.schemaName(), 
identifier.name());
+    LOG.debug("toSnowflakeIdentifier({}) -> {}", identifier, ret);
+    return ret;
+  }
+
+  /**
+   * Converts a SnowflakeIdentifier of type ROOT, DATABASE, or SCHEMA into an 
equivalent Iceberg
+   * Namespace; throws IllegalArgumentException if not an appropriate type.
+   */
+  public static Namespace toIcebergNamespace(SnowflakeIdentifier identifier) {
+    Namespace namespace = null;
+    switch (identifier.type()) {
+      case ROOT:
+        namespace = Namespace.of();
+        break;
+      case DATABASE:
+        namespace = Namespace.of(identifier.databaseName());
+        break;
+      case SCHEMA:
+        namespace = Namespace.of(identifier.databaseName(), 
identifier.schemaName());
+        break;
+      default:
+        throw new IllegalArgumentException(
+            String.format("Cannot convert identifier '%s' to Namespace", 
identifier));
+    }
+    LOG.debug("toIcebergNamespace({}) -> {}", identifier, namespace);

Review Comment:
   nit: I feel like this is just adding unnecessary overhead to the logic of 
the method. For example, rather than storing the result in a `namespace` 
variable, it would be simpler to just do a `return Namespace.of(..)` in each 
case.
   
   This applies to all methods in this class, so I'm not sure if we're gaining 
anything from the debug logging here? 



##########
snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeCatalog.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.snowflake;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.hadoop.Configurable;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.jdbc.JdbcClientPool;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SnowflakeCatalog extends BaseMetastoreCatalog
+    implements Closeable, SupportsNamespaces, Configurable<Object> {
+  public static final String DEFAULT_CATALOG_NAME = "snowflake_catalog";
+  public static final String DEFAULT_FILE_IO_IMPL = 
"org.apache.iceberg.io.ResolvingFileIO";
+
  /**
   * Factory indirection for creating a FileIO per table operation; a caller-supplied instance can
   * be injected through the package-private initialize overload (e.g. for testing).
   */
  static class FileIOFactory {
    public FileIO newFileIO(String impl, Map<String, String> properties, Object hadoopConf) {
      return CatalogUtil.loadFileIO(impl, properties, hadoopConf);
    }
  }
+
  private static final Logger LOG = LoggerFactory.getLogger(SnowflakeCatalog.class);

  // All state below is populated by initialize(...), not the constructor.
  private CloseableGroup closeableGroup; // owns closeables released in close(), e.g. the client
  private Object conf; // opaque Hadoop configuration; see Configurable<Object>
  private String catalogName;
  private Map<String, String> catalogProperties;
  private FileIOFactory fileIOFactory;
  private SnowflakeClient snowflakeClient;

  // No-arg constructor required for reflective catalog loading; setup happens in initialize(...).
  public SnowflakeCatalog() {}
+
+  @Override
+  public List<TableIdentifier> listTables(Namespace namespace) {
+    SnowflakeIdentifier scope = 
NamespaceHelpers.toSnowflakeIdentifier(namespace);
+    Preconditions.checkArgument(
+        scope.type() == SnowflakeIdentifier.Type.SCHEMA,
+        "listTables must be at SCHEMA level; got %s from namespace %s",
+        scope,
+        namespace);
+
+    List<SnowflakeIdentifier> sfTables = 
snowflakeClient.listIcebergTables(scope);
+
+    return sfTables.stream()
+        .map(NamespaceHelpers::toIcebergTableIdentifier)
+        .collect(Collectors.toList());
+  }
+
  /** Not currently supported by this catalog; always throws. */
  @Override
  public boolean dropTable(TableIdentifier identifier, boolean purge) {
    throw new UnsupportedOperationException(
        "SnowflakeCatalog does not currently support dropTable");
  }
+
  /** Not currently supported by this catalog; always throws. */
  @Override
  public void renameTable(TableIdentifier from, TableIdentifier to) {
    throw new UnsupportedOperationException(
        "SnowflakeCatalog does not currently support renameTable");
  }
+
  /**
   * Standard catalog initialization: reads the JDBC connection URI from {@code properties},
   * best-effort loads the Snowflake JDBC driver, and delegates to the package-private
   * {@link #initialize(String, SnowflakeClient, FileIOFactory, Map)} overload.
   *
   * @param name catalog name; may be null, in which case the default name is applied downstream
   * @param properties catalog options; {@link CatalogProperties#URI} is required
   * @throws NullPointerException if the JDBC connection URI property is missing
   */
  @Override
  public void initialize(String name, Map<String, String> properties) {
    String uri = properties.get(CatalogProperties.URI);
    Preconditions.checkNotNull(uri, "JDBC connection URI is required");
    try {
      // We'll ensure the expected JDBC driver implementation class is initialized through
      // reflection regardless of which classloader ends up using this JdbcSnowflakeClient, but
      // we'll only warn if the expected driver fails to load, since users may use repackaged or
      // custom JDBC drivers for Snowflake communication.
      Class.forName(JdbcSnowflakeClient.EXPECTED_JDBC_IMPL);
    } catch (ClassNotFoundException cnfe) {
      LOG.warn(
          "Failed to load expected JDBC SnowflakeDriver - if queries fail by failing"
              + " to find a suitable driver for jdbc:snowflake:// URIs, you must add the Snowflake "
              + " JDBC driver to your jars/packages",
          cnfe);
    }
    JdbcClientPool connectionPool = new JdbcClientPool(uri, properties);

    initialize(name, new JdbcSnowflakeClient(connectionPool), new FileIOFactory(), properties);
  }
+
  /**
   * Initialize using caller-supplied SnowflakeClient and FileIO.
   *
   * @param name The name of the catalog, defaults to "snowflake_catalog" when null
   * @param snowflakeClient The client encapsulating network communication with Snowflake
   * @param fileIOFactory The {@link FileIOFactory} to use to instantiate a new FileIO for each new
   *     table operation
   * @param properties The catalog options to use and propagate to dependencies
   * @throws IllegalArgumentException if snowflakeClient or fileIOFactory is null
   */
  @SuppressWarnings("checkstyle:HiddenField")
  void initialize(
      String name,
      SnowflakeClient snowflakeClient,
      FileIOFactory fileIOFactory,
      Map<String, String> properties) {
    Preconditions.checkArgument(null != snowflakeClient, "snowflakeClient must be non-null");
    Preconditions.checkArgument(null != fileIOFactory, "fileIOFactory must be non-null");
    this.catalogName = name == null ? DEFAULT_CATALOG_NAME : name;
    this.snowflakeClient = snowflakeClient;
    this.fileIOFactory = fileIOFactory;
    this.catalogProperties = properties;
    this.closeableGroup = new CloseableGroup();
    // The client owns network resources; make sure it is released when the catalog is closed,
    // and suppress close-time failures rather than propagating them.
    closeableGroup.addCloseable(snowflakeClient);
    closeableGroup.setSuppressCloseFailure(true);
  }
+
+  @Override
+  public void close() throws IOException {
+    if (null != closeableGroup) {
+      closeableGroup.close();
+    }
+  }
+
  /** Not currently supported by this catalog; always throws. */
  @Override
  public void createNamespace(Namespace namespace, Map<String, String> metadata) {
    throw new UnsupportedOperationException(
        "SnowflakeCatalog does not currently support createNamespace");
  }
+
+  @Override
+  public List<Namespace> listNamespaces(Namespace namespace) {
+    SnowflakeIdentifier scope = 
NamespaceHelpers.toSnowflakeIdentifier(namespace);
+    List<SnowflakeIdentifier> results = null;
+    switch (scope.type()) {
+      case ROOT:
+        results = snowflakeClient.listDatabases();
+        break;
+      case DATABASE:
+        results = snowflakeClient.listSchemas(scope);
+        break;
+      default:
+        throw new IllegalArgumentException(
+            String.format(
+                "listNamespaces must be at either ROOT or DATABASE level; got 
%s from namespace %s",
+                scope, namespace));
+    }
+
+    List<Namespace> namespaceList =
+        results.stream()
+            .map(
+                result -> {
+                  Preconditions.checkState(

Review Comment:
   to me it feels like this check should be somewhere else - maybe in 
`snowflakeClient.listDatabases()` / `snowflakeClient.listSchemas(scope)`, which 
would then guarantee that the correct type comes back?



##########
snowflake/src/test/java/org/apache/iceberg/snowflake/JdbcSnowflakeClientTest.java:
##########
@@ -0,0 +1,620 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.snowflake;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+import org.apache.iceberg.ClientPool;
+import org.apache.iceberg.jdbc.JdbcClientPool;
+import org.apache.iceberg.jdbc.UncheckedInterruptedException;
+import org.apache.iceberg.jdbc.UncheckedSQLException;
+import org.assertj.core.api.Assertions;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mock;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+
+@RunWith(MockitoJUnitRunner.class)
+public class JdbcSnowflakeClientTest {
+  @Mock private Connection mockConnection;
+  @Mock private JdbcClientPool mockClientPool;
+  @Mock private JdbcSnowflakeClient.QueryHarness mockQueryHarness;
+  @Mock private ResultSet mockResultSet;
+
+  private JdbcSnowflakeClient snowflakeClient;
+
+  @Before
+  public void before() throws SQLException, InterruptedException {
+    snowflakeClient = new JdbcSnowflakeClient(mockClientPool);
+    snowflakeClient.setQueryHarness(mockQueryHarness);
+
+    doAnswer(
+            new Answer() {

Review Comment:
   nit: this and other places can be replaced with a Lambda



##########
snowflake/src/main/java/org/apache/iceberg/snowflake/NamespaceHelpers.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.snowflake;
+
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
 * Static helpers for converting between Iceberg Namespace/TableIdentifier objects and
 * SnowflakeIdentifier objects. Not instantiable.
 */
class NamespaceHelpers {
  // Snowflake namespaces are at most two levels deep: DATABASE.SCHEMA.
  private static final int MAX_NAMESPACE_DEPTH = 2;
  // Namespace lengths corresponding to each Snowflake scope level.
  private static final int NAMESPACE_ROOT_LEVEL = 0;
  private static final int NAMESPACE_DB_LEVEL = 1;
  private static final int NAMESPACE_SCHEMA_LEVEL = 2;

  private static final Logger LOG = LoggerFactory.getLogger(NamespaceHelpers.class);

  // Utility class; prevent instantiation.
  private NamespaceHelpers() {}
+
+  /**
+   * Converts a Namespace into a SnowflakeIdentifier representing ROOT, a 
DATABASE, or a SCHEMA.
+   *
+   * @throws IllegalArgumentException if the namespace is not a supported 
depth.
+   */
+  public static SnowflakeIdentifier toSnowflakeIdentifier(Namespace namespace) 
{
+    SnowflakeIdentifier identifier = null;
+    switch (namespace.length()) {
+      case NAMESPACE_ROOT_LEVEL:
+        identifier = SnowflakeIdentifier.ofRoot();
+        break;
+      case NAMESPACE_DB_LEVEL:
+        identifier = 
SnowflakeIdentifier.ofDatabase(namespace.level(NAMESPACE_DB_LEVEL - 1));
+        break;
+      case NAMESPACE_SCHEMA_LEVEL:
+        identifier =
+            SnowflakeIdentifier.ofSchema(
+                namespace.level(NAMESPACE_DB_LEVEL - 1),
+                namespace.level(NAMESPACE_SCHEMA_LEVEL - 1));
+        break;
+      default:
+        throw new IllegalArgumentException(
+            String.format(
+                "Snowflake max namespace level is %d, got namespace '%s'",
+                MAX_NAMESPACE_DEPTH, namespace));
+    }
+    LOG.debug("toSnowflakeIdentifier({}) -> {}", namespace, identifier);
+    return identifier;
+  }
+
+  /**
+   * Converts a TableIdentifier into a SnowflakeIdentifier of type TABLE; the 
identifier must have
+   * exactly the right namespace depth to represent a fully-qualified 
Snowflake table identifier.
+   */
+  public static SnowflakeIdentifier toSnowflakeIdentifier(TableIdentifier 
identifier) {
+    SnowflakeIdentifier namespaceScope = 
toSnowflakeIdentifier(identifier.namespace());
+    Preconditions.checkArgument(
+        namespaceScope.type() == SnowflakeIdentifier.Type.SCHEMA,
+        "Namespace portion of '%s' must be at the SCHEMA level, got 
namespaceScope '%s'",
+        identifier,
+        namespaceScope);
+    SnowflakeIdentifier ret =
+        SnowflakeIdentifier.ofTable(
+            namespaceScope.databaseName(), namespaceScope.schemaName(), 
identifier.name());
+    LOG.debug("toSnowflakeIdentifier({}) -> {}", identifier, ret);
+    return ret;
+  }
+
+  /**
+   * Converts a SnowflakeIdentifier of type ROOT, DATABASE, or SCHEMA into an 
equivalent Iceberg
+   * Namespace; throws IllegalArgumentException if not an appropriate type.
+   */
+  public static Namespace toIcebergNamespace(SnowflakeIdentifier identifier) {
+    Namespace namespace = null;
+    switch (identifier.type()) {
+      case ROOT:
+        namespace = Namespace.of();
+        break;
+      case DATABASE:
+        namespace = Namespace.of(identifier.databaseName());
+        break;
+      case SCHEMA:
+        namespace = Namespace.of(identifier.databaseName(), 
identifier.schemaName());
+        break;
+      default:
+        throw new IllegalArgumentException(
+            String.format("Cannot convert identifier '%s' to Namespace", 
identifier));
+    }
+    LOG.debug("toIcebergNamespace({}) -> {}", identifier, namespace);
+    return namespace;
+  }
+
+  /**
+   * Converts a SnowflakeIdentifier to an equivalent Iceberg TableIdentifier; 
the identifier must be
+   * of type TABLE.
+   */
+  public static TableIdentifier toIcebergTableIdentifier(SnowflakeIdentifier 
identifier) {
+    Preconditions.checkArgument(
+        identifier.type() == SnowflakeIdentifier.Type.TABLE,
+        "SnowflakeIdentifier must be type TABLE, get '%s'",

Review Comment:
   nit: get -> got



##########
snowflake/src/main/java/org/apache/iceberg/snowflake/JdbcSnowflakeClient.java:
##########
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.snowflake;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+import org.apache.iceberg.jdbc.JdbcClientPool;
+import org.apache.iceberg.jdbc.UncheckedInterruptedException;
+import org.apache.iceberg.jdbc.UncheckedSQLException;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
/**
 * This implementation of SnowflakeClient builds on top of Snowflake's JDBC driver to interact with
 * Snowflake's Iceberg-aware resource model.
 */
class JdbcSnowflakeClient implements SnowflakeClient {
  // Fully-qualified class name of the JDBC driver implementation this client is written against.
  public static final String EXPECTED_JDBC_IMPL = "net.snowflake.client.jdbc.SnowflakeDriver";

  /** Translates a JDBC ResultSet into a parsed value of type T. */
  @FunctionalInterface
  interface ResultSetParser<T> {
    T parse(ResultSet rs) throws SQLException;
  }
+
+  /**
+   * This class wraps the basic boilerplate of setting up PreparedStatements 
and applying a
+   * ResultSetParser to translate a ResultSet into parsed objects. Allows 
easily injecting
+   * subclasses for debugging/testing purposes.
+   */
+  static class QueryHarness {
+    public <T> T query(Connection conn, String sql, ResultSetParser<T> parser, 
String... args)
+        throws SQLException {
+      try (PreparedStatement statement = conn.prepareStatement(sql)) {
+        if (args != null) {
+          for (int i = 0; i < args.length; ++i) {
+            statement.setString(i + 1, args[i]);
+          }
+        }
+
+        try (ResultSet rs = statement.executeQuery()) {
+          return parser.parse(rs);
+        }
+      }
+    }
+  }
+
+  /**
+   * Expects to handle ResultSets representing fully-qualified Snowflake 
Database identifiers,
+   * containing "name" (representing databaseName).
+   */
+  public static final ResultSetParser<List<SnowflakeIdentifier>> 
DATABASE_RESULT_SET_HANDLER =
+      rs -> {
+        List<SnowflakeIdentifier> databases = Lists.newArrayList();
+        while (rs.next()) {
+          String databaseName = rs.getString("name");
+          databases.add(SnowflakeIdentifier.ofDatabase(databaseName));
+        }
+        return databases;
+      };
+
+  /**
+   * Expects to handle ResultSets representing fully-qualified Snowflake 
Schema identifiers,
+   * containing "database_name" and "name" (representing schemaName).
+   */
+  public static final ResultSetParser<List<SnowflakeIdentifier>> 
SCHEMA_RESULT_SET_HANDLER =
+      rs -> {
+        List<SnowflakeIdentifier> schemas = Lists.newArrayList();
+        while (rs.next()) {
+          String databaseName = rs.getString("database_name");
+          String schemaName = rs.getString("name");
+          schemas.add(SnowflakeIdentifier.ofSchema(databaseName, schemaName));
+        }
+        return schemas;
+      };
+
+  /**
+   * Expects to handle ResultSets representing fully-qualified Snowflake Table 
identifiers,
+   * containing "database_name", "schema_name", and "name" (representing 
tableName).
+   */
+  public static final ResultSetParser<List<SnowflakeIdentifier>> 
TABLE_RESULT_SET_HANDLER =
+      rs -> {
+        List<SnowflakeIdentifier> tables = Lists.newArrayList();
+        while (rs.next()) {
+          String databaseName = rs.getString("database_name");
+          String schemaName = rs.getString("schema_name");
+          String tableName = rs.getString("name");
+          tables.add(SnowflakeIdentifier.ofTable(databaseName, schemaName, 
tableName));
+        }
+        return tables;
+      };
+
+  /**
+   * Expects to handle ResultSets representing a single record holding 
Snowflake Iceberg metadata.
+   */
+  public static final ResultSetParser<SnowflakeTableMetadata> 
TABLE_METADATA_RESULT_SET_HANDLER =
+      rs -> {
+        if (!rs.next()) {
+          return null;
+        }
+
+        String rawJsonVal = rs.getString("METADATA");
+        return SnowflakeTableMetadata.parseJson(rawJsonVal);
+      };
+
  // Pool of JDBC connections used to run all Snowflake queries.
  private final JdbcClientPool connectionPool;
  // Injectable query executor; production code uses the default, tests may substitute a fake.
  private QueryHarness queryHarness;

  JdbcSnowflakeClient(JdbcClientPool conn) {
    Preconditions.checkArgument(null != conn, "JdbcClientPool must be non-null");
    connectionPool = conn;
    queryHarness = new QueryHarness();
  }
+
  /** Replaces the default QueryHarness, e.g. with a mock or fake implementation in tests. */
  @VisibleForTesting
  void setQueryHarness(QueryHarness queryHarness) {
    this.queryHarness = queryHarness;
  }
+
+  /**
+   * For rare cases where PreparedStatements aren't supported for 
user-supplied identifiers intended
+   * for use in special LIKE clauses, we can sanitize by "broadening" the 
identifier with
+   * single-character wildcards and manually post-filter client-side.
+   *
+   * <p>Note: This sanitization approach intentionally "broadens" the scope of 
matching results;
+   * callers must be able to handle this method returning an all-wildcard 
expression; i.e. the
+   * caller must treat the usage of the LIKE clause as only an optional 
optimization, and should
+   * post-filter for correctness as if the LIKE clause wasn't present in the 
query at all.
+   */
+  @VisibleForTesting
+  String sanitizeIdentifierWithWildcardForLikeClause(String identifier) {
+    // Restrict identifiers to the "Unquoted object identifiers" synax 
documented at
+    // https://docs.snowflake.com/en/sql-reference/identifiers-syntax.html
+    //
+    // Use a strict allowlist of characters, replace everything *not* matching 
the character set
+    // with "_", which is used as a single-character wildcard in Snowflake.
+    String sanitized = identifier.replaceAll("[^a-zA-Z0-9_$]", "_");
+    if (sanitized.startsWith("$")) {
+      sanitized = "_" + sanitized.substring(1);
+    }
+    return sanitized;
+  }
+
+  @Override
+  public boolean databaseExists(SnowflakeIdentifier database) {
+    Preconditions.checkArgument(
+        database.type() == SnowflakeIdentifier.Type.DATABASE,
+        "databaseExists requires a DATABASE identifier, got '%s'",
+        database);
+
+    // Due to current limitations in PreparedStatement parameters for the LIKE 
clause in
+    // SHOW DATABASES queries, we'll use a fairly limited allowlist for 
identifier characters,
+    // using wildcards for non-allowed characters, and post-filter for 
matching.
+    final String finalQuery =
+        String.format(
+            "SHOW DATABASES LIKE '%s' IN ACCOUNT",
+            
sanitizeIdentifierWithWildcardForLikeClause(database.databaseName()));
+    List<SnowflakeIdentifier> databases;
+    try {
+      databases =
+          connectionPool.run(
+              conn ->
+                  queryHarness.query(
+                      conn, finalQuery, DATABASE_RESULT_SET_HANDLER, 
(String[]) null));
+    } catch (SQLException e) {
+      throw new UncheckedSQLException(e, "Failed to check if database exists");
+    } catch (InterruptedException e) {
+      throw new UncheckedInterruptedException(e, "Interrupted while checking 
if database exists");
+    }
+
+    // Filter to handle the edge case of '_' appearing as a wildcard that 
can't be remapped the way
+    // it can for predicates in SELECT statements.
+    databases.removeIf(db -> 
!database.databaseName().equalsIgnoreCase(db.databaseName()));
+    return !databases.isEmpty();
+  }
+
+  @Override
+  public boolean schemaExists(SnowflakeIdentifier schema) {
+    Preconditions.checkArgument(
+        schema.type() == SnowflakeIdentifier.Type.SCHEMA,
+        "schemaExists requires a SCHEMA identifier, got '%s'",
+        schema);
+
+    if 
(!databaseExists(SnowflakeIdentifier.ofDatabase(schema.databaseName()))) {
+      return false;
+    }
+
+    // Due to current limitations in PreparedStatement parameters for the LIKE 
clause in
+    // SHOW SCHEMAS queries, we'll use a fairly limited allowlist for 
identifier characters,
+    // using wildcards for non-allowed characters, and post-filter for 
matching.
+    final String finalQuery =
+        String.format(
+            "SHOW SCHEMAS LIKE '%s' IN DATABASE IDENTIFIER(?)",
+            sanitizeIdentifierWithWildcardForLikeClause(schema.schemaName()));
+    List<SnowflakeIdentifier> schemas;
+    try {
+      schemas =
+          connectionPool.run(
+              conn ->
+                  queryHarness.query(
+                      conn,
+                      finalQuery,
+                      SCHEMA_RESULT_SET_HANDLER,
+                      new String[] {schema.databaseName()}));
+    } catch (SQLException e) {
+      throw new UncheckedSQLException(e, "Failed to check if schema exists");

Review Comment:
   should this mention the `schema` name being checked?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to