gh-yzou commented on code in PR #1303:
URL: https://github.com/apache/polaris/pull/1303#discussion_r2038659389
##########
plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/SparkCatalog.java:
##########
@@ -42,42 +46,98 @@
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+/**
+ * SparkCatalog implementation that is able to interact with both the Iceberg
+ * SparkCatalog and the Polaris SparkCatalog. All namespace and view related
+ * operations continue to go through the Iceberg SparkCatalog. For table
+ * operations, depending on the table format, the operation may involve
+ * interaction with both the Iceberg and Polaris SparkCatalogs.
+ */
public class SparkCatalog
implements StagingTableCatalog,
TableCatalog,
SupportsNamespaces,
ViewCatalog,
SupportsReplaceView {
+ private static final Logger LOG =
LoggerFactory.getLogger(SparkCatalog.class);
- private static final Set<String> DEFAULT_NS_KEYS =
ImmutableSet.of(TableCatalog.PROP_OWNER);
- private String catalogName = null;
- private org.apache.iceberg.spark.SparkCatalog icebergsSparkCatalog = null;
+ protected String catalogName = null;
+ protected org.apache.iceberg.spark.SparkCatalog icebergsSparkCatalog = null;
+ protected PolarisSparkCatalog polarisSparkCatalog = null;
Review Comment:
added
##########
plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/PolarisRESTCatalog.java:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.rest.Endpoint;
+import org.apache.iceberg.rest.ErrorHandlers;
+import org.apache.iceberg.rest.HTTPClient;
+import org.apache.iceberg.rest.RESTClient;
+import org.apache.iceberg.rest.ResourcePaths;
+import org.apache.iceberg.rest.auth.OAuth2Util;
+import org.apache.iceberg.rest.responses.ConfigResponse;
+import org.apache.iceberg.util.EnvironmentUtil;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.polaris.core.rest.PolarisEndpoints;
+import org.apache.polaris.core.rest.PolarisResourcePaths;
+import org.apache.polaris.service.types.GenericTable;
+import org.apache.polaris.spark.rest.CreateGenericTableRESTRequest;
+import org.apache.polaris.spark.rest.LoadGenericTableRESTResponse;
+import org.apache.polaris.spark.utils.PolarisCatalogUtils;
+
+public class PolarisRESTCatalog implements PolarisCatalog, Closeable {
Review Comment:
added
##########
plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/PolarisRESTCatalog.java:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.rest.Endpoint;
+import org.apache.iceberg.rest.ErrorHandlers;
+import org.apache.iceberg.rest.HTTPClient;
+import org.apache.iceberg.rest.RESTClient;
+import org.apache.iceberg.rest.ResourcePaths;
+import org.apache.iceberg.rest.auth.OAuth2Util;
+import org.apache.iceberg.rest.responses.ConfigResponse;
+import org.apache.iceberg.util.EnvironmentUtil;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.polaris.core.rest.PolarisEndpoints;
+import org.apache.polaris.core.rest.PolarisResourcePaths;
+import org.apache.polaris.service.types.GenericTable;
+import org.apache.polaris.spark.rest.CreateGenericTableRESTRequest;
+import org.apache.polaris.spark.rest.LoadGenericTableRESTResponse;
+import org.apache.polaris.spark.utils.PolarisCatalogUtils;
+
+public class PolarisRESTCatalog implements PolarisCatalog, Closeable {
+ public static final String REST_PAGE_SIZE = "rest-page-size";
+
+ private final Function<Map<String, String>, RESTClient> clientBuilder;
+
+ private RESTClient restClient = null;
+ private CloseableGroup closeables = null;
+ private Set<Endpoint> endpoints;
+ private OAuth2Util.AuthSession catalogAuth = null;
+ private PolarisResourcePaths paths = null;
Review Comment:
updated the name to pathGenerator
##########
plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/SparkCatalog.java:
##########
@@ -42,42 +46,98 @@
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+/**
+ * SparkCatalog implementation that is able to interact with both the Iceberg
+ * SparkCatalog and the Polaris SparkCatalog. All namespace and view related
+ * operations continue to go through the Iceberg SparkCatalog. For table
+ * operations, depending on the table format, the operation may involve
+ * interaction with both the Iceberg and Polaris SparkCatalogs.
+ */
public class SparkCatalog
implements StagingTableCatalog,
TableCatalog,
SupportsNamespaces,
ViewCatalog,
SupportsReplaceView {
+ private static final Logger LOG =
LoggerFactory.getLogger(SparkCatalog.class);
- private static final Set<String> DEFAULT_NS_KEYS =
ImmutableSet.of(TableCatalog.PROP_OWNER);
- private String catalogName = null;
- private org.apache.iceberg.spark.SparkCatalog icebergsSparkCatalog = null;
+ protected String catalogName = null;
+ protected org.apache.iceberg.spark.SparkCatalog icebergsSparkCatalog = null;
+ protected PolarisSparkCatalog polarisSparkCatalog = null;
- // TODO: Add Polaris Specific REST Catalog
+ protected DeltaHelper deltaHelper = null;
@Override
public String name() {
return catalogName;
}
+  /**
+   * Initialize the REST catalogs for Iceberg and Polaris; REST is the only
+   * catalog type supported by Polaris at the moment.
+   */
+ private void initRESTCatalog(String name, CaseInsensitiveStringMap options) {
+ // TODO: relax this in the future
+ String catalogType =
+ PropertyUtil.propertyAsString(
+ options, CatalogUtil.ICEBERG_CATALOG_TYPE,
CatalogUtil.ICEBERG_CATALOG_TYPE_REST);
+ if (!catalogType.equals(CatalogUtil.ICEBERG_CATALOG_TYPE_REST)) {
+ throw new UnsupportedOperationException(
+ "Only rest catalog type is supported, but got catalog type: " +
catalogType);
+ }
+
+ String catalogImpl = options.get(CatalogProperties.CATALOG_IMPL);
+ if (catalogImpl != null) {
+ throw new UnsupportedOperationException(
+ "Customized catalog implementation is currently not supported!");
Review Comment:
updated the message to
```
"Customized catalog implementation is not supported and not needed, please
remove the configuration!"
```
##########
plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/PolarisRESTCatalog.java:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.rest.Endpoint;
+import org.apache.iceberg.rest.ErrorHandlers;
+import org.apache.iceberg.rest.HTTPClient;
+import org.apache.iceberg.rest.RESTClient;
+import org.apache.iceberg.rest.ResourcePaths;
+import org.apache.iceberg.rest.auth.OAuth2Util;
+import org.apache.iceberg.rest.responses.ConfigResponse;
+import org.apache.iceberg.util.EnvironmentUtil;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.polaris.core.rest.PolarisEndpoints;
+import org.apache.polaris.core.rest.PolarisResourcePaths;
+import org.apache.polaris.service.types.GenericTable;
+import org.apache.polaris.spark.rest.CreateGenericTableRESTRequest;
+import org.apache.polaris.spark.rest.LoadGenericTableRESTResponse;
+import org.apache.polaris.spark.utils.PolarisCatalogUtils;
+
+public class PolarisRESTCatalog implements PolarisCatalog, Closeable {
+ public static final String REST_PAGE_SIZE = "rest-page-size";
+
+ private final Function<Map<String, String>, RESTClient> clientBuilder;
+
+ private RESTClient restClient = null;
+ private CloseableGroup closeables = null;
+ private Set<Endpoint> endpoints;
+ private OAuth2Util.AuthSession catalogAuth = null;
+ private PolarisResourcePaths paths = null;
+ private Integer pageSize = null;
+
+  // the default endpoints to configure if the server doesn't specify the 'endpoints' configuration.
+ private static final Set<Endpoint> DEFAULT_ENDPOINTS =
PolarisEndpoints.GENERIC_TABLE_ENDPOINTS;
+
+ public PolarisRESTCatalog() {
+ this(config ->
HTTPClient.builder(config).uri(config.get(CatalogProperties.URI)).build());
+ }
+
+ public PolarisRESTCatalog(Function<Map<String, String>, RESTClient>
clientBuilder) {
+ this.clientBuilder = clientBuilder;
+ }
+
+ public void initialize(Map<String, String> unresolved,
OAuth2Util.AuthSession catalogAuth) {
+ Preconditions.checkArgument(unresolved != null, "Invalid configuration:
null");
+
+ // resolve any configuration that is supplied by environment variables
+ Map<String, String> props = EnvironmentUtil.resolveAll(unresolved);
+
+ // TODO: switch to use authManager once iceberg dependency is updated to
1.9.0
+ this.catalogAuth = catalogAuth;
+
+ ConfigResponse config;
+ try (RESTClient initClient =
clientBuilder.apply(props).withAuthSession(catalogAuth)) {
+ config = fetchConfig(initClient, catalogAuth.headers(), props);
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to close HTTP client", e);
+ }
+
+ // call getConfig to get the server configurations
+ Map<String, String> mergedProps = config.merge(props);
+ if (config.endpoints().isEmpty()) {
+ this.endpoints = DEFAULT_ENDPOINTS;
+ } else {
+ this.endpoints = ImmutableSet.copyOf(config.endpoints());
+ }
+
+ this.paths = PolarisResourcePaths.forCatalogProperties(mergedProps);
+ this.restClient =
clientBuilder.apply(mergedProps).withAuthSession(catalogAuth);
+
+ this.pageSize = PropertyUtil.propertyAsNullableInt(mergedProps,
REST_PAGE_SIZE);
+ if (pageSize != null) {
+ Preconditions.checkArgument(
+ pageSize > 0, "Invalid value for %s, must be a positive integer",
REST_PAGE_SIZE);
+ }
Review Comment:
sg! removed for now
##########
plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/PolarisRESTCatalog.java:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.rest.Endpoint;
+import org.apache.iceberg.rest.ErrorHandlers;
+import org.apache.iceberg.rest.HTTPClient;
+import org.apache.iceberg.rest.RESTClient;
+import org.apache.iceberg.rest.ResourcePaths;
+import org.apache.iceberg.rest.auth.OAuth2Util;
+import org.apache.iceberg.rest.responses.ConfigResponse;
+import org.apache.iceberg.util.EnvironmentUtil;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.polaris.core.rest.PolarisEndpoints;
+import org.apache.polaris.core.rest.PolarisResourcePaths;
+import org.apache.polaris.service.types.GenericTable;
+import org.apache.polaris.spark.rest.CreateGenericTableRESTRequest;
+import org.apache.polaris.spark.rest.LoadGenericTableRESTResponse;
+import org.apache.polaris.spark.utils.PolarisCatalogUtils;
+
+public class PolarisRESTCatalog implements PolarisCatalog, Closeable {
+ public static final String REST_PAGE_SIZE = "rest-page-size";
+
+ private final Function<Map<String, String>, RESTClient> clientBuilder;
+
+ private RESTClient restClient = null;
+ private CloseableGroup closeables = null;
+ private Set<Endpoint> endpoints;
+ private OAuth2Util.AuthSession catalogAuth = null;
+ private PolarisResourcePaths paths = null;
+ private Integer pageSize = null;
+
+  // the default endpoints to configure if the server doesn't specify the 'endpoints' configuration.
+ private static final Set<Endpoint> DEFAULT_ENDPOINTS =
PolarisEndpoints.GENERIC_TABLE_ENDPOINTS;
+
+ public PolarisRESTCatalog() {
+ this(config ->
HTTPClient.builder(config).uri(config.get(CatalogProperties.URI)).build());
+ }
+
+ public PolarisRESTCatalog(Function<Map<String, String>, RESTClient>
clientBuilder) {
+ this.clientBuilder = clientBuilder;
+ }
+
+ public void initialize(Map<String, String> unresolved,
OAuth2Util.AuthSession catalogAuth) {
+ Preconditions.checkArgument(unresolved != null, "Invalid configuration:
null");
+
+ // resolve any configuration that is supplied by environment variables
Review Comment:
added
##########
plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/PolarisRESTCatalog.java:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.rest.Endpoint;
+import org.apache.iceberg.rest.ErrorHandlers;
+import org.apache.iceberg.rest.HTTPClient;
+import org.apache.iceberg.rest.RESTClient;
+import org.apache.iceberg.rest.ResourcePaths;
+import org.apache.iceberg.rest.auth.OAuth2Util;
+import org.apache.iceberg.rest.responses.ConfigResponse;
+import org.apache.iceberg.util.EnvironmentUtil;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.polaris.core.rest.PolarisEndpoints;
+import org.apache.polaris.core.rest.PolarisResourcePaths;
+import org.apache.polaris.service.types.GenericTable;
+import org.apache.polaris.spark.rest.CreateGenericTableRESTRequest;
+import org.apache.polaris.spark.rest.LoadGenericTableRESTResponse;
+import org.apache.polaris.spark.utils.PolarisCatalogUtils;
+
+public class PolarisRESTCatalog implements PolarisCatalog, Closeable {
+ public static final String REST_PAGE_SIZE = "rest-page-size";
+
+ private final Function<Map<String, String>, RESTClient> clientBuilder;
+
+ private RESTClient restClient = null;
+ private CloseableGroup closeables = null;
+ private Set<Endpoint> endpoints;
+ private OAuth2Util.AuthSession catalogAuth = null;
+ private PolarisResourcePaths paths = null;
+ private Integer pageSize = null;
+
+  // the default endpoints to configure if the server doesn't specify the 'endpoints' configuration.
+ private static final Set<Endpoint> DEFAULT_ENDPOINTS =
PolarisEndpoints.GENERIC_TABLE_ENDPOINTS;
+
+ public PolarisRESTCatalog() {
+ this(config ->
HTTPClient.builder(config).uri(config.get(CatalogProperties.URI)).build());
+ }
+
+ public PolarisRESTCatalog(Function<Map<String, String>, RESTClient>
clientBuilder) {
+ this.clientBuilder = clientBuilder;
+ }
+
+ public void initialize(Map<String, String> unresolved,
OAuth2Util.AuthSession catalogAuth) {
+ Preconditions.checkArgument(unresolved != null, "Invalid configuration:
null");
+
+ // resolve any configuration that is supplied by environment variables
+ Map<String, String> props = EnvironmentUtil.resolveAll(unresolved);
+
+ // TODO: switch to use authManager once iceberg dependency is updated to
1.9.0
+ this.catalogAuth = catalogAuth;
+
+ ConfigResponse config;
+ try (RESTClient initClient =
clientBuilder.apply(props).withAuthSession(catalogAuth)) {
+ config = fetchConfig(initClient, catalogAuth.headers(), props);
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to close HTTP client", e);
+ }
+
+ // call getConfig to get the server configurations
+ Map<String, String> mergedProps = config.merge(props);
+ if (config.endpoints().isEmpty()) {
+ this.endpoints = DEFAULT_ENDPOINTS;
+ } else {
+ this.endpoints = ImmutableSet.copyOf(config.endpoints());
+ }
+
+ this.paths = PolarisResourcePaths.forCatalogProperties(mergedProps);
+ this.restClient =
clientBuilder.apply(mergedProps).withAuthSession(catalogAuth);
+
+ this.pageSize = PropertyUtil.propertyAsNullableInt(mergedProps,
REST_PAGE_SIZE);
+ if (pageSize != null) {
+ Preconditions.checkArgument(
+ pageSize > 0, "Invalid value for %s, must be a positive integer",
REST_PAGE_SIZE);
+ }
+
+ this.closeables = new CloseableGroup();
+ this.closeables.addCloseable(this.restClient);
+ this.closeables.setSuppressCloseFailure(true);
+ }
+
+ protected static ConfigResponse fetchConfig(
+ RESTClient client, Map<String, String> headers, Map<String, String>
properties) {
+    // send the client's warehouse location to the service to keep in sync;
+    // this is needed for cases where the warehouse is configured on the client
+    // side and used by the Polaris server as the catalog name.
+ ImmutableMap.Builder<String, String> queryParams = ImmutableMap.builder();
+ if (properties.containsKey(CatalogProperties.WAREHOUSE_LOCATION)) {
+ queryParams.put(
+ CatalogProperties.WAREHOUSE_LOCATION,
+ properties.get(CatalogProperties.WAREHOUSE_LOCATION));
+ }
+
+ ConfigResponse configResponse =
+ client.get(
+ ResourcePaths.config(),
+ queryParams.build(),
+ ConfigResponse.class,
+ headers,
+ ErrorHandlers.defaultErrorHandler());
+ configResponse.validate();
+ return configResponse;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (closeables != null) {
+ closeables.close();
+ }
+ }
+
+ @Override
+ public List<TableIdentifier> listGenericTables(Namespace ns) {
+ throw new UnsupportedOperationException("listTables not supported");
+ }
+
+ @Override
+ public boolean dropGenericTable(TableIdentifier identifier) {
+ throw new UnsupportedOperationException("dropTable not supported");
+ }
+
+ @Override
+ public GenericTable createGenericTable(
+ TableIdentifier identifier, String format, String doc, Map<String,
String> props) {
+ Endpoint.check(endpoints, PolarisEndpoints.V1_CREATE_GENERIC_TABLE);
+ CreateGenericTableRESTRequest request =
+ new CreateGenericTableRESTRequest(identifier.name(), format, doc,
props);
+
+ LoadGenericTableRESTResponse response =
+ restClient
+ .withAuthSession(this.catalogAuth)
+ .post(
+ paths.genericTables(identifier.namespace()),
+ request,
+ LoadGenericTableRESTResponse.class,
+ Map.of(),
+ ErrorHandlers.tableErrorHandler());
+
+ return response.getTable();
+ }
+
+ @Override
+ public GenericTable loadGenericTable(TableIdentifier identifier) {
+ Endpoint.check(endpoints, PolarisEndpoints.V1_LOAD_GENERIC_TABLE);
+ PolarisCatalogUtils.checkIdentifierIsValid(identifier);
Review Comment:
removed
##########
plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/SparkCatalog.java:
##########
@@ -42,42 +46,98 @@
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+/**
+ * SparkCatalog implementation that is able to interact with both the Iceberg
+ * SparkCatalog and the Polaris SparkCatalog. All namespace and view related
+ * operations continue to go through the Iceberg SparkCatalog. For table
+ * operations, depending on the table format, the operation may involve
+ * interaction with both the Iceberg and Polaris SparkCatalogs.
+ */
public class SparkCatalog
implements StagingTableCatalog,
TableCatalog,
SupportsNamespaces,
ViewCatalog,
SupportsReplaceView {
+ private static final Logger LOG =
LoggerFactory.getLogger(SparkCatalog.class);
- private static final Set<String> DEFAULT_NS_KEYS =
ImmutableSet.of(TableCatalog.PROP_OWNER);
- private String catalogName = null;
- private org.apache.iceberg.spark.SparkCatalog icebergsSparkCatalog = null;
+ protected String catalogName = null;
+ protected org.apache.iceberg.spark.SparkCatalog icebergsSparkCatalog = null;
+ protected PolarisSparkCatalog polarisSparkCatalog = null;
- // TODO: Add Polaris Specific REST Catalog
+ protected DeltaHelper deltaHelper = null;
@Override
public String name() {
return catalogName;
}
+  /**
+   * Initialize the REST catalogs for Iceberg and Polaris; REST is the only
+   * catalog type supported by Polaris at the moment.
+   */
+ private void initRESTCatalog(String name, CaseInsensitiveStringMap options) {
+ // TODO: relax this in the future
+ String catalogType =
+ PropertyUtil.propertyAsString(
+ options, CatalogUtil.ICEBERG_CATALOG_TYPE,
CatalogUtil.ICEBERG_CATALOG_TYPE_REST);
+ if (!catalogType.equals(CatalogUtil.ICEBERG_CATALOG_TYPE_REST)) {
+ throw new UnsupportedOperationException(
+ "Only rest catalog type is supported, but got catalog type: " +
catalogType);
+ }
Review Comment:
updated the check so that either no catalog type configuration or an explicit "rest" catalog type configuration is accepted.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]