This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit f029a7fc72d0a7bd5b390355c07b5be52ad6dbf4 Author: daidai <2017501...@qq.com> AuthorDate: Fri Aug 23 18:45:31 2024 +0800 [fix](hive)Modify the Hive notification event processing method when using meta cache and add parameters to the Hive catalog. (#39239) before #15401 and #38244 ## Proposed changes 1. Add the parameter `hive.enable_hms_events_incremental_sync` to the hive catalog, which is used to switch the catalog to read hive notification events (default is false). The default value is `enable_hms_events_incremental_sync` in fe.conf 2. Add the parameter `hive.hms_events_batch_size_per_rpc` to the hive catalog, which is used to set the size of notification events read by the catalog each time. The default value is `hms_events_batch_size_per_rpc` in fe.conf (default is 500) 3. append hms event notification case . 4. Remove the `use_meta_cache` setting in catalog that is forced to true. Example : ``` create catalog if not exists catalog_name properties ( "type"="hms", 'hive.metastore.uris' = 'thrift://externalEnvIp:hms_port', "hive.enable_hms_events_incremental_sync" ="true", "hive.hms_events_batch_size_per_rpc" = "1000" ); ``` --- .../docker-compose/hive/hadoop-hive.env.tpl | 3 + .../main/java/org/apache/doris/catalog/Env.java | 5 +- .../org/apache/doris/datasource/CatalogMgr.java | 14 - .../apache/doris/datasource/ExternalCatalog.java | 10 +- .../apache/doris/datasource/ExternalDatabase.java | 12 +- .../doris/datasource/hive/HMSExternalCatalog.java | 30 +- .../datasource/hive/event/AlterPartitionEvent.java | 4 +- .../doris/datasource/hive/event/IgnoredEvent.java | 2 +- .../doris/datasource/hive/event/InsertEvent.java | 12 - .../datasource/hive/event/MetastoreEvent.java | 12 + .../hive/event/MetastoreEventFactory.java | 3 + .../hive/event/MetastoreEventsProcessor.java | 17 +- .../doris/datasource/metacache/MetaCache.java | 7 +- .../property/constants/HMSProperties.java | 3 +- regression-test/pipeline/external/conf/fe.conf | 2 + .../hive/ddl/test_hive_ctas.groovy | 2 +- .../hive/test_hms_event_notification.groovy | 392 ++++++++++++ ...est_hms_event_notification_multi_catalog.groovy | 676 +++++++++++++++++++++ .../hive/write/test_hive_write_insert.groovy | 2 +- .../hive/write/test_hive_write_partitions.groovy | 2 +- 20 files changed, 1157 insertions(+), 53 deletions(-) diff --git a/docker/thirdparties/docker-compose/hive/hadoop-hive.env.tpl b/docker/thirdparties/docker-compose/hive/hadoop-hive.env.tpl index 0e074228410..b7e662f5e52 100644 --- a/docker/thirdparties/docker-compose/hive/hadoop-hive.env.tpl +++ b/docker/thirdparties/docker-compose/hive/hadoop-hive.env.tpl @@ -28,6 +28,9 @@ HIVE_SITE_CONF_hive_server2_webui_port=0 HIVE_SITE_CONF_hive_compactor_initiator_on=true HIVE_SITE_CONF_hive_compactor_worker_threads=2 HIVE_SITE_CONF_metastore_storage_schema_reader_impl=org.apache.hadoop.hive.metastore.SerDeStorageSchemaReader +HIVE_SITE_CONF_hive_metastore_event_db_notification_api_auth=false +HIVE_SITE_CONF_hive_metastore_dml_events=true +HIVE_SITE_CONF_hive_metastore_transactional_event_listeners=org.apache.hive.hcatalog.listener.DbNotificationListener CORE_CONF_fs_defaultFS=hdfs://${IP_HOST}:${FS_PORT} CORE_CONF_hadoop_http_staticuser_user=root diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 15eac7fabeb..d5732394f47 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -1822,9 +1822,8 @@ public class Env { domainResolver.start(); // fe disk updater feDiskUpdater.start(); - if (Config.enable_hms_events_incremental_sync) { - metastoreEventsProcessor.start(); - } + + metastoreEventsProcessor.start(); dnsCache.start(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java index 8ed602d0eb8..b1febf202d8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java @@ -655,13 +655,6 @@ public class CatalogMgr implements Writable, GsonPostProcessable { return; } - TableIf table = db.getTableNullable(tableName); - if (table != null) { - if (!ignoreIfExists) { - throw new DdlException("Table " + tableName + " has exist in db " + dbName); - } - return; - } long tblId; HMSExternalCatalog hmsCatalog = (HMSExternalCatalog) catalog; if (hmsCatalog.getUseMetaCache().get()) { @@ -712,13 +705,6 @@ public class CatalogMgr implements Writable, GsonPostProcessable { if (!(catalog instanceof ExternalCatalog)) { throw new DdlException("Only support create ExternalCatalog databases"); } - DatabaseIf db = catalog.getDbNullable(dbName); - if (db != null) { - if (!ignoreIfExists) { - throw new DdlException("Database " + dbName + " has exist in catalog " + catalog.getName()); - } - return; - } HMSExternalCatalog hmsCatalog = (HMSExternalCatalog) catalog; long dbId; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java index d99ac76c7b9..a8170e83777 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java @@ -291,11 +291,11 @@ public abstract class ExternalCatalog } } - if (properties.getOrDefault(ExternalCatalog.USE_META_CACHE, "true").equals("false")) { - LOG.warn("force to set use_meta_cache to true for catalog: {} when creating", name); - getCatalogProperty().addProperty(ExternalCatalog.USE_META_CACHE, "true"); - useMetaCache = Optional.of(true); - } + // if (properties.getOrDefault(ExternalCatalog.USE_META_CACHE, "true").equals("false")) { + // LOG.warn("force to set use_meta_cache to true for catalog: {} when creating", name); + // getCatalogProperty().addProperty(ExternalCatalog.USE_META_CACHE, "true"); + // useMetaCache = Optional.of(true); + // } } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java index 13db68eced2..e2fe6cdd7a5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java @@ -451,13 +451,14 @@ public abstract class ExternalDatabase<T extends ExternalTable> @Override public void unregisterTable(String tableName) { + makeSureInitialized(); if (LOG.isDebugEnabled()) { LOG.debug("create table [{}]", tableName); } if (extCatalog.getUseMetaCache().get()) { if (isInitialized()) { - metaCache.invalidate(tableName); + metaCache.invalidate(tableName, Util.genIdByName(getQualifiedName(tableName))); } } else { Long tableId = tableNameToId.remove(tableName); @@ -480,6 +481,7 @@ public abstract class ExternalDatabase<T extends ExternalTable> // Only used for sync hive metastore event @Override public boolean registerTable(TableIf tableIf) { + makeSureInitialized(); long tableId = tableIf.getId(); String tableName = tableIf.getName(); if (LOG.isDebugEnabled()) { @@ -487,11 +489,13 @@ public abstract class ExternalDatabase<T extends ExternalTable> } if (extCatalog.getUseMetaCache().get()) { if (isInitialized()) { - metaCache.updateCache(tableName, (T) tableIf); + metaCache.updateCache(tableName, (T) tableIf, Util.genIdByName(getQualifiedName(tableName))); } } else { - tableNameToId.put(tableName, tableId); - idToTbl.put(tableId, buildTableForInit(tableName, tableId, extCatalog)); + if (!tableNameToId.containsKey(tableName)) { + tableNameToId.put(tableName, tableId); + idToTbl.put(tableId, buildTableForInit(tableName, tableId, extCatalog)); + } } setLastUpdateTime(System.currentTimeMillis()); return true; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java index be9bf388adb..5faf1f2bb6e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java @@ -25,6 +25,7 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.common.security.authentication.AuthenticationConfig; import org.apache.doris.common.security.authentication.HadoopAuthenticator; +import org.apache.doris.common.util.Util; import org.apache.doris.datasource.CatalogProperty; import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.ExternalDatabase; @@ -73,6 +74,10 @@ public class HMSExternalCatalog extends ExternalCatalog { @Getter private HadoopAuthenticator authenticator; + private int hmsEventsBatchSizePerRpc = -1; + private boolean enableHmsEventsIncrementalSync = false; + + @VisibleForTesting public HMSExternalCatalog() { catalogProperty = new CatalogProperty(null, null); @@ -100,6 +105,19 @@ public class HMSExternalCatalog extends ExternalCatalog { throw new DdlException( "The parameter " + FILE_META_CACHE_TTL_SECOND + " is wrong, value is " + fileMetaCacheTtlSecond); } + Map<String, String> properties = catalogProperty.getProperties(); + if (properties.containsKey(HMSProperties.ENABLE_HMS_EVENTS_INCREMENTAL_SYNC)) { + enableHmsEventsIncrementalSync = + properties.get(HMSProperties.ENABLE_HMS_EVENTS_INCREMENTAL_SYNC).equals("true"); + } else { + enableHmsEventsIncrementalSync = Config.enable_hms_events_incremental_sync; + } + + if (properties.containsKey(HMSProperties.HMS_EVENTIS_BATCH_SIZE_PER_RPC)) { + hmsEventsBatchSizePerRpc = Integer.valueOf(properties.get(HMSProperties.HMS_EVENTIS_BATCH_SIZE_PER_RPC)); + } else { + hmsEventsBatchSizePerRpc = Config.hms_events_batch_size_per_rpc; + } // check the dfs.ha properties // 'dfs.nameservices'='your-nameservice', @@ -212,7 +230,7 @@ public class HMSExternalCatalog extends ExternalCatalog { } if (useMetaCache.get()) { if (isInitialized()) { - metaCache.invalidate(dbName); + metaCache.invalidate(dbName, Util.genIdByName(getQualifiedName(dbName))); } } else { Long dbId = dbNameToId.remove(dbName); @@ -233,7 +251,7 @@ public class HMSExternalCatalog extends ExternalCatalog { ExternalDatabase<? extends ExternalTable> db = buildDbForInit(dbName, dbId, logType); if (useMetaCache.get()) { if (isInitialized()) { - metaCache.updateCache(dbName, db); + metaCache.updateCache(dbName, db, Util.genIdByName(getQualifiedName(dbName))); } } else { dbNameToId.put(dbName, dbId); @@ -266,4 +284,12 @@ public class HMSExternalCatalog extends ExternalCatalog { public String getHiveVersion() { return catalogProperty.getOrDefault(HMSProperties.HIVE_VERSION, ""); } + + public int getHmsEventsBatchSizePerRpc() { + return hmsEventsBatchSizePerRpc; + } + + public boolean isEnableHmsEventsIncrementalSync() { + return enableHmsEventsIncrementalSync; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java index 6be0215f143..569d9878d7a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java @@ -145,9 +145,7 @@ public class AlterPartitionEvent extends MetastorePartitionEvent { // `that` event can be batched if this event's partitions contains all of the partitions which `that` event has // else just remove `that` event's relevant partitions for (String partitionName : getAllPartitionNames()) { - if (thatPartitionEvent instanceof AddPartitionEvent) { - ((AddPartitionEvent) thatPartitionEvent).removePartition(partitionName); - } else if (thatPartitionEvent instanceof DropPartitionEvent) { + if (thatPartitionEvent instanceof DropPartitionEvent) { ((DropPartitionEvent) thatPartitionEvent).removePartition(partitionName); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java index d504c2917f9..e7e6643e647 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java @@ -28,7 +28,7 @@ import java.util.List; */ public class IgnoredEvent extends MetastoreEvent { private IgnoredEvent(NotificationEvent event, String catalogName) { - super(event, catalogName); + super(event); } protected static List<MetastoreEvent> getEvents(NotificationEvent event, diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/InsertEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/InsertEvent.java index f793ab8b068..7b76d4913d5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/InsertEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/InsertEvent.java @@ -24,8 +24,6 @@ import org.apache.doris.common.DdlException; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import org.apache.hadoop.hive.metastore.api.NotificationEvent; -import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.metastore.messaging.InsertMessage; import java.util.List; @@ -33,13 +31,11 @@ import java.util.List; * MetastoreEvent for INSERT event type */ public class InsertEvent extends MetastoreTableEvent { - private final Table hmsTbl; // for test public InsertEvent(long eventId, String catalogName, String dbName, String tblName) { super(eventId, catalogName, dbName, tblName, MetastoreEventType.INSERT); - this.hmsTbl = null; } private InsertEvent(NotificationEvent event, String catalogName) { @@ -47,14 +43,6 @@ public class InsertEvent extends MetastoreTableEvent { Preconditions.checkArgument(getEventType().equals(MetastoreEventType.INSERT)); Preconditions .checkNotNull(event.getMessage(), debugString("Event message is null")); - try { - InsertMessage insertMessage = - MetastoreEventsProcessor.getMessageDeserializer(event.getMessageFormat()) - .getInsertMessage(event.getMessage()); - hmsTbl = Preconditions.checkNotNull(insertMessage.getTableObj()); - } catch (Exception ex) { - throw new MetastoreNotificationException(ex); - } } protected static List<MetastoreEvent> getEvents(NotificationEvent event, String catalogName) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java index 04b0ccab799..695dd57b215 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java @@ -78,6 +78,18 @@ public abstract class MetastoreEvent { this.event = null; } + // for IgnoredEvent + protected MetastoreEvent(NotificationEvent event) { + this.event = event; + this.metastoreNotificationEvent = event; + this.eventId = -1; + this.eventTime = -1L; + this.catalogName = null; + this.dbName = null; + this.tblName = null; + this.eventType = null; + } + protected MetastoreEvent(NotificationEvent event, String catalogName) { this.event = event; // Some events that we don't care about, dbName may be empty diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java index 493f1f7cb71..7f697cf9738 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java @@ -46,6 +46,9 @@ public class MetastoreEventFactory implements EventFactory { String catalogName) { Preconditions.checkNotNull(event.getEventType()); MetastoreEventType metastoreEventType = MetastoreEventType.from(event.getEventType()); + if (LOG.isDebugEnabled()) { + LOG.debug("catalogName = {}, Event = {}", catalogName, event.toString()); + } switch (metastoreEventType) { case CREATE_TABLE: return CreateTableEvent.getEvents(event, catalogName); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java index 6e12c35e2b8..cbd0bfb5fa6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java @@ -115,6 +115,9 @@ public class MetastoreEventsProcessor extends MasterDaemon { CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId); if (catalog instanceof HMSExternalCatalog) { HMSExternalCatalog hmsExternalCatalog = (HMSExternalCatalog) catalog; + if (!hmsExternalCatalog.isEnableHmsEventsIncrementalSync()) { + continue; + } try { List<NotificationEvent> events = getNextHMSEvents(hmsExternalCatalog); if (!events.isEmpty()) { @@ -125,6 +128,8 @@ public class MetastoreEventsProcessor extends MasterDaemon { } catch (MetastoreNotificationFetchException e) { LOG.warn("Failed to fetch hms events on {}. msg: ", hmsExternalCatalog.getName(), e); } catch (Exception ex) { + hmsExternalCatalog.onRefreshCache(true); + updateLastSyncedEventId(hmsExternalCatalog, -1); LOG.warn("Failed to process hive metastore [{}] events .", hmsExternalCatalog.getName(), ex); } @@ -147,7 +152,7 @@ public class MetastoreEventsProcessor extends MasterDaemon { response = getNextEventResponseForSlave(hmsExternalCatalog); } - if (response == null) { + if (response == null || response.getEventsSize() == 0) { return Collections.emptyList(); } return response.getEvents(); @@ -207,9 +212,15 @@ public class MetastoreEventsProcessor extends MasterDaemon { return null; } + int batchSize = hmsExternalCatalog.getHmsEventsBatchSizePerRpc(); try { - return hmsExternalCatalog.getClient().getNextNotification(lastSyncedEventId, - Config.hms_events_batch_size_per_rpc, null); + NotificationEventResponse notificationEventResponse = + hmsExternalCatalog.getClient().getNextNotification(lastSyncedEventId, batchSize, null); + LOG.info("CatalogName = {}, lastSyncedEventId = {}, currentEventId = {}," + + "batchSize = {}, getEventsSize = {}", hmsExternalCatalog.getName(), lastSyncedEventId, + currentEventId, batchSize, notificationEventResponse.getEvents().size()); + + return notificationEventResponse; } catch (MetastoreNotificationFetchException e) { // Need a fallback to handle this because this error state can not be recovered until restarting FE if (StringUtils.isNotEmpty(e.getMessage()) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/metacache/MetaCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/metacache/MetaCache.java index e3ad8668fb5..6e4198186e8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/metacache/MetaCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/metacache/MetaCache.java @@ -90,7 +90,7 @@ public class MetaCache<T> { return name == null ? Optional.empty() : getMetaObj(name, id); } - public void updateCache(String objName, T obj) { + public void updateCache(String objName, T obj, long id) { metaObjCache.put(objName, Optional.of(obj)); namesCache.asMap().compute("", (k, v) -> { if (v == null) { @@ -100,9 +100,10 @@ public class MetaCache<T> { return v; } }); + idToName.put(id, objName); } - public void invalidate(String objName) { + public void invalidate(String objName, long id) { namesCache.asMap().compute("", (k, v) -> { if (v == null) { return Lists.newArrayList(); @@ -112,11 +113,13 @@ public class MetaCache<T> { } }); metaObjCache.invalidate(objName); + idToName.remove(id); } public void invalidateAll() { namesCache.invalidateAll(); metaObjCache.invalidateAll(); + idToName.clear(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/HMSProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/HMSProperties.java index 050ed1d5414..81baf042fae 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/HMSProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/HMSProperties.java @@ -28,5 +28,6 @@ public class HMSProperties { // required public static final String HIVE_METASTORE_URIS = "hive.metastore.uris"; public static final List<String> REQUIRED_FIELDS = Collections.singletonList(HMSProperties.HIVE_METASTORE_URIS); - + public static final String ENABLE_HMS_EVENTS_INCREMENTAL_SYNC = "hive.enable_hms_events_incremental_sync"; + public static final String HMS_EVENTIS_BATCH_SIZE_PER_RPC = "hive.hms_events_batch_size_per_rpc"; } diff --git a/regression-test/pipeline/external/conf/fe.conf b/regression-test/pipeline/external/conf/fe.conf index 8eed72816e8..df6fb86535c 100644 --- a/regression-test/pipeline/external/conf/fe.conf +++ b/regression-test/pipeline/external/conf/fe.conf @@ -96,5 +96,7 @@ auth_token = 5ff161c3-2c08-4079-b108-26c8850b6598 infodb_support_ext_catalog=true trino_connector_plugin_dir=/tmp/trino_connector/connectors +hms_events_polling_interval_ms=2000 + KRB5_CONFIG=/keytabs/krb5.conf diff --git a/regression-test/suites/external_table_p0/hive/ddl/test_hive_ctas.groovy b/regression-test/suites/external_table_p0/hive/ddl/test_hive_ctas.groovy index deebb781f19..265d200984e 100644 --- a/regression-test/suites/external_table_p0/hive/ddl/test_hive_ctas.groovy +++ b/regression-test/suites/external_table_p0/hive/ddl/test_hive_ctas.groovy @@ -22,7 +22,7 @@ suite("test_hive_ctas", "p0,external,hive,external_docker,external_docker_hive") return; } - for (String hivePrefix : ["hive2", "hive3"]) { + for (String hivePrefix : [ "hive3"]) { def file_formats = ["parquet", "orc"] setHivePrefix(hivePrefix) def generateSrcDDLForCTAS = { String file_format, String catalog_name -> diff --git a/regression-test/suites/external_table_p0/hive/test_hms_event_notification.groovy b/regression-test/suites/external_table_p0/hive/test_hms_event_notification.groovy new file mode 100644 index 00000000000..52724b807d3 --- /dev/null +++ b/regression-test/suites/external_table_p0/hive/test_hms_event_notification.groovy @@ -0,0 +1,392 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_hms_event_notification", "p0,external,hive,external_docker,external_docker_hive") { + String enabled = context.config.otherConfigs.get("enableHiveTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("diable Hive test.") + return; + } + for (String useMetaCache : ["true","false"] ) { + for (String hivePrefix : [ "hive2","hive3"]) { + try { + setHivePrefix(hivePrefix) + hive_docker """ set hive.stats.autogather=false; """ + + + String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort") + String catalog_name = "test_hms_event_notification_${hivePrefix}" + String catalog_name_2 = "test_hms_event_notification_${hivePrefix}_2" + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + int wait_time = 10000; + + sql """drop catalog if exists ${catalog_name}""" + sql """create catalog if not exists ${catalog_name} properties ( + "type"="hms", + 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}', + "use_meta_cache" = "${useMetaCache}", + "hive.enable_hms_events_incremental_sync" ="true", + "hive.hms_events_batch_size_per_rpc" = "1000" + );""" + + sql """create catalog if not exists ${catalog_name_2} properties ( + "type"="hms", + 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}', + "use_meta_cache" = "${useMetaCache}", + "hive.enable_hms_events_incremental_sync" ="true" + );""" + + sleep(wait_time); + + sql """ switch ${catalog_name} """ + + String tb1 = """${catalog_name}_tb_1""" + String tb2 = """${catalog_name}_tb_2""" + String db1 = "${catalog_name}_db_1"; + String db2 = "${catalog_name}_db_2"; + String partition_tb = "${catalog_name}_partition_tb"; + + try { + hive_docker """ use ${db1};""" + }catch (Exception e){ + } + + hive_docker """ drop table if exists ${tb1};""" + hive_docker """ drop table if exists ${tb2};""" + hive_docker """ drop table if exists ${partition_tb} """ + hive_docker """ drop database if exists ${db1};""" + hive_docker """ drop database if exists ${db2};""" + +//CREATE DATABASE + hive_docker """ create database ${db1};""" + hive_docker """ create database ${db2};""" + sleep(wait_time); + + List<List<String>> dbs = sql """ show databases """ + logger.info("result = " + dbs); + + int flag_db_count = 0 ; + dbs.forEach { + if (it[0] == db1) { + flag_db_count ++; + }else if (it[0] == db2) { + flag_db_count ++; + } + } + assertTrue(flag_db_count == 2); + + + + +//ALTER DATABASE + if (hivePrefix == "hive3") { + String db2_location = (sql """ SHOW CREATE DATABASE ${db2} """)[0][1] + logger.info("db2 location = " + db2_location ) + + def loc_start = db2_location.indexOf("hdfs://") + def loc_end = db2_location.indexOf(".db") + 3 + db2_location = db2_location.substring(loc_start, loc_end) + logger.info("db2 location = " + db2_location ) + + String new_db2_location = db2_location.replace("warehouse", "new_warehouse_xxx") + logger.info("change db2 location to ${new_db2_location} ") + + logger.info(" alter database begin") + hive_docker """ ALTER DATABASE ${db2} SET LOCATION '${new_db2_location}'; """ + logger.info(" alter database end") + sleep(wait_time); + + String query_db2_location = (sql """ SHOW CREATE DATABASE ${db2} """)[0][1] + logger.info("query_db2_location = ${query_db2_location} ") + + loc_start = query_db2_location.indexOf("hdfs://") + loc_end = query_db2_location.indexOf(".db") + 3 + query_db2_location = query_db2_location.substring(loc_start, loc_end) + + assertTrue(query_db2_location == new_db2_location); + } + + +//DROP DATABASE + hive_docker """drop database ${db2}; """; + sleep(wait_time); + dbs = sql """ show databases """ + logger.info("result = " + dbs); + flag_db_count = 0 ; + dbs.forEach { + if (it[0].toString() == db1) { + flag_db_count ++; + } else if (it[0].toString() == db2) { + logger.info(" exists ${db2}") + assertTrue(false); + } + } + assertTrue(flag_db_count == 1); + + +//CREATE TABLE + hive_docker """ use ${db1} """ + sql """ use ${db1} """ + List<List<String>> tbs = sql """ show tables; """ + logger.info(" tbs = ${tbs}") + assertTrue(tbs.isEmpty()) + + + hive_docker """ create table ${tb1} (id int,name string) ;""" + hive_docker """ create table ${tb2} (id int,name string) ;""" + sleep(wait_time); + tbs = sql """ show tables; """ + logger.info(" tbs = ${tbs}") + int flag_tb_count = 0 ; + tbs.forEach { + logger.info("it[0] = " + it[0]) + if (it[0].toString() == "${tb1}") { + flag_tb_count ++; + logger.info(" ${tb1} exists ") + }else if (it[0].toString() == tb2) { + flag_tb_count ++; + logger.info(" ${tb2} exists ") + } + } + assertTrue(flag_tb_count == 2); + + +//ALTER TABLE + List<List<String>> ans = sql """ select * from ${tb1} """ + logger.info("ans = ${ans}") + assertTrue(ans.isEmpty()) + + hive_docker """ insert into ${tb1} select 1,"xxx"; """ + sleep(wait_time); + ans = sql """ select * from ${tb1} """ + logger.info("ans = ${ans}") + assertTrue(ans.size() == 1) + assertTrue(ans[0][0].toString() == "1") + assertTrue(ans[0][1].toString() == "xxx") + + + hive_docker """ insert into ${tb1} values( 2,"yyy"); """ + sleep(wait_time); + ans = sql """ select * from ${tb1} order by id """ + logger.info("ans = ${ans}") + assertTrue(ans.size() == 2) + assertTrue(ans[0][0].toString() == "1") + assertTrue(ans[0][1].toString() == "xxx") + assertTrue(ans[1][0].toString() == "2") + assertTrue(ans[1][1].toString() == "yyy") + + + ans = sql """ desc ${tb1} """ + logger.info("ans = ${ans}") + assertTrue(ans.size() == 2) + assertTrue(ans[0][0].toString() == "id") + assertTrue(ans[0][1].toString() == "int") + assertTrue(ans[1][0].toString() == "name") + assertTrue(ans[1][1].toString() == "text") + + hive_docker """ alter table ${tb1} change column id id bigint; """ + sleep(wait_time); + ans = sql """ desc ${tb1} """ + logger.info("ans = ${ans}") + assertTrue(ans.size() == 2) + assertTrue(ans[0][0].toString() == "id") + assertTrue(ans[0][1].toString() == "bigint") + assertTrue(ans[1][0].toString() == "name") + assertTrue(ans[1][1].toString() == "text") + ans = sql """ select * from ${tb1} order by id """ + logger.info("ans = ${ans}") + assertTrue(ans.size() == 2) + assertTrue(ans[0][0].toString() == "1") + assertTrue(ans[0][1].toString() == "xxx") + assertTrue(ans[1][0].toString() == "2") + assertTrue(ans[1][1].toString() == "yyy") + + + + hive_docker """ alter table ${tb1} change column name new_name string; """ + sleep(wait_time); + ans = sql """ desc ${tb1} """ + logger.info("ans = ${ans}") + assertTrue(ans.size() == 2) + assertTrue(ans[0][0].toString() == "id") + assertTrue(ans[0][1].toString() == "bigint") + assertTrue(ans[1][0].toString() == "new_name") + assertTrue(ans[1][1].toString() == "text") + ans = sql """ select * from ${tb1} order by id """ + logger.info("ans = ${ans}") + assertTrue(ans.size() == 2) + assertTrue(ans[0][0].toString() == "1") + assertTrue(ans[0][1].toString() == "xxx") + assertTrue(ans[1][0].toString() == "2") + assertTrue(ans[1][1].toString() == "yyy") + + +//DROP TABLE + hive_docker """ drop table ${tb2} """ + sleep(wait_time); + tbs = sql """ show tables; """ + + logger.info(""" tbs = ${tbs}""") + + flag_tb_count = 0 ; + tbs.forEach { + if (it[0] == tb1) { + flag_tb_count ++; + } else if (it[0] == tb2) { + logger.info("exists ${tb1}") + assertTrue(false); + } + } + assertTrue(flag_tb_count == 1); + + + + hive_docker """ drop table ${tb1} """ + sleep(wait_time); + tbs = sql """ show tables; """ + + logger.info(""" tbs = ${tbs}""") + + tbs.forEach { + if (it[0] == tb1) { + logger.info("exists ${tb1}") + assertTrue(false); + } else if (it[0] == tb2) { + logger.info("exists ${tb2}") + assertTrue(false); + } + } + +//ADD PARTITION + + hive_docker """ use ${db1} """ + sql """ use ${db1} """ + + hive_docker """ CREATE TABLE ${partition_tb} ( + id INT, + name STRING, + age INT + ) + PARTITIONED BY (country STRING); """ + hive_docker """ + INSERT INTO TABLE ${partition_tb} PARTITION (country='USA') + VALUES (1, 'John Doe', 30), + (2, 'Jane Smith', 25);""" + + hive_docker """ + INSERT INTO TABLE ${partition_tb} PARTITION (country='India') + VALUES (3, 'Rahul Kumar', 28), + (4, 'Priya Singh', 24); + """ + sleep(wait_time); + ans = sql """ select * from ${partition_tb} order by id""" + logger.info("ans = ${ans}") + assertTrue(ans.size() == 4) + assertTrue(ans[0][0].toString() == "1") + assertTrue(ans[0][3].toString() == "USA") + assertTrue(ans[1][3].toString() == "USA") + assertTrue(ans[3][0].toString() == "4") + assertTrue(ans[2][3].toString() == "India") + assertTrue(ans[3][3].toString() == "India") + + + List<List<String>> pars = sql """ SHOW PARTITIONS from ${partition_tb}; """ + logger.info("pars = ${pars}") + assertTrue(pars.size() == 2) + int flag_partition_count = 0 ; + pars.forEach { + if (it[0] == "country=India") { + flag_partition_count ++; + } else if (it[0] == "country=USA") { + flag_partition_count ++; + } + } + assertTrue(flag_partition_count ==2) + + + hive_docker """ + ALTER TABLE ${partition_tb} ADD PARTITION (country='Canada'); + """ + sleep(wait_time); + pars = sql """ SHOW PARTITIONS from ${partition_tb}; """ + logger.info("pars = ${pars}") + assertTrue(pars.size() == 3) + flag_partition_count = 0 ; + pars.forEach { + if (it[0].toString() == "country=India") { + flag_partition_count ++; + } else if (it[0].toString() == "country=USA") { + flag_partition_count ++; + } else if (it[0].toString() == "country=Canada") { + flag_partition_count ++; + } + } + assertTrue(flag_partition_count ==3) + + +//ALTER PARTITION + hive_docker """ + alter table ${partition_tb} partition(country='USA') rename to partition(country='US') ; + """ + sleep(wait_time); + pars = sql """ SHOW PARTITIONS from ${partition_tb}; """ + logger.info("pars = ${pars}") + assertTrue(pars.size() == 3) + flag_partition_count = 0 ; + pars.forEach { + if (it[0].toString() == "country=India") { + flag_partition_count ++; + } else if (it[0].toString() == "country=US") { + flag_partition_count ++; + } else if (it[0].toString() == "country=Canada") { + flag_partition_count ++; + } + } + assertTrue(flag_partition_count ==3) + +//DROP PARTITION + hive_docker """ + ALTER TABLE ${partition_tb} DROP PARTITION (country='Canada'); + """ + sleep(wait_time); + pars = sql """ SHOW PARTITIONS from ${partition_tb}; """ + logger.info("pars = ${pars}") + assertTrue(pars.size() == 2) + flag_partition_count = 0 + pars.forEach { + if (it[0].toString() == "country=India") { + flag_partition_count ++; + } else if (it[0].toString() == "country=US") { + flag_partition_count ++; + } else if (it[0].toString() == "country=Canada") { + logger.info("exists partition canada") + assertTrue(false); + } + } + assertTrue(flag_partition_count ==2) + + + sql """drop catalog if exists ${catalog_name}""" + } finally { + } + } + } +} + + + + diff --git a/regression-test/suites/external_table_p0/hive/test_hms_event_notification_multi_catalog.groovy b/regression-test/suites/external_table_p0/hive/test_hms_event_notification_multi_catalog.groovy new file mode 100644 index 00000000000..24c2ac3b7fb --- /dev/null +++ b/regression-test/suites/external_table_p0/hive/test_hms_event_notification_multi_catalog.groovy @@ -0,0 +1,676 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_hms_event_notification_multi_catalog", "p0,external,hive,external_docker,external_docker_hive") { + String enabled = context.config.otherConfigs.get("enableHiveTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("diable Hive test.") + return; + } + + for (String useMetaCache : ["true","false"] ) { + + for (String hivePrefix : [ "hive2","hive3"]) { + try { + setHivePrefix(hivePrefix) + hive_docker """ set hive.stats.autogather=false; """ + + + String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort") + String catalog_name = "test_hms_event_notification_multi_catalog_${hivePrefix}" + String catalog_name_2 = "test_hms_event_notification_multi_catalog_${hivePrefix}_2" + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + int wait_time = 10000; + + sql """drop catalog if exists ${catalog_name}""" + sql """create catalog if not exists ${catalog_name} properties ( + "type"="hms", + 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}', + "use_meta_cache" = "${useMetaCache}", + "hive.enable_hms_events_incremental_sync" ="true", + "hive.hms_events_batch_size_per_rpc" = "10000" + );""" + + sql """drop catalog if exists ${catalog_name_2}""" + sql """create catalog if not exists ${catalog_name_2} properties ( + "type"="hms", + 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}', + "use_meta_cache" = "${useMetaCache}", + "hive.enable_hms_events_incremental_sync" ="true", + "hive.hms_events_batch_size_per_rpc" = "100000" + );""" + sleep(wait_time); + + sql """ switch ${catalog_name} """ + + String tb1 = """${catalog_name}_tb_1""" + String tb2 = """${catalog_name}_tb_2""" + String db1 = "${catalog_name}_db_1"; + String db2 = "${catalog_name}_db_2"; + String partition_tb = "${catalog_name}_partition_tb"; + + try { + hive_docker """ use ${db1};""" + }catch (Exception e){ + } + + hive_docker """ drop table if exists ${tb1};""" + hive_docker """ drop table if exists ${tb2};""" + hive_docker """ drop table if exists ${partition_tb} """ + hive_docker """ drop database if exists ${db1};""" + hive_docker """ drop database if exists ${db2};""" + +//CREATE DATABASE + hive_docker """ create database ${db1};""" + hive_docker """ create database ${db2};""" + sleep(wait_time); + + List<List<String>> dbs = sql """ show databases """ + logger.info("result = " + dbs); + + int flag_db_count = 0 ; + dbs.forEach { + if (it[0] == db1) { + flag_db_count ++; + }else if (it[0] == db2) { + flag_db_count ++; + } + } + assertTrue(flag_db_count == 2); + + sql """ switch ${catalog_name_2} """ + dbs = sql """ show databases """ + logger.info("result = " + dbs); + flag_db_count = 0 ; + dbs.forEach { + if (it[0] == db1) { + flag_db_count ++; + }else if (it[0] == db2) { + flag_db_count ++; + } + } + assertTrue(flag_db_count == 2); + + sql """ switch ${catalog_name} """ + + + +//ALTER DATABASE + if (hivePrefix == "hive3") { + String db2_location = (sql """ SHOW CREATE DATABASE ${db2} """)[0][1] + logger.info("db2 location = " + db2_location ) + + def loc_start = db2_location.indexOf("hdfs://") + def loc_end = db2_location.indexOf(".db") + 3 + db2_location = db2_location.substring(loc_start, loc_end) + logger.info("db2 location = " + db2_location ) + + String new_db2_location = db2_location.replace("warehouse", "new_warehouse_xxx") + logger.info("change db2 location to ${new_db2_location} ") + + logger.info(" alter database begin") + hive_docker """ ALTER DATABASE ${db2} SET LOCATION '${new_db2_location}'; """ + logger.info(" alter database end") + sleep(wait_time); + + String query_db2_location = (sql """ SHOW CREATE DATABASE ${db2} """)[0][1] + logger.info("query_db2_location = ${query_db2_location} ") + + loc_start = query_db2_location.indexOf("hdfs://") + loc_end = query_db2_location.indexOf(".db") + 3 + query_db2_location = query_db2_location.substring(loc_start, loc_end) + assertTrue(query_db2_location == new_db2_location); + + + + sql """ switch ${catalog_name_2} """ + query_db2_location = (sql """ SHOW CREATE DATABASE ${db2} """)[0][1] + logger.info("query_db2_location = ${query_db2_location} ") + + loc_start = query_db2_location.indexOf("hdfs://") + loc_end = query_db2_location.indexOf(".db") + 3 + query_db2_location = query_db2_location.substring(loc_start, loc_end) + assertTrue(query_db2_location == new_db2_location); + sql """ switch ${catalog_name} """ + } + + +//DROP DATABASE + hive_docker """drop database ${db2}; """; + sleep(wait_time); + dbs = sql """ show databases """ + logger.info("result = " + dbs); + flag_db_count = 0 ; + dbs.forEach { + if (it[0].toString() == db1) { + flag_db_count ++; + } else if (it[0].toString() == db2) { + logger.info(" exists ${db2}") + assertTrue(false); + } + } + assertTrue(flag_db_count == 1); + + sql """ switch ${catalog_name_2} """ + dbs = sql """ show databases """ + logger.info("result = " + dbs); + flag_db_count = 0 ; + dbs.forEach { + if (it[0].toString() == db1) { + flag_db_count ++; + } else if (it[0].toString() == db2) { + logger.info(" exists ${db2}") + assertTrue(false); + } + } + assertTrue(flag_db_count == 1); + sql """ switch ${catalog_name} """ + +//CREATE TABLE + hive_docker """ use ${db1} """ + sql """ use ${db1} """ + List<List<String>> tbs = sql """ show tables; """ + logger.info(" tbs = ${tbs}") + assertTrue(tbs.isEmpty()) + + + hive_docker """ create table ${tb1} (id int,name string) ;""" + hive_docker """ create table ${tb2} (id int,name string) ;""" + sleep(wait_time); + tbs = sql """ show tables; """ + logger.info(" tbs = ${tbs}") + int flag_tb_count = 0 ; + tbs.forEach { + logger.info("it[0] = " + it[0]) + if (it[0].toString() == "${tb1}") { + flag_tb_count ++; + logger.info(" ${tb1} exists ") + }else if (it[0].toString() == tb2) { + flag_tb_count ++; + logger.info(" ${tb2} exists ") + } + } + assertTrue(flag_tb_count == 2); + + + sql """ switch ${catalog_name_2} """ + sql """ use ${db1} """ + tbs = sql """ show tables; """ + logger.info(" tbs = ${tbs}") + flag_tb_count = 0 ; + tbs.forEach { + logger.info("it[0] = " + it[0]) + if (it[0].toString() == "${tb1}") { + flag_tb_count ++; + logger.info(" ${tb1} exists ") + }else if (it[0].toString() == tb2) { + flag_tb_count ++; + logger.info(" ${tb2} exists ") + } + } + assertTrue(flag_tb_count == 2); + sql """ switch ${catalog_name} """ + sql """ use ${db1} """ + + +//ALTER TABLE + List<List<String>> ans = sql """ select * from ${tb1} """ + logger.info("ans = ${ans}") + assertTrue(ans.isEmpty()) + + hive_docker """ insert into ${tb1} select 1,"xxx"; """ + sleep(wait_time); + ans = sql """ select * from ${tb1} """ + logger.info("ans = ${ans}") + assertTrue(ans.size() == 1) + assertTrue(ans[0][0].toString() == "1") + assertTrue(ans[0][1].toString() == "xxx") + + sql """ switch ${catalog_name_2} """ + sql """ use ${db1} """ + ans = sql """ select * from ${tb1} """ + logger.info("ans = ${ans}") + assertTrue(ans.size() == 1) + assertTrue(ans[0][0].toString() == "1") + assertTrue(ans[0][1].toString() == "xxx") + + sql """ switch ${catalog_name} """ + sql """ use ${db1} """ + + hive_docker """ insert into ${tb1} values( 2,"yyy"); """ + sleep(wait_time); + ans = sql """ select * from ${tb1} order by id """ + logger.info("ans = ${ans}") + assertTrue(ans.size() == 2) + assertTrue(ans[0][0].toString() == "1") + assertTrue(ans[0][1].toString() == "xxx") + assertTrue(ans[1][0].toString() == "2") + assertTrue(ans[1][1].toString() == "yyy") + + + ans = sql """ desc ${tb1} """ + logger.info("ans = ${ans}") + assertTrue(ans.size() == 2) + assertTrue(ans[0][0].toString() == "id") + assertTrue(ans[0][1].toString() == "int") + assertTrue(ans[1][0].toString() == "name") + assertTrue(ans[1][1].toString() == "text") + + + sql """ switch ${catalog_name_2} """ + sql """ use ${db1} """ + ans = sql """ select * from ${tb1} order by id """ + logger.info("ans = ${ans}") + assertTrue(ans.size() == 2) + assertTrue(ans[0][0].toString() == "1") + assertTrue(ans[0][1].toString() == "xxx") + assertTrue(ans[1][0].toString() == "2") + assertTrue(ans[1][1].toString() == "yyy") + + ans = sql """ desc ${tb1} """ + logger.info("ans = ${ans}") + assertTrue(ans.size() == 2) + assertTrue(ans[0][0].toString() == "id") + assertTrue(ans[0][1].toString() == "int") + assertTrue(ans[1][0].toString() == "name") + assertTrue(ans[1][1].toString() == "text") + + sql """ switch ${catalog_name} """ + sql """ use ${db1} """ + + + hive_docker """ alter table ${tb1} change column id id bigint; """ + sleep(wait_time); + ans = sql """ desc ${tb1} """ + logger.info("ans = ${ans}") + assertTrue(ans.size() == 2) + assertTrue(ans[0][0].toString() == "id") + assertTrue(ans[0][1].toString() == "bigint") + assertTrue(ans[1][0].toString() == "name") + assertTrue(ans[1][1].toString() == "text") + ans = sql """ select * from ${tb1} order by id """ + logger.info("ans = ${ans}") + assertTrue(ans.size() == 2) + assertTrue(ans[0][0].toString() == "1") + assertTrue(ans[0][1].toString() == "xxx") + assertTrue(ans[1][0].toString() == "2") + assertTrue(ans[1][1].toString() == "yyy") + + + sql """ switch ${catalog_name_2} """ + sql """ use ${db1} """ + ans = sql """ desc ${tb1} """ + logger.info("ans = ${ans}") + assertTrue(ans.size() == 2) + assertTrue(ans[0][0].toString() == "id") + assertTrue(ans[0][1].toString() == "bigint") + assertTrue(ans[1][0].toString() == "name") + assertTrue(ans[1][1].toString() == "text") + ans = sql """ select * from ${tb1} order by id """ + logger.info("ans = ${ans}") + assertTrue(ans.size() == 2) + assertTrue(ans[0][0].toString() == "1") + assertTrue(ans[0][1].toString() == "xxx") + assertTrue(ans[1][0].toString() == "2") + assertTrue(ans[1][1].toString() == "yyy") + + sql """ switch ${catalog_name} """ + sql """ use ${db1} """ + + + + hive_docker """ alter table ${tb1} change column name new_name string; """ + sleep(wait_time); + ans = sql """ desc ${tb1} """ + logger.info("ans = ${ans}") + assertTrue(ans.size() == 2) + assertTrue(ans[0][0].toString() == "id") + assertTrue(ans[0][1].toString() == "bigint") + assertTrue(ans[1][0].toString() == "new_name") + assertTrue(ans[1][1].toString() == "text") + ans = sql """ select * from ${tb1} order by id """ + logger.info("ans = ${ans}") + assertTrue(ans.size() == 2) + assertTrue(ans[0][0].toString() == "1") + assertTrue(ans[0][1].toString() == "xxx") + assertTrue(ans[1][0].toString() == "2") + assertTrue(ans[1][1].toString() == "yyy") + + sql """ switch ${catalog_name_2} """ + sql """ use ${db1} """ + ans = sql """ desc ${tb1} """ + logger.info("ans = ${ans}") + assertTrue(ans.size() == 2) + assertTrue(ans[0][0].toString() == "id") + assertTrue(ans[0][1].toString() == "bigint") + assertTrue(ans[1][0].toString() == "new_name") + assertTrue(ans[1][1].toString() == "text") + ans = sql """ select * from ${tb1} order by id """ + logger.info("ans = ${ans}") + assertTrue(ans.size() == 2) + assertTrue(ans[0][0].toString() == "1") + assertTrue(ans[0][1].toString() == "xxx") + assertTrue(ans[1][0].toString() == "2") + assertTrue(ans[1][1].toString() == "yyy") + + sql """ switch ${catalog_name} """ + sql """ use ${db1} """ + + + + +//DROP TABLE + hive_docker """ drop table ${tb2} """ + sleep(wait_time); + tbs = sql """ show tables; """ + + logger.info(""" tbs = ${tbs}""") + + flag_tb_count = 0 ; + tbs.forEach { + if (it[0] == tb1) { + flag_tb_count ++; + } else if (it[0] == tb2) { + logger.info("exists ${tb1}") + assertTrue(false); + } + } + assertTrue(flag_tb_count == 1); + + + sql """ switch ${catalog_name_2} """ + sql """ use ${db1} """ + tbs = sql """ show tables; """ + logger.info(""" tbs = ${tbs}""") + flag_tb_count = 0 ; + tbs.forEach { + if (it[0] == tb1) { + flag_tb_count ++; + } else if (it[0] == tb2) { + logger.info("exists ${tb2}") + assertTrue(false); + } + } + assertTrue(flag_tb_count == 1); + + sql """ switch ${catalog_name} """ + sql """ use ${db1} """ + + + + + hive_docker """ drop table ${tb1} """ + sleep(wait_time); + tbs = sql """ show tables; """ + + logger.info(""" tbs = ${tbs}""") + + tbs.forEach { + if (it[0] == tb1) { + logger.info("exists ${tb1}") + assertTrue(false); + } else if (it[0] == tb2) { + logger.info("exists ${tb2}") + assertTrue(false); + } + } + + + sql """ switch ${catalog_name_2} """ + sql """ use ${db1} """ + tbs = sql """ show tables; """ + logger.info(""" tbs = ${tbs}""") + tbs.forEach { + if (it[0] == tb1) { + logger.info("exists ${tb1}") + assertTrue(false); + } else if (it[0] == tb2) { + logger.info("exists ${tb2}") + assertTrue(false); + } + } + sql """ switch ${catalog_name} """ + sql """ use ${db1} """ + + + +//ADD PARTITION + + hive_docker """ use ${db1} """ + sql """ use ${db1} """ + + hive_docker """ CREATE TABLE ${partition_tb} ( + id INT, + name STRING, + age INT + ) + PARTITIONED BY (country STRING); """ + hive_docker """ + INSERT INTO TABLE ${partition_tb} PARTITION (country='USA') + VALUES (1, 'John Doe', 30), + (2, 'Jane Smith', 25);""" + + hive_docker """ + INSERT INTO TABLE ${partition_tb} PARTITION (country='India') + VALUES (3, 'Rahul Kumar', 28), + (4, 'Priya Singh', 24); + """ + sleep(wait_time); + ans = sql """ select * from ${partition_tb} order by id""" + logger.info("ans = ${ans}") + assertTrue(ans.size() == 4) + assertTrue(ans[0][0].toString() == "1") + assertTrue(ans[0][3].toString() == "USA") + assertTrue(ans[1][3].toString() == "USA") + assertTrue(ans[3][0].toString() == "4") + assertTrue(ans[2][3].toString() == "India") + assertTrue(ans[3][3].toString() == "India") + + sql """ switch ${catalog_name_2} """ + sql """ use ${db1} """ + ans = sql """ select * from ${partition_tb} order by id""" + logger.info("ans = ${ans}") + assertTrue(ans.size() == 4) + assertTrue(ans[0][0].toString() == "1") + assertTrue(ans[0][3].toString() == "USA") + assertTrue(ans[1][3].toString() == "USA") + assertTrue(ans[3][0].toString() == "4") + assertTrue(ans[2][3].toString() == "India") + assertTrue(ans[3][3].toString() == "India") + + sql """ switch ${catalog_name} """ + sql """ use ${db1} """ + + + + List<List<String>> pars = sql """ SHOW PARTITIONS from ${partition_tb}; """ + logger.info("pars = ${pars}") + assertTrue(pars.size() == 2) + int flag_partition_count = 0 ; + pars.forEach { + if (it[0] == "country=India") { + flag_partition_count ++; + } else if (it[0] == "country=USA") { + flag_partition_count ++; + } + } + assertTrue(flag_partition_count ==2) + + sql """ switch ${catalog_name_2} """ + sql """ use ${db1} """ + pars = sql """ SHOW PARTITIONS from ${partition_tb}; """ + logger.info("pars = ${pars}") + assertTrue(pars.size() == 2) + flag_partition_count = 0 ; + pars.forEach { + if (it[0] == "country=India") { + flag_partition_count ++; + } else if (it[0] == "country=USA") { + flag_partition_count ++; + } + } + assertTrue(flag_partition_count ==2) + + sql """ switch ${catalog_name} """ + sql """ use ${db1} """ + + + + + hive_docker """ + ALTER TABLE ${partition_tb} ADD PARTITION (country='Canada'); + """ + sleep(wait_time); + pars = sql """ SHOW PARTITIONS from ${partition_tb}; """ + logger.info("pars = ${pars}") + assertTrue(pars.size() == 3) + flag_partition_count = 0 ; + pars.forEach { + if (it[0].toString() == "country=India") { + flag_partition_count ++; + } else if (it[0].toString() == "country=USA") { + flag_partition_count ++; + } else if (it[0].toString() == "country=Canada") { + flag_partition_count ++; + } + } + assertTrue(flag_partition_count ==3) + + + sql """ switch ${catalog_name_2} """ + sql """ use ${db1} """ + pars = sql """ SHOW PARTITIONS from ${partition_tb}; """ + logger.info("pars = ${pars}") + assertTrue(pars.size() == 3) + flag_partition_count = 0 ; + pars.forEach { + if (it[0].toString() == "country=India") { + flag_partition_count ++; + } else if (it[0].toString() == "country=USA") { + flag_partition_count ++; + } else if (it[0].toString() == "country=Canada") { + flag_partition_count ++; + } + } + assertTrue(flag_partition_count ==3) + + + sql """ switch ${catalog_name} """ + sql """ use ${db1} """ + + + + +//ALTER PARTITION + hive_docker """ + alter table ${partition_tb} partition(country='USA') rename to partition(country='US') ; + """ + sleep(wait_time); + pars = sql """ SHOW PARTITIONS from ${partition_tb}; """ + logger.info("pars = ${pars}") + assertTrue(pars.size() == 3) + flag_partition_count = 0 ; + pars.forEach { + if (it[0].toString() == "country=India") { + flag_partition_count ++; + } else if (it[0].toString() == "country=US") { + flag_partition_count ++; + } else if (it[0].toString() == "country=Canada") { + flag_partition_count ++; + } + } + assertTrue(flag_partition_count ==3) + + + sql """ switch ${catalog_name_2} """ + sql """ use ${db1} """ + pars = sql """ SHOW PARTITIONS from ${partition_tb}; """ + logger.info("pars = ${pars}") + assertTrue(pars.size() == 3) + flag_partition_count = 0 ; + pars.forEach { + if (it[0].toString() == "country=India") { + flag_partition_count ++; + } else if (it[0].toString() == "country=US") { + flag_partition_count ++; + } else if (it[0].toString() == "country=Canada") { + flag_partition_count ++; + } + } + assertTrue(flag_partition_count ==3) + + sql """ switch ${catalog_name} """ + sql """ use ${db1} """ + + + + +//DROP PARTITION + hive_docker """ + ALTER TABLE ${partition_tb} DROP PARTITION (country='Canada'); + """ + sleep(wait_time); + pars = sql """ SHOW PARTITIONS from ${partition_tb}; """ + logger.info("pars = ${pars}") + assertTrue(pars.size() == 2) + flag_partition_count = 0 + pars.forEach { + if (it[0].toString() == "country=India") { + flag_partition_count ++; + } else if (it[0].toString() == "country=US") { + flag_partition_count ++; + } else if (it[0].toString() == "country=Canada") { + logger.info("exists partition canada") + assertTrue(false); + } + } + assertTrue(flag_partition_count ==2) + + + sql """ switch ${catalog_name_2} """ + sql """ use ${db1} """ + pars = sql """ SHOW PARTITIONS from ${partition_tb}; """ + logger.info("pars = ${pars}") + assertTrue(pars.size() == 2) + flag_partition_count = 0 + pars.forEach { + if (it[0].toString() == "country=India") { + flag_partition_count ++; + } else if (it[0].toString() == "country=US") { + flag_partition_count ++; + } else if (it[0].toString() == "country=Canada") { + logger.info("exists partition canada") + assertTrue(false); + } + } + assertTrue(flag_partition_count ==2) + sql """ switch ${catalog_name} """ + sql """ use ${db1} """ + + + + sql """drop catalog if exists ${catalog_name}""" + sql """drop catalog if exists ${catalog_name_2}""" + } finally { + } + } + } +} + + + + diff --git a/regression-test/suites/external_table_p0/hive/write/test_hive_write_insert.groovy b/regression-test/suites/external_table_p0/hive/write/test_hive_write_insert.groovy index 0b6fab86b2b..087b797faaf 100644 --- a/regression-test/suites/external_table_p0/hive/write/test_hive_write_insert.groovy +++ b/regression-test/suites/external_table_p0/hive/write/test_hive_write_insert.groovy @@ -880,7 +880,7 @@ INSERT INTO all_types_par_${format_compression}_${catalog_name}_q03 return; } - for (String hivePrefix : ["hive2", "hive3"]) { + for (String hivePrefix : ["hive3"]) { setHivePrefix(hivePrefix) try { String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort") diff --git a/regression-test/suites/external_table_p0/hive/write/test_hive_write_partitions.groovy b/regression-test/suites/external_table_p0/hive/write/test_hive_write_partitions.groovy index cd0533d00d9..7e3f070636e 100644 --- a/regression-test/suites/external_table_p0/hive/write/test_hive_write_partitions.groovy +++ b/regression-test/suites/external_table_p0/hive/write/test_hive_write_partitions.groovy @@ -195,7 +195,7 @@ suite("test_hive_write_partitions", "p0,external,hive,external_docker,external_d return; } - for (String hivePrefix : ["hive2", "hive3"]) { + for (String hivePrefix : ["hive3"]) { setHivePrefix(hivePrefix) try { String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort") --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org