nastra commented on code in PR #9487: URL: https://github.com/apache/iceberg/pull/9487#discussion_r1478448616
########## core/src/main/java/org/apache/iceberg/jdbc/JdbcViewOperations.java: ########## @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.jdbc; + +import java.sql.DataTruncation; +import java.sql.SQLException; +import java.sql.SQLIntegrityConstraintViolationException; +import java.sql.SQLNonTransientConnectionException; +import java.sql.SQLTimeoutException; +import java.sql.SQLTransientConnectionException; +import java.sql.SQLWarning; +import java.util.Map; +import java.util.Objects; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.exceptions.NoSuchNamespaceException; +import org.apache.iceberg.exceptions.NoSuchViewException; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.util.PropertyUtil; +import org.apache.iceberg.view.BaseViewOperations; +import org.apache.iceberg.view.ViewMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** JDBC implementation of 
Iceberg ViewOperations. */ +public class JdbcViewOperations extends BaseViewOperations { + + private static final Logger LOG = LoggerFactory.getLogger(JdbcViewOperations.class); + private final String catalogName; + private final TableIdentifier viewIdentifier; + private final FileIO fileIO; + private final JdbcClientPool connections; + private final Map<String, String> catalogProperties; + + protected JdbcViewOperations( + JdbcClientPool dbConnPool, + FileIO fileIO, + String catalogName, + TableIdentifier viewIdentifier, + Map<String, String> catalogProperties) { + this.catalogName = catalogName; + this.viewIdentifier = viewIdentifier; + this.fileIO = fileIO; + this.connections = dbConnPool; + this.catalogProperties = catalogProperties; + } + + @Override + protected void doRefresh() { + Map<String, String> view; + + try { + view = JdbcUtil.loadView(connections, catalogName, viewIdentifier); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new UncheckedInterruptedException(e, "Interrupted during refresh"); + } catch (SQLException e) { + // SQL exception happened when getting view from catalog + throw new UncheckedSQLException( + e, "Failed to get view %s from catalog %s", viewIdentifier, catalogName); + } + + if (view.isEmpty()) { + if (currentMetadataLocation() != null) { + throw new NoSuchViewException("View does not exist: %s", viewIdentifier); + } else { + this.disableRefresh(); + return; + } + } + + String newMetadataLocation = view.get(JdbcTableOperations.METADATA_LOCATION_PROP); + Preconditions.checkState( + newMetadataLocation != null, "Invalid view %s: metadata location is null", viewIdentifier); + refreshFromMetadataLocation(newMetadataLocation); + } + + @Override + protected void doCommit(ViewMetadata base, ViewMetadata metadata) { + String newMetadataLocation = writeNewMetadataIfRequired(metadata); + try { + Map<String, String> view = JdbcUtil.loadView(connections, catalogName, viewIdentifier); + if (base != null) { + 
validateMetadataLocation(view, base); + String oldMetadataLocation = base.metadataFileLocation(); + // Start atomic update + LOG.debug("Committing existing view: {}", viewName()); + updateView(newMetadataLocation, oldMetadataLocation); + } else { + // view does not exist, create it + LOG.debug("Committing new view: {}", viewName()); + createView(newMetadataLocation); + } + + } catch (SQLIntegrityConstraintViolationException e) { + if (currentMetadataLocation() == null) { + throw new AlreadyExistsException(e, "View already exists: %s", viewIdentifier); + } else { + throw new UncheckedSQLException(e, "View already exists: %s", viewIdentifier); + } + + } catch (SQLTimeoutException e) { + throw new UncheckedSQLException(e, "Database Connection timeout"); + } catch (SQLTransientConnectionException | SQLNonTransientConnectionException e) { + throw new UncheckedSQLException(e, "Database Connection failed"); + } catch (DataTruncation e) { + throw new UncheckedSQLException(e, "Database data truncation error"); + } catch (SQLWarning e) { + throw new UncheckedSQLException(e, "Database warning"); + } catch (SQLException e) { + // SQLite doesn't set SQLState or throw SQLIntegrityConstraintViolationException + if (e.getMessage().contains("constraint failed")) { + throw new AlreadyExistsException("View already exists: %s", viewIdentifier); + } + + throw new UncheckedSQLException(e, "Unknown failure"); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new UncheckedInterruptedException(e, "Interrupted during commit"); + } + } + + @Override + protected String viewName() { + return viewIdentifier.toString(); + } + + @Override + protected FileIO io() { + return fileIO; + } + + private void validateMetadataLocation(Map<String, String> view, ViewMetadata base) { + String catalogMetadataLocation = view.get(JdbcTableOperations.METADATA_LOCATION_PROP); + String baseMetadataLocation = base != null ? 
base.metadataFileLocation() : null; + + if (!Objects.equals(baseMetadataLocation, catalogMetadataLocation)) { + throw new CommitFailedException( + "Cannot commit %s: metadata location %s has changed from %s", + viewIdentifier, baseMetadataLocation, catalogMetadataLocation); + } + } + + private void updateView(String newMetadataLocation, String oldMetadataLocation) + throws SQLException, InterruptedException { + int updatedRecords = + JdbcUtil.updateView( + connections, catalogName, viewIdentifier, newMetadataLocation, oldMetadataLocation); + + if (updatedRecords == 1) { + LOG.debug("Successfully committed to existing view: {}", viewIdentifier); + } else { + throw new CommitFailedException( + "Failed to update view %s from catalog %s", viewIdentifier, catalogName); + } + } + + private void createView(String newMetadataLocation) throws SQLException, InterruptedException { + Namespace namespace = viewIdentifier.namespace(); + if (PropertyUtil.propertyAsBoolean(catalogProperties, JdbcUtil.STRICT_MODE_PROPERTY, false) + && !JdbcUtil.namespaceExists(catalogName, connections, namespace)) { + throw new NoSuchNamespaceException( + "Cannot create view %s in catalog %s. 
Namespace %s does not exist", + viewIdentifier, catalogName, namespace); + } + + if (JdbcUtil.viewExists(catalogName, connections, viewIdentifier)) { + throw new AlreadyExistsException("View already exists: %s", viewIdentifier); + } + if (JdbcUtil.tableExists(catalogName, connections, viewIdentifier)) { Review Comment: nit: in other catalog impls we typically check whether the table exists before checking whether the view exists ########## core/src/main/java/org/apache/iceberg/jdbc/JdbcTableOperations.java: ########## @@ -182,18 +168,16 @@ private void createTable(String newMetadataLocation) throws SQLException, Interr tableIdentifier, catalogName, namespace); } + if (JdbcUtil.tableExists(catalogName, connections, tableIdentifier)) { + throw new AlreadyExistsException("Table already exists: %s", tableIdentifier); + } Review Comment: nit: newline after this ########## core/src/main/java/org/apache/iceberg/jdbc/JdbcViewOperations.java: ########## @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.jdbc; + +import java.sql.DataTruncation; +import java.sql.SQLException; +import java.sql.SQLIntegrityConstraintViolationException; +import java.sql.SQLNonTransientConnectionException; +import java.sql.SQLTimeoutException; +import java.sql.SQLTransientConnectionException; +import java.sql.SQLWarning; +import java.util.Map; +import java.util.Objects; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.exceptions.NoSuchNamespaceException; +import org.apache.iceberg.exceptions.NoSuchViewException; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.util.PropertyUtil; +import org.apache.iceberg.view.BaseViewOperations; +import org.apache.iceberg.view.ViewMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** JDBC implementation of Iceberg ViewOperations. 
*/ +public class JdbcViewOperations extends BaseViewOperations { + + private static final Logger LOG = LoggerFactory.getLogger(JdbcViewOperations.class); + private final String catalogName; + private final TableIdentifier viewIdentifier; + private final FileIO fileIO; + private final JdbcClientPool connections; + private final Map<String, String> catalogProperties; + + protected JdbcViewOperations( + JdbcClientPool dbConnPool, + FileIO fileIO, + String catalogName, + TableIdentifier viewIdentifier, + Map<String, String> catalogProperties) { + this.catalogName = catalogName; + this.viewIdentifier = viewIdentifier; + this.fileIO = fileIO; + this.connections = dbConnPool; + this.catalogProperties = catalogProperties; + } + + @Override + protected void doRefresh() { + Map<String, String> view; + + try { + view = JdbcUtil.loadView(connections, catalogName, viewIdentifier); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new UncheckedInterruptedException(e, "Interrupted during refresh"); + } catch (SQLException e) { + // SQL exception happened when getting view from catalog + throw new UncheckedSQLException( + e, "Failed to get view %s from catalog %s", viewIdentifier, catalogName); + } + + if (view.isEmpty()) { + if (currentMetadataLocation() != null) { + throw new NoSuchViewException("View does not exist: %s", viewIdentifier); + } else { + this.disableRefresh(); + return; + } + } + + String newMetadataLocation = view.get(JdbcTableOperations.METADATA_LOCATION_PROP); + Preconditions.checkState( + newMetadataLocation != null, "Invalid view %s: metadata location is null", viewIdentifier); + refreshFromMetadataLocation(newMetadataLocation); + } + + @Override + protected void doCommit(ViewMetadata base, ViewMetadata metadata) { + String newMetadataLocation = writeNewMetadataIfRequired(metadata); + try { + Map<String, String> view = JdbcUtil.loadView(connections, catalogName, viewIdentifier); + if (base != null) { + 
validateMetadataLocation(view, base); + String oldMetadataLocation = base.metadataFileLocation(); + // Start atomic update + LOG.debug("Committing existing view: {}", viewName()); + updateView(newMetadataLocation, oldMetadataLocation); + } else { + // view does not exist, create it + LOG.debug("Committing new view: {}", viewName()); + createView(newMetadataLocation); + } + + } catch (SQLIntegrityConstraintViolationException e) { + if (currentMetadataLocation() == null) { + throw new AlreadyExistsException(e, "View already exists: %s", viewIdentifier); + } else { + throw new UncheckedSQLException(e, "View already exists: %s", viewIdentifier); + } + + } catch (SQLTimeoutException e) { + throw new UncheckedSQLException(e, "Database Connection timeout"); + } catch (SQLTransientConnectionException | SQLNonTransientConnectionException e) { + throw new UncheckedSQLException(e, "Database Connection failed"); + } catch (DataTruncation e) { + throw new UncheckedSQLException(e, "Database data truncation error"); + } catch (SQLWarning e) { + throw new UncheckedSQLException(e, "Database warning"); + } catch (SQLException e) { + // SQLite doesn't set SQLState or throw SQLIntegrityConstraintViolationException + if (e.getMessage().contains("constraint failed")) { + throw new AlreadyExistsException("View already exists: %s", viewIdentifier); + } + + throw new UncheckedSQLException(e, "Unknown failure"); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new UncheckedInterruptedException(e, "Interrupted during commit"); + } + } + + @Override + protected String viewName() { + return viewIdentifier.toString(); + } + + @Override + protected FileIO io() { + return fileIO; + } + + private void validateMetadataLocation(Map<String, String> view, ViewMetadata base) { + String catalogMetadataLocation = view.get(JdbcTableOperations.METADATA_LOCATION_PROP); + String baseMetadataLocation = base != null ? 
base.metadataFileLocation() : null; + + if (!Objects.equals(baseMetadataLocation, catalogMetadataLocation)) { + throw new CommitFailedException( + "Cannot commit %s: metadata location %s has changed from %s", + viewIdentifier, baseMetadataLocation, catalogMetadataLocation); + } + } + + private void updateView(String newMetadataLocation, String oldMetadataLocation) + throws SQLException, InterruptedException { + int updatedRecords = + JdbcUtil.updateView( + connections, catalogName, viewIdentifier, newMetadataLocation, oldMetadataLocation); + + if (updatedRecords == 1) { + LOG.debug("Successfully committed to existing view: {}", viewIdentifier); + } else { + throw new CommitFailedException( + "Failed to update view %s from catalog %s", viewIdentifier, catalogName); + } + } + + private void createView(String newMetadataLocation) throws SQLException, InterruptedException { + Namespace namespace = viewIdentifier.namespace(); + if (PropertyUtil.propertyAsBoolean(catalogProperties, JdbcUtil.STRICT_MODE_PROPERTY, false) + && !JdbcUtil.namespaceExists(catalogName, connections, namespace)) { + throw new NoSuchNamespaceException( + "Cannot create view %s in catalog %s. 
Namespace %s does not exist", + viewIdentifier, catalogName, namespace); + } + + if (JdbcUtil.viewExists(catalogName, connections, viewIdentifier)) { + throw new AlreadyExistsException("View already exists: %s", viewIdentifier); + } Review Comment: nit: newline ########## core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java: ########## @@ -185,16 +212,34 @@ private void initializeCatalogTables() throws InterruptedException, SQLException LOG.debug( "Creating table {} to store iceberg catalog namespace properties", JdbcUtil.NAMESPACE_PROPERTIES_TABLE_NAME); - return conn.prepareStatement(JdbcUtil.CREATE_NAMESPACE_PROPERTIES_TABLE).execute(); + return conn.prepareStatement(JdbcUtil.CREATE_NAMESPACE_PROPERTIES_TABLE_SQL).execute(); }); } + private void updateCatalogTables(Connection connection) throws SQLException { + LOG.trace("Updating database tables (if needed)"); + DatabaseMetaData dbMeta = connection.getMetaData(); + ResultSet tableColumns = + dbMeta.getColumns(null, null, JdbcUtil.CATALOG_TABLE_VIEW_NAME, JdbcUtil.TYPE); + if (tableColumns.next()) { + LOG.debug("{} is already up to date", JdbcUtil.CATALOG_TABLE_VIEW_NAME); Review Comment: ```suggestion LOG.debug("{} is already up-to-date", JdbcUtil.CATALOG_TABLE_VIEW_NAME); ``` ########## core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java: ########## @@ -25,31 +25,36 @@ import java.util.Map; import java.util.Properties; import java.util.Set; +import org.apache.iceberg.BaseMetastoreTableOperations; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.relocated.com.google.common.base.Joiner; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.base.Splitter; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; final class JdbcUtil { // property to control strict-mode (aka 
check if namespace exists when creating a table) static final String STRICT_MODE_PROPERTY = JdbcCatalog.PROPERTY_PREFIX + "strict-mode"; - // Catalog Table - static final String CATALOG_TABLE_NAME = "iceberg_tables"; + // Catalog Table & View + static final String CATALOG_TABLE_VIEW_NAME = "iceberg_tables"; static final String CATALOG_NAME = "catalog_name"; - static final String TABLE_NAMESPACE = "table_namespace"; static final String TABLE_NAME = "table_name"; + static final String TABLE_NAMESPACE = "table_namespace"; + static final String TYPE = "type"; Review Comment: `type` is a reserved keyword in [MySQL](https://dev.mysql.com/doc/refman/8.0/en/keywords.html#keywords-8-0-detailed-T) so we'd have to use something else (what about `iceberg_type`?) ########## core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java: ########## @@ -134,14 +155,15 @@ public void initialize(String name, Map<String, String> properties) { initializeCatalogTables(); } } catch (SQLTimeoutException e) { - throw new UncheckedSQLException(e, "Cannot initialize JDBC catalog: Query timed out"); + throw new UncheckedSQLException(e, "Cannot initialize/update JDBC catalog: Query timed out"); Review Comment: nit: are these changes needed? It seems just having `initialize` in the error message is fine ########## core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java: ########## @@ -81,85 +90,103 @@ final class JdbcUtil { + TABLE_NAME + ")" + ")"; - static final String GET_TABLE_SQL = + static final String UPDATE_CATALOG_SQL = + "ALTER TABLE " + CATALOG_TABLE_VIEW_NAME + " ADD COLUMN " + TYPE + " VARCHAR(5)"; + + private static final String GET_SQL = "SELECT * FROM " - + CATALOG_TABLE_NAME + + CATALOG_TABLE_VIEW_NAME + " WHERE " + CATALOG_NAME + " = ? AND " + TABLE_NAMESPACE + " = ? AND " + TABLE_NAME - + " = ? "; - static final String LIST_TABLES_SQL = + + " = ?
AND " + + TYPE + + " = ?"; + private static final String GET_TABLE_SQL = + GET_SQL + + " OR " + + TYPE + + " IS NULL"; // type is null when the SQL database has been updated from previous version + static final String LIST_SQL = "SELECT * FROM " - + CATALOG_TABLE_NAME + + CATALOG_TABLE_VIEW_NAME + " WHERE " + CATALOG_NAME + " = ? AND " + TABLE_NAMESPACE + + " = ? AND " + + TYPE + " = ?"; - static final String RENAME_TABLE_SQL = + static final String LIST_TABLE_SQL = LIST_SQL + " OR " + TYPE + " IS NULL"; + static final String RENAME_SQL = "UPDATE " - + CATALOG_TABLE_NAME + + CATALOG_TABLE_VIEW_NAME + " SET " + TABLE_NAMESPACE - + " = ? , " + + " = ?, " + TABLE_NAME - + " = ? " + + " = ?" + " WHERE " + CATALOG_NAME + " = ? AND " + TABLE_NAMESPACE + " = ? AND " + TABLE_NAME - + " = ? "; - static final String DROP_TABLE_SQL = + + " = ? AND " + + TYPE + + " = ?"; + static final String RENAME_TABLE_SQL = RENAME_SQL + " OR " + TYPE + " IS NULL"; + static final String DROP_SQL = "DELETE FROM " - + CATALOG_TABLE_NAME + + CATALOG_TABLE_VIEW_NAME + " WHERE " + CATALOG_NAME + " = ? AND " + TABLE_NAMESPACE - + " = ? AND " + + " = ? AND " Review Comment: is this whitespace needed here? ########## core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java: ########## @@ -81,85 +90,103 @@ final class JdbcUtil { + TABLE_NAME + ")" + ")"; - static final String GET_TABLE_SQL = + static final String UPDATE_CATALOG_SQL = + "ALTER TABLE " + CATALOG_TABLE_VIEW_NAME + " ADD COLUMN " + TYPE + " VARCHAR(5)"; + + private static final String GET_SQL = "SELECT * FROM " - + CATALOG_TABLE_NAME + + CATALOG_TABLE_VIEW_NAME + " WHERE " + CATALOG_NAME + " = ? AND " + TABLE_NAMESPACE + " = ? AND " + TABLE_NAME - + " = ? "; - static final String LIST_TABLES_SQL = + + " = ? AND " + + TYPE + + " = ?"; + private static final String GET_TABLE_SQL = + GET_SQL Review Comment: Maybe it would be better to have the full query here with `... AND (TYPE = ? OR TYPE IS NULL)...` so that it's clearer? This is also a correctness issue, not just readability: `AND` binds more tightly than `OR`, so `GET_SQL + " OR " + TYPE + " IS NULL"` matches every row whose `type` is NULL, regardless of catalog, namespace, or table name. The same applies to `LIST_TABLE_SQL` and to `RENAME_TABLE_SQL`, where the `UPDATE` would try to rename every row whose `type` is NULL.
########## core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java: ########## @@ -81,85 +90,103 @@ final class JdbcUtil { + TABLE_NAME + ")" + ")"; - static final String GET_TABLE_SQL = + static final String UPDATE_CATALOG_SQL = + "ALTER TABLE " + CATALOG_TABLE_VIEW_NAME + " ADD COLUMN " + TYPE + " VARCHAR(5)"; + + private static final String GET_SQL = "SELECT * FROM " - + CATALOG_TABLE_NAME + + CATALOG_TABLE_VIEW_NAME + " WHERE " + CATALOG_NAME + " = ? AND " + TABLE_NAMESPACE + " = ? AND " + TABLE_NAME - + " = ? "; - static final String LIST_TABLES_SQL = + + " = ? AND " + + TYPE + + " = ?"; + private static final String GET_TABLE_SQL = + GET_SQL + + " OR " + + TYPE + + " IS NULL"; // type is null when the SQL database has been updated from previous version + static final String LIST_SQL = "SELECT * FROM " - + CATALOG_TABLE_NAME + + CATALOG_TABLE_VIEW_NAME + " WHERE " + CATALOG_NAME + " = ? AND " + TABLE_NAMESPACE + + " = ? AND " + + TYPE + " = ?"; - static final String RENAME_TABLE_SQL = + static final String LIST_TABLE_SQL = LIST_SQL + " OR " + TYPE + " IS NULL"; + static final String RENAME_SQL = "UPDATE " - + CATALOG_TABLE_NAME + + CATALOG_TABLE_VIEW_NAME + " SET " + TABLE_NAMESPACE - + " = ? , " + + " = ?, " + TABLE_NAME - + " = ? " + + " = ?" + " WHERE " + CATALOG_NAME + " = ? AND " + TABLE_NAMESPACE + " = ? AND " + TABLE_NAME - + " = ? "; - static final String DROP_TABLE_SQL = + + " = ? AND " + + TYPE + + " = ?"; + static final String RENAME_TABLE_SQL = RENAME_SQL + " OR " + TYPE + " IS NULL"; Review Comment: same as above, I wonder if it wouldn't be better to have the full query here with `AND (... TYPE = ? 
OR TYPE IS NULL)` ########## core/src/test/java/org/apache/iceberg/jdbc/TestJdbcUtil.java: ########## @@ -18,14 +18,116 @@ */ package org.apache.iceberg.jdbc; +import static org.assertj.core.api.Assertions.assertThat; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; import java.util.Map; import java.util.Properties; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; +import org.sqlite.SQLiteDataSource; public class TestJdbcUtil { + @Test + public void testUpdate() throws Exception { + SQLiteDataSource dataSource = new SQLiteDataSource(); + dataSource.setUrl("jdbc:sqlite:file::memory:?icebergDB"); + + try (Connection connection = dataSource.getConnection()) { + // create "old style" SQL schema + connection + .prepareStatement( + "CREATE TABLE " + + JdbcUtil.CATALOG_TABLE_VIEW_NAME + + "(" + + JdbcUtil.CATALOG_NAME + + " VARCHAR(255) NOT NULL," + + JdbcUtil.TABLE_NAMESPACE + + " VARCHAR(255) NOT NULL," + + JdbcUtil.TABLE_NAME + + " VARCHAR(255) NOT NULL," + + JdbcTableOperations.METADATA_LOCATION_PROP + + " VARCHAR(1000)," + + JdbcTableOperations.PREVIOUS_METADATA_LOCATION_PROP + + " VARCHAR(1000)," + + "PRIMARY KEY(" + + JdbcUtil.CATALOG_NAME + + "," + + JdbcUtil.TABLE_NAMESPACE + + "," + + JdbcUtil.TABLE_NAME + + "))") + .executeUpdate(); + connection + .prepareStatement( + "INSERT INTO " + + JdbcUtil.CATALOG_TABLE_VIEW_NAME + + "(" + + JdbcUtil.CATALOG_NAME + + "," + + JdbcUtil.TABLE_NAMESPACE + + "," + + JdbcUtil.TABLE_NAME + + "," + + JdbcTableOperations.METADATA_LOCATION_PROP + + "," + + JdbcTableOperations.PREVIOUS_METADATA_LOCATION_PROP + + ") VALUES('TEST','namespace1','table1',null,null)") + .execute(); + connection + .prepareStatement( + "INSERT INTO " + + JdbcUtil.CATALOG_TABLE_VIEW_NAME + + "(" + + JdbcUtil.CATALOG_NAME + + "," + + JdbcUtil.TABLE_NAMESPACE + + "," + + JdbcUtil.TABLE_NAME + + "," + + 
JdbcTableOperations.METADATA_LOCATION_PROP + + "," + + JdbcTableOperations.PREVIOUS_METADATA_LOCATION_PROP + + ") VALUES('TEST','namespace2','table2',null,null)") + .execute(); + + // update the schema + connection.prepareStatement(JdbcUtil.UPDATE_CATALOG_SQL).execute(); + + // we check if the type column has been added + DatabaseMetaData dbMeta = connection.getMetaData(); + ResultSet columns = + dbMeta.getColumns(null, null, JdbcUtil.CATALOG_TABLE_VIEW_NAME, JdbcUtil.TYPE); + Assertions.assertThat(columns.next()).isTrue(); + + // we check the JDBC catalog supports schema from previous version to list tables + ResultSet tablesRS = connection.prepareStatement(JdbcUtil.LIST_TABLE_SQL).executeQuery(); + Map<String, String> tables = Maps.newHashMap(); + while (tablesRS.next()) { + tables.put( + tablesRS.getString(JdbcUtil.TABLE_NAME), tablesRS.getString(JdbcUtil.TABLE_NAMESPACE)); + } + + Assertions.assertThat(tables.size()).isEqualTo(2); Review Comment: in fact you can combine all three assertions into one: `assertThat(tables).hasSize(2).containsEntry("table1", "namespace1").containsEntry("table2", "namespace2")` (note that the map is keyed by table name, with the namespace as the value) ########## core/src/main/java/org/apache/iceberg/jdbc/JdbcTableOperations.java: ########## @@ -182,18 +168,16 @@ private void createTable(String newMetadataLocation) throws SQLException, Interr tableIdentifier, catalogName, namespace); } + if (JdbcUtil.tableExists(catalogName, connections, tableIdentifier)) { + throw new AlreadyExistsException("Table already exists: %s", tableIdentifier); + } Review Comment: also we'd typically check if a view exists before checking if a table exists here (at least that's what we do in other catalog implementations) ########## core/src/test/java/org/apache/iceberg/jdbc/TestJdbcUtil.java: ########## @@ -18,14 +18,116 @@ */ package org.apache.iceberg.jdbc; +import static org.assertj.core.api.Assertions.assertThat; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; import java.util.Map;
import java.util.Properties; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; +import org.sqlite.SQLiteDataSource; public class TestJdbcUtil { + @Test + public void testUpdate() throws Exception { + SQLiteDataSource dataSource = new SQLiteDataSource(); + dataSource.setUrl("jdbc:sqlite:file::memory:?icebergDB"); + + try (Connection connection = dataSource.getConnection()) { + // create "old style" SQL schema + connection + .prepareStatement( + "CREATE TABLE " + + JdbcUtil.CATALOG_TABLE_VIEW_NAME + + "(" + + JdbcUtil.CATALOG_NAME + + " VARCHAR(255) NOT NULL," + + JdbcUtil.TABLE_NAMESPACE + + " VARCHAR(255) NOT NULL," + + JdbcUtil.TABLE_NAME + + " VARCHAR(255) NOT NULL," + + JdbcTableOperations.METADATA_LOCATION_PROP + + " VARCHAR(1000)," + + JdbcTableOperations.PREVIOUS_METADATA_LOCATION_PROP + + " VARCHAR(1000)," + + "PRIMARY KEY(" + + JdbcUtil.CATALOG_NAME + + "," + + JdbcUtil.TABLE_NAMESPACE + + "," + + JdbcUtil.TABLE_NAME + + "))") + .executeUpdate(); + connection + .prepareStatement( + "INSERT INTO " + + JdbcUtil.CATALOG_TABLE_VIEW_NAME + + "(" + + JdbcUtil.CATALOG_NAME + + "," + + JdbcUtil.TABLE_NAMESPACE + + "," + + JdbcUtil.TABLE_NAME + + "," + + JdbcTableOperations.METADATA_LOCATION_PROP + + "," + + JdbcTableOperations.PREVIOUS_METADATA_LOCATION_PROP + + ") VALUES('TEST','namespace1','table1',null,null)") + .execute(); + connection + .prepareStatement( + "INSERT INTO " + + JdbcUtil.CATALOG_TABLE_VIEW_NAME + + "(" + + JdbcUtil.CATALOG_NAME + + "," + + JdbcUtil.TABLE_NAMESPACE + + "," + + JdbcUtil.TABLE_NAME + + "," + + JdbcTableOperations.METADATA_LOCATION_PROP + + "," + + JdbcTableOperations.PREVIOUS_METADATA_LOCATION_PROP + + ") VALUES('TEST','namespace2','table2',null,null)") + .execute(); + + // update the schema + connection.prepareStatement(JdbcUtil.UPDATE_CATALOG_SQL).execute(); + + // we check if the type column has been added + DatabaseMetaData dbMeta = 
connection.getMetaData(); + ResultSet columns = + dbMeta.getColumns(null, null, JdbcUtil.CATALOG_TABLE_VIEW_NAME, JdbcUtil.TYPE); + Assertions.assertThat(columns.next()).isTrue(); + + // we check the JDBC catalog supports schema from previous version to list tables + ResultSet tablesRS = connection.prepareStatement(JdbcUtil.LIST_TABLE_SQL).executeQuery(); + Map<String, String> tables = Maps.newHashMap(); + while (tablesRS.next()) { + tables.put( + tablesRS.getString(JdbcUtil.TABLE_NAME), tablesRS.getString(JdbcUtil.TABLE_NAMESPACE)); + } + + Assertions.assertThat(tables.size()).isEqualTo(2); Review Comment: ```suggestion assertThat(tables).hasSize(2); ``` ########## core/src/test/java/org/apache/iceberg/jdbc/TestJdbcUtil.java: ########## @@ -18,14 +18,116 @@ */ package org.apache.iceberg.jdbc; +import static org.assertj.core.api.Assertions.assertThat; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; import java.util.Map; import java.util.Properties; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; +import org.sqlite.SQLiteDataSource; public class TestJdbcUtil { + @Test + public void testUpdate() throws Exception { + SQLiteDataSource dataSource = new SQLiteDataSource(); + dataSource.setUrl("jdbc:sqlite:file::memory:?icebergDB"); + + try (Connection connection = dataSource.getConnection()) { + // create "old style" SQL schema + connection + .prepareStatement( + "CREATE TABLE " + + JdbcUtil.CATALOG_TABLE_VIEW_NAME + + "(" + + JdbcUtil.CATALOG_NAME + + " VARCHAR(255) NOT NULL," + + JdbcUtil.TABLE_NAMESPACE + + " VARCHAR(255) NOT NULL," + + JdbcUtil.TABLE_NAME + + " VARCHAR(255) NOT NULL," + + JdbcTableOperations.METADATA_LOCATION_PROP + + " VARCHAR(1000)," + + JdbcTableOperations.PREVIOUS_METADATA_LOCATION_PROP + + " VARCHAR(1000)," + + "PRIMARY KEY(" + + JdbcUtil.CATALOG_NAME + + "," + + JdbcUtil.TABLE_NAMESPACE + + "," + + 
JdbcUtil.TABLE_NAME + + "))") + .executeUpdate(); + connection + .prepareStatement( + "INSERT INTO " + + JdbcUtil.CATALOG_TABLE_VIEW_NAME + + "(" + + JdbcUtil.CATALOG_NAME + + "," + + JdbcUtil.TABLE_NAMESPACE + + "," + + JdbcUtil.TABLE_NAME + + "," + + JdbcTableOperations.METADATA_LOCATION_PROP + + "," + + JdbcTableOperations.PREVIOUS_METADATA_LOCATION_PROP + + ") VALUES('TEST','namespace1','table1',null,null)") + .execute(); + connection + .prepareStatement( + "INSERT INTO " + + JdbcUtil.CATALOG_TABLE_VIEW_NAME + + "(" + + JdbcUtil.CATALOG_NAME + + "," + + JdbcUtil.TABLE_NAMESPACE + + "," + + JdbcUtil.TABLE_NAME + + "," + + JdbcTableOperations.METADATA_LOCATION_PROP + + "," + + JdbcTableOperations.PREVIOUS_METADATA_LOCATION_PROP + + ") VALUES('TEST','namespace2','table2',null,null)") + .execute(); + + // update the schema + connection.prepareStatement(JdbcUtil.UPDATE_CATALOG_SQL).execute(); + + // we check if the type column has been added + DatabaseMetaData dbMeta = connection.getMetaData(); + ResultSet columns = + dbMeta.getColumns(null, null, JdbcUtil.CATALOG_TABLE_VIEW_NAME, JdbcUtil.TYPE); + Assertions.assertThat(columns.next()).isTrue(); Review Comment: ```suggestion assertThat(columns.next()).isTrue(); ``` ########## core/src/test/java/org/apache/iceberg/jdbc/TestJdbcUtil.java: ########## @@ -18,14 +18,116 @@ */ package org.apache.iceberg.jdbc; +import static org.assertj.core.api.Assertions.assertThat; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; import java.util.Map; import java.util.Properties; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; +import org.sqlite.SQLiteDataSource; public class TestJdbcUtil { + @Test + public void testUpdate() throws Exception { + SQLiteDataSource dataSource = new SQLiteDataSource(); + dataSource.setUrl("jdbc:sqlite:file::memory:?icebergDB"); Review Comment: I think we should have a 
test where the "old" schema with data is prepared (like you do here), then we initialize the catalog pointing at that URI and run list tables/list views operations to check how the catalog behaves after an upgrade. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org