This is an automated email from the ASF dual-hosted git repository. zykkk pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new dc28f65a3d0 [pick-2.0][Enhancement](jdbc catalog) Add a property to test the connection when creating a Jdbc catalog (#32581) dc28f65a3d0 is described below commit dc28f65a3d0cc80834b58534155dfb460a4d25fe Author: zy-kkk <zhongy...@gmail.com> AuthorDate: Thu Mar 21 14:05:50 2024 +0800 [pick-2.0][Enhancement](jdbc catalog) Add a property to test the connection when creating a Jdbc catalog (#32581) --- be/src/service/internal_service.cpp | 59 +++++++++++ be/src/service/internal_service.h | 5 + be/src/vec/exec/vjdbc_connector.cpp | 27 ++++- be/src/vec/exec/vjdbc_connector.h | 5 + .../java/org/apache/doris/jdbc/JdbcExecutor.java | 21 ++++ .../org/apache/doris/catalog/JdbcResource.java | 9 +- .../apache/doris/datasource/CatalogFactory.java | 2 +- .../doris/datasource/jdbc/JdbcExternalCatalog.java | 111 ++++++++++++++++++++- .../doris/datasource/jdbc/client/JdbcClient.java | 18 ++++ .../datasource/jdbc/client/JdbcOracleClient.java | 5 + .../datasource/jdbc/client/JdbcSapHanaClient.java | 5 + .../org/apache/doris/rpc/BackendServiceClient.java | 5 + .../org/apache/doris/rpc/BackendServiceProxy.java | 12 +++ .../datasource/jdbc/JdbcExternalCatalogTest.java | 2 +- gensrc/proto/internal_service.proto | 11 ++ .../jdbc/test_clickhouse_jdbc_catalog.out | Bin 5002 -> 5437 bytes .../jdbc/test_clickhouse_jdbc_catalog.groovy | 30 +++++- 17 files changed, 320 insertions(+), 7 deletions(-) diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index acc2e29a3ef..2fd4310c4bc 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -37,6 +37,7 @@ #include <stddef.h> #include <stdint.h> #include <sys/stat.h> +#include <vec/exec/vjdbc_connector.h> #include <algorithm> #include <exception> @@ -683,6 +684,64 @@ void PInternalServiceImpl::tablet_fetch_data(google::protobuf::RpcController* co } } +void PInternalServiceImpl::test_jdbc_connection(google::protobuf::RpcController* controller, + const PJdbcTestConnectionRequest* request, + PJdbcTestConnectionResult* result, + google::protobuf::Closure* done) { + bool ret = _heavy_work_pool.try_offer([request, result, done]() { + VLOG_RPC << "test jdbc connection"; + brpc::ClosureGuard closure_guard(done); + TTableDescriptor table_desc; + vectorized::JdbcConnectorParam jdbc_param; + Status st = Status::OK(); + { + const uint8_t* buf = (const uint8_t*)request->jdbc_table().data(); + uint32_t len = request->jdbc_table().size(); + st = deserialize_thrift_msg(buf, &len, false, &table_desc); + if (!st.ok()) { + LOG(WARNING) << "test jdbc connection failed, errmsg=" << st; + st.to_protobuf(result->mutable_status()); + return; + } + } + TJdbcTable jdbc_table = (table_desc.jdbcTable); + jdbc_param.catalog_id = jdbc_table.catalog_id; + jdbc_param.driver_class = jdbc_table.jdbc_driver_class; + jdbc_param.driver_path = jdbc_table.jdbc_driver_url; + jdbc_param.driver_checksum = jdbc_table.jdbc_driver_checksum; + jdbc_param.jdbc_url = jdbc_table.jdbc_url; + jdbc_param.user = jdbc_table.jdbc_user; + jdbc_param.passwd = jdbc_table.jdbc_password; + jdbc_param.query_string = request->query_str(); + jdbc_param.table_type = static_cast<TOdbcTableType::type>(request->jdbc_table_type()); + jdbc_param.connection_pool_min_size = jdbc_table.connection_pool_min_size; + jdbc_param.connection_pool_max_size = jdbc_table.connection_pool_max_size; + jdbc_param.connection_pool_max_life_time = jdbc_table.connection_pool_max_life_time; + jdbc_param.connection_pool_max_wait_time = jdbc_table.connection_pool_max_wait_time; + jdbc_param.connection_pool_keep_alive = jdbc_table.connection_pool_keep_alive; + + std::unique_ptr<vectorized::JdbcConnector> jdbc_connector; + jdbc_connector.reset(new (std::nothrow) vectorized::JdbcConnector(jdbc_param)); + + st = jdbc_connector->test_connection(); + st.to_protobuf(result->mutable_status()); + + Status clean_st = jdbc_connector->clean_datasource(); + if (!clean_st.ok()) { + LOG(WARNING) << "Failed to clean JDBC datasource: " << clean_st.msg(); + } + Status close_st = jdbc_connector->close(); + if (!close_st.ok()) { + LOG(WARNING) << "Failed to close JDBC connector: " << close_st.msg(); + } + }); + + if (!ret) { + offer_failed(result, done, _heavy_work_pool); + return; + } +} + void PInternalServiceImpl::get_column_ids_by_tablet_ids(google::protobuf::RpcController* controller, const PFetchColIdsRequest* request, PFetchColIdsResponse* response, diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index 47762cf7e52..259a41c2e33 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -184,6 +184,11 @@ public: void glob(google::protobuf::RpcController* controller, const PGlobRequest* request, PGlobResponse* response, google::protobuf::Closure* done) override; + void test_jdbc_connection(google::protobuf::RpcController* controller, + const PJdbcTestConnectionRequest* request, + PJdbcTestConnectionResult* result, + google::protobuf::Closure* done) override; + private: void _exec_plan_fragment_in_pthread(google::protobuf::RpcController* controller, const PExecPlanFragmentRequest* request, diff --git a/be/src/vec/exec/vjdbc_connector.cpp b/be/src/vec/exec/vjdbc_connector.cpp index 9b98627db94..9344faad01d 100644 --- a/be/src/vec/exec/vjdbc_connector.cpp +++ b/be/src/vec/exec/vjdbc_connector.cpp @@ -156,7 +156,11 @@ Status JdbcConnector::open(RuntimeState* state, bool read) { ctor_params.__set_jdbc_password(_conn_param.passwd); ctor_params.__set_jdbc_driver_class(_conn_param.driver_class); ctor_params.__set_driver_path(local_location); - ctor_params.__set_batch_size(read ? state->batch_size() : 0); + if (state == nullptr) { + ctor_params.__set_batch_size(read ? 1 : 0); + } else { + ctor_params.__set_batch_size(read ? state->batch_size() : 0); + } ctor_params.__set_op(read ? TJdbcOperation::READ : TJdbcOperation::WRITE); ctor_params.__set_table_type(_conn_param.table_type); ctor_params.__set_connection_pool_min_size(_conn_param.connection_pool_min_size); @@ -185,6 +189,23 @@ Status JdbcConnector::open(RuntimeState* state, bool read) { return Status::OK(); } +Status JdbcConnector::test_connection() { + RETURN_IF_ERROR(open(nullptr, true)); + + JNIEnv* env = nullptr; + RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); + + env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_test_connection_id); + return JniUtil::GetJniExceptionMsg(env); +} + +Status JdbcConnector::clean_datasource() { + JNIEnv* env = nullptr; + RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); + env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_clean_datasource_id); + return JniUtil::GetJniExceptionMsg(env); +} + Status JdbcConnector::query() { if (!_is_open) { return Status::InternalError("Query before open of JdbcConnector."); @@ -840,6 +861,10 @@ Status JdbcConnector::_register_func_id(JNIEnv* env) { JDBC_EXECUTOR_TRANSACTION_SIGNATURE, _executor_abort_trans_id)); RETURN_IF_ERROR(register_id(_executor_clazz, "getResultColumnTypeNames", JDBC_EXECUTOR_GET_TYPES_SIGNATURE, _executor_get_types_id)); + RETURN_IF_ERROR( + register_id(_executor_clazz, "testConnection", "()V", _executor_test_connection_id)); + RETURN_IF_ERROR( + register_id(_executor_clazz, "cleanDataSource", "()V", _executor_clean_datasource_id)); return Status::OK(); } diff --git a/be/src/vec/exec/vjdbc_connector.h b/be/src/vec/exec/vjdbc_connector.h index 55b39f0aec2..32dc5aba47b 100644 --- a/be/src/vec/exec/vjdbc_connector.h +++ b/be/src/vec/exec/vjdbc_connector.h @@ -103,6 +103,9 @@ public: Status close() override; + Status test_connection(); + Status clean_datasource(); + private: Status _register_func_id(JNIEnv* env); Status _check_column_type(); @@ -165,6 +168,8 @@ private: jmethodID _executor_begin_trans_id; jmethodID _executor_finish_trans_id; jmethodID _executor_abort_trans_id; + jmethodID _executor_test_connection_id; + jmethodID _executor_clean_datasource_id; std::map<int, int> _map_column_idx_to_cast_idx; std::vector<DataTypePtr> _input_array_string_types; std::vector<MutableColumnPtr> diff --git a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutor.java b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutor.java index 3011c5d82f7..6f15600eddc 100644 --- a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutor.java +++ b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutor.java @@ -180,6 +180,27 @@ public class JdbcExecutor { return false; } + + public void cleanDataSource() { + if (druidDataSource != null) { + druidDataSource.close(); + JdbcDataSource.getDataSource().getSourcesMap().remove(config.createCacheKey()); + druidDataSource = null; + } + } + + public void testConnection() throws UdfRuntimeException { + try { + resultSet = ((PreparedStatement) stmt).executeQuery(); + if (!resultSet.next()) { + throw new UdfRuntimeException( + "Failed to test connection in BE: query executed but returned no results."); + } + } catch (SQLException e) { + throw new UdfRuntimeException("Failed to test connection in BE: ", e); + } + } + public int read() throws UdfRuntimeException { try { resultSet = ((PreparedStatement) stmt).executeQuery(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java index 3291b64cbb0..1faf27e1040 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java @@ -104,6 +104,8 @@ public class JdbcResource extends Resource { public static final String CONNECTION_POOL_KEEP_ALIVE = "connection_pool_keep_alive"; public static final String CHECK_SUM = "checksum"; public static final String CREATE_TIME = "create_time"; + public static final String TEST_CONNECTION = "test_connection"; + private static final ImmutableList<String> ALL_PROPERTIES = new ImmutableList.Builder<String>().add( JDBC_URL, USER, @@ -120,7 +122,8 @@ public class JdbcResource extends Resource { CONNECTION_POOL_MAX_SIZE, CONNECTION_POOL_MAX_LIFE_TIME, CONNECTION_POOL_MAX_WAIT_TIME, - CONNECTION_POOL_KEEP_ALIVE + CONNECTION_POOL_KEEP_ALIVE, + TEST_CONNECTION ).build(); private static final ImmutableList<String> OPTIONAL_PROPERTIES = new ImmutableList.Builder<String>().add( ONLY_SPECIFIED_DATABASE, @@ -131,7 +134,8 @@ public class JdbcResource extends Resource { CONNECTION_POOL_MAX_SIZE, CONNECTION_POOL_MAX_LIFE_TIME, CONNECTION_POOL_MAX_WAIT_TIME, - CONNECTION_POOL_KEEP_ALIVE + CONNECTION_POOL_KEEP_ALIVE, + TEST_CONNECTION ).build(); // The default value of optional properties @@ -148,6 +152,7 @@ public class JdbcResource extends Resource { OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(CONNECTION_POOL_MAX_LIFE_TIME, "1800000"); OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(CONNECTION_POOL_MAX_WAIT_TIME, "5000"); OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(CONNECTION_POOL_KEEP_ALIVE, "false"); + OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(TEST_CONNECTION, "true"); } // timeout for both connection and read. 10 seconds is long enough. diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java index 09ad69ec8b6..3bf674a8abd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java @@ -119,7 +119,7 @@ public class CatalogFactory { catalog = new EsExternalCatalog(catalogId, name, resource, props, comment); break; case "jdbc": - catalog = new JdbcExternalCatalog(catalogId, name, resource, props, comment); + catalog = new JdbcExternalCatalog(catalogId, name, resource, props, comment, isReplay); break; case "iceberg": catalog = IcebergExternalCatalogFactory.createCatalog(catalogId, name, resource, props, comment); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java index ea9d5d74999..774ccd2dd25 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java @@ -17,10 +17,14 @@ package org.apache.doris.datasource.jdbc; +import org.apache.doris.catalog.Env; import org.apache.doris.catalog.JdbcResource; +import org.apache.doris.catalog.JdbcTable; +import org.apache.doris.catalog.TableIf.TableType; import org.apache.doris.catalog.external.JdbcExternalDatabase; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; +import org.apache.doris.common.FeConstants; import org.apache.doris.datasource.CatalogMgr; import org.apache.doris.datasource.CatalogProperty; import org.apache.doris.datasource.ExternalCatalog; @@ -28,19 +32,37 @@ import org.apache.doris.datasource.InitCatalogLog; import org.apache.doris.datasource.SessionContext; import org.apache.doris.datasource.jdbc.client.JdbcClient; import org.apache.doris.datasource.jdbc.client.JdbcClientConfig; +import org.apache.doris.datasource.jdbc.client.JdbcClientException; +import org.apache.doris.proto.InternalService; +import org.apache.doris.proto.InternalService.PJdbcTestConnectionRequest; +import org.apache.doris.proto.InternalService.PJdbcTestConnectionResult; +import org.apache.doris.rpc.BackendServiceProxy; +import org.apache.doris.rpc.RpcException; +import org.apache.doris.system.Backend; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TStatusCode; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.protobuf.ByteString; import lombok.Getter; import org.apache.commons.lang3.StringUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.thrift.TException; +import org.apache.thrift.TSerializer; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; @Getter public class JdbcExternalCatalog extends ExternalCatalog { + private static final Logger LOG = LogManager.getLogger(JdbcExternalCatalog.class); + private static final List<String> REQUIRED_PROPERTIES = ImmutableList.of( JdbcResource.JDBC_URL, JdbcResource.DRIVER_URL, @@ -52,10 +74,11 @@ public class JdbcExternalCatalog extends ExternalCatalog { private transient JdbcClient jdbcClient; public JdbcExternalCatalog(long catalogId, String name, String resource, Map<String, String> props, - String comment) + String comment, boolean isReplay) throws DdlException { super(catalogId, name, InitCatalogLog.Type.JDBC, comment); this.catalogProperty = new CatalogProperty(resource, processCompatibleProperties(props)); + testJdbcConnection(isReplay); } @Override @@ -73,6 +96,9 @@ public class JdbcExternalCatalog extends ExternalCatalog { JdbcResource.checkBooleanProperty(JdbcResource.ONLY_SPECIFIED_DATABASE, getOnlySpecifiedDatabase()); JdbcResource.checkBooleanProperty(JdbcResource.LOWER_CASE_TABLE_NAMES, getLowerCaseTableNames()); + JdbcResource.checkBooleanProperty(JdbcResource.CONNECTION_POOL_KEEP_ALIVE, + String.valueOf(isConnectionPoolKeepAlive())); + JdbcResource.checkBooleanProperty(JdbcResource.TEST_CONNECTION, String.valueOf(isTestConnection())); JdbcResource.checkDatabaseListProperties(getOnlySpecifiedDatabase(), getIncludeDatabaseMap(), getExcludeDatabaseMap()); JdbcResource.checkConnectionPoolProperties(getConnectionPoolMinSize(), getConnectionPoolMaxSize(), @@ -177,6 +203,11 @@ public class JdbcExternalCatalog extends ExternalCatalog { .getDefaultPropertyValue(JdbcResource.CONNECTION_POOL_KEEP_ALIVE))); } + public boolean isTestConnection() { + return Boolean.parseBoolean(catalogProperty.getOrDefault(JdbcResource.TEST_CONNECTION, JdbcResource + .getDefaultPropertyValue(JdbcResource.TEST_CONNECTION))); + } + @Override protected void initLocalObjectsImpl() { JdbcClientConfig jdbcClientConfig = new JdbcClientConfig() @@ -244,4 +275,82 @@ public class JdbcExternalCatalog extends ExternalCatalog { } } } + + private void testJdbcConnection(boolean isReplay) throws DdlException { + if (FeConstants.runningUnitTest) { + // skip test connection in unit test + return; + } + if (!isReplay) { + if (isTestConnection()) { + try { + initLocalObjectsImpl(); + testFeToJdbcConnection(); + testBeToJdbcConnection(); + } finally { + jdbcClient.closeClient(); + jdbcClient = null; + } + } + } + } + + private void testFeToJdbcConnection() throws DdlException { + try { + jdbcClient.testConnection(); + } catch (JdbcClientException e) { + String errorMessage = "Test FE Connection to JDBC Failed: " + e.getMessage(); + LOG.error(errorMessage, e); + throw new DdlException(errorMessage, e); + } + } + + private void testBeToJdbcConnection() throws DdlException { + Backend aliveBe = null; + for (Backend be : Env.getCurrentSystemInfo().getIdToBackend().values()) { + if (be.isAlive()) { + aliveBe = be; + } + } + if (aliveBe == null) { + throw new DdlException("Test BE Connection to JDBC Failed: No Alive backends"); + } + TNetworkAddress address = new TNetworkAddress(aliveBe.getHost(), aliveBe.getBrpcPort()); + try { + JdbcTable jdbcTable = getTestConnectionJdbcTable(); + PJdbcTestConnectionRequest request = InternalService.PJdbcTestConnectionRequest.newBuilder() + .setJdbcTable(ByteString.copyFrom(new TSerializer().serialize(jdbcTable.toThrift()))) + .setJdbcTableType(jdbcTable.getJdbcTableType().getValue()) + .setQueryStr(jdbcClient.getTestQuery()).build(); + InternalService.PJdbcTestConnectionResult result = null; + Future<PJdbcTestConnectionResult> future = BackendServiceProxy.getInstance() + .testJdbcConnection(address, request); + result = future.get(); + TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode()); + if (code != TStatusCode.OK) { + throw new DdlException("Test BE Connection to JDBC Failed: " + result.getStatus().getErrorMsgs(0)); + } + } catch (TException | RpcException | ExecutionException | InterruptedException e) { + throw new RuntimeException(e); + } + } + + private JdbcTable getTestConnectionJdbcTable() throws DdlException { + JdbcTable jdbcTable = new JdbcTable(0, "test_jdbc_connection", Lists.newArrayList(), + TableType.JDBC_EXTERNAL_TABLE); + jdbcTable.setCatalogId(this.getId()); + jdbcTable.setJdbcTypeName(this.getDatabaseTypeName()); + jdbcTable.setJdbcUrl(this.getJdbcUrl()); + jdbcTable.setJdbcUser(this.getJdbcUser()); + jdbcTable.setJdbcPasswd(this.getJdbcPasswd()); + jdbcTable.setDriverClass(this.getDriverClass()); + jdbcTable.setDriverUrl(this.getDriverUrl()); + jdbcTable.setCheckSum(JdbcResource.computeObjectChecksum(this.getDriverUrl())); + jdbcTable.setConnectionPoolMinSize(this.getConnectionPoolMinSize()); + jdbcTable.setConnectionPoolMaxSize(this.getConnectionPoolMaxSize()); + jdbcTable.setConnectionPoolMaxLifeTime(this.getConnectionPoolMaxLifeTime()); + jdbcTable.setConnectionPoolMaxWaitTime(this.getConnectionPoolMaxWaitTime()); + jdbcTable.setConnectionPoolKeepAlive(this.isConnectionPoolKeepAlive()); + return jdbcTable; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java index 7840e4cf0ae..1db27ef6eaf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java @@ -478,4 +478,22 @@ public abstract class JdbcClient { } return ScalarType.createStringType(); } + + public void testConnection() { + String testQuery = getTestQuery(); + try (Connection conn = getConnection(); + Statement stmt = conn.createStatement(); + ResultSet rs = stmt.executeQuery(testQuery)) { + if (!rs.next()) { + throw new JdbcClientException( + "Failed to test connection in FE: query executed but returned no results."); + } + } catch (SQLException e) { + throw new JdbcClientException("Failed to test connection in FE: " + e.getMessage(), e); + } + } + + public String getTestQuery() { + return "select 1"; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcOracleClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcOracleClient.java index d0a9f2c3de7..37fd1b6c72c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcOracleClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcOracleClient.java @@ -45,6 +45,11 @@ public class JdbcOracleClient extends JdbcClient { return conn.getCatalog(); } + @Override + public String getTestQuery() { + return "SELECT 1 FROM dual"; + } + @Override public List<String> getDatabaseNameList() { Connection conn = getConnection(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcSapHanaClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcSapHanaClient.java index eb8742c6e80..2df36b4cbaa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcSapHanaClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcSapHanaClient.java @@ -36,6 +36,11 @@ public class JdbcSapHanaClient extends JdbcClient { return new String[] {"TABLE", "VIEW", "OLAP VIEW", "JOIN VIEW", "HIERARCHY VIEW", "CALC VIEW"}; } + @Override + public String getTestQuery() { + return "SELECT 1 FROM DUMMY"; + } + @Override protected Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema) { String hanaType = fieldSchema.getDataTypeName(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java index e4ece51146b..87321efb85b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java @@ -114,6 +114,11 @@ public class BackendServiceClient { return stub.fetchTableSchema(request); } + public Future<InternalService.PJdbcTestConnectionResult> testJdbcConnection( + InternalService.PJdbcTestConnectionRequest request) { + return stub.testJdbcConnection(request); + } + public Future<InternalService.PCacheResponse> updateCache(InternalService.PUpdateCacheRequest request) { return stub.updateCache(request); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java index 9b5c491df69..72afa75ffcc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java @@ -284,6 +284,18 @@ public class BackendServiceProxy { } } + public Future<InternalService.PJdbcTestConnectionResult> testJdbcConnection( + TNetworkAddress address, InternalService.PJdbcTestConnectionRequest request) throws RpcException { + try { + final BackendServiceClient client = getProxy(address); + return client.testJdbcConnection(request); + } catch (Throwable e) { + LOG.warn("test jdbc connection catch a exception, address={}:{}", + address.getHostname(), address.getPort(), e); + throw new RpcException(address.hostname, e.getMessage()); + } + } + public Future<InternalService.PCacheResponse> updateCache( TNetworkAddress address, InternalService.PUpdateCacheRequest request) throws RpcException { try { diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalogTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalogTest.java index 8394daf0682..4ef950ef981 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalogTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalogTest.java @@ -41,7 +41,7 @@ public class JdbcExternalCatalogTest { properties.put(JdbcResource.DRIVER_URL, "ojdbc8.jar"); properties.put(JdbcResource.JDBC_URL, "jdbc:oracle:thin:@127.0.0.1:1521:XE"); properties.put(JdbcResource.DRIVER_CLASS, "oracle.jdbc.driver.OracleDriver"); - jdbcExternalCatalog = new JdbcExternalCatalog(1L, "testCatalog", null, properties, "testComment"); + jdbcExternalCatalog = new JdbcExternalCatalog(1L, "testCatalog", null, properties, "testComment", false); } @Test diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 1ef97ec2df3..a1fd0e42a70 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -614,6 +614,16 @@ message PFetchTableSchemaResult { repeated PTypeDesc column_types = 4; } +message PJdbcTestConnectionRequest { + optional bytes jdbc_table = 1; + optional int32 jdbc_table_type = 2; + optional string query_str = 3; +} + +message PJdbcTestConnectionResult { + optional PStatus status = 1; +} + message PRowLocation { optional int64 tablet_id = 1; optional string rowset_id = 2; @@ -727,5 +737,6 @@ service PBackendService { rpc get_column_ids_by_tablet_ids(PFetchColIdsRequest) returns (PFetchColIdsResponse); rpc get_tablet_rowset_versions(PGetTabletVersionsRequest) returns (PGetTabletVersionsResponse); rpc glob(PGlobRequest) returns (PGlobResponse); + rpc test_jdbc_connection(PJdbcTestConnectionRequest) returns (PJdbcTestConnectionResult); }; diff --git a/regression-test/data/external_table_p0/jdbc/test_clickhouse_jdbc_catalog.out b/regression-test/data/external_table_p0/jdbc/test_clickhouse_jdbc_catalog.out index 32c74765aab..65b53290116 100644 Binary files a/regression-test/data/external_table_p0/jdbc/test_clickhouse_jdbc_catalog.out and b/regression-test/data/external_table_p0/jdbc/test_clickhouse_jdbc_catalog.out differ diff --git a/regression-test/suites/external_table_p0/jdbc/test_clickhouse_jdbc_catalog.groovy b/regression-test/suites/external_table_p0/jdbc/test_clickhouse_jdbc_catalog.groovy index 9948c49d24a..ff89e1b9a30 100644 --- a/regression-test/suites/external_table_p0/jdbc/test_clickhouse_jdbc_catalog.groovy +++ b/regression-test/suites/external_table_p0/jdbc/test_clickhouse_jdbc_catalog.groovy @@ -101,6 +101,32 @@ suite("test_clickhouse_jdbc_catalog", "p0,external,clickhouse,external_docker,ex order_qt_dt_with_tz """ select * from dt_with_tz order by id; """ + sql """create catalog if not exists clickhouse_catalog_test_conn_correct properties( + "type"="jdbc", + "user"="default", + "password"="123456", + "jdbc_url" = "jdbc:clickhouse://${externalEnvIp}:${clickhouse_port}/doris_test", + "driver_url" = "${driver_url}", + "driver_class" = "com.clickhouse.jdbc.ClickHouseDriver", + "test_connection" = "true" + ); + """ + order_qt_test_conn_correct """ select * from clickhouse_catalog_test_conn_correct.doris_test.type; """ + + test { + sql """create catalog if not exists clickhouse_catalog_test_conn_mistake properties( + "type"="jdbc", + "user"="default", + "password"="1234567", + "jdbc_url" = "jdbc:clickhouse://${externalEnvIp}:${clickhouse_port}/doris_test", + "driver_url" = "${driver_url}", + "driver_class" = "com.clickhouse.jdbc.ClickHouseDriver", + "test_connection" = "true" + ); + """ + exception "Test FE Connection to JDBC Failed: Can not connect to jdbc due to error: Code: 516. DB::Exception: default: Authentication failed: password is incorrect, or there is no user with such name." + } + }finally { res_dbs_log = sql "show databases;" for(int i = 0;i < res_dbs_log.size();i++) { @@ -108,7 +134,9 @@ suite("test_clickhouse_jdbc_catalog", "p0,external,clickhouse,external_docker,ex log.info( "database = ${res_dbs_log[i][0]} => tables = "+tbs.toString()) } } - + sql """ drop catalog if exists ${catalog_name} """ + sql """ drop catalog if exists clickhouse_catalog_test_conn_correct """ + sql """ drop catalog if exists clickhouse_catalog_test_conn_mistake """ } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org