This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new a3e16d3db09 branch-2.1: [fix](olap) clear storage_policy property when 
is_being_synced = true #48229 (#48304)
a3e16d3db09 is described below

commit a3e16d3db094a35cdc0e75c03cdb9f4934574141
Author: walter <maoch...@selectdb.com>
AuthorDate: Tue Feb 25 20:33:05 2025 +0800

    branch-2.1: [fix](olap) clear storage_policy property when is_being_synced 
= true #48229 (#48304)
    
    cherry pick from #48229
---
 .../java/org/apache/doris/catalog/OlapTable.java   |   2 +
 .../apache/doris/common/util/PropertyAnalyzer.java |   9 +
 .../apache/doris/datasource/InternalCatalog.java   |   3 +
 .../ccr_syncer_p0/test_is_being_synced.groovy      | 204 +++++++++++++++++++++
 4 files changed, 218 insertions(+)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 694a0cecbc9..f34c9192e04 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -2225,6 +2225,8 @@ public class OlapTable extends Table implements 
MTMVRelatedTableIf {
         if (properties.containsKey(DynamicPartitionProperty.STORAGE_POLICY)) {
             properties.remove(DynamicPartitionProperty.STORAGE_POLICY);
         }
+        // storage policy is invalid for table/partition when table is being 
synced
+        partitionInfo.refreshTableStoragePolicy("");
     }
 
     public void checkChangeReplicaAllocation() throws DdlException {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
index af09aab8784..251b1a6a383 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
@@ -243,6 +243,7 @@ public class PropertyAnalyzer {
         String newStoragePolicy = oldStoragePolicy;
         boolean hasStoragePolicy = false;
         boolean storageMediumSpecified = false;
+        boolean isBeingSynced = false;
 
         for (Map.Entry<String, String> entry : properties.entrySet()) {
             String key = entry.getKey();
@@ -268,6 +269,8 @@ public class PropertyAnalyzer {
             } else if (key.equalsIgnoreCase(PROPERTIES_STORAGE_POLICY)) {
                 hasStoragePolicy = true;
                 newStoragePolicy = value;
+            } else if (key.equalsIgnoreCase(PROPERTIES_IS_BEING_SYNCED)) {
+                isBeingSynced = Boolean.parseBoolean(value);
             }
         } // end for properties
 
@@ -296,6 +299,12 @@ public class PropertyAnalyzer {
             cooldownTimestamp = DataProperty.MAX_COOLDOWN_TIME_MS;
         }
 
+        // when isBeingSynced property is set to true, the storage policy will 
be ignored
+        if (isBeingSynced) {
+            hasStoragePolicy = false;
+            newStoragePolicy = "";
+        }
+
         if (hasStoragePolicy && !"".equals(newStoragePolicy)) {
             // check remote storage policy
             StoragePolicy checkedPolicy = 
StoragePolicy.ofCheck(newStoragePolicy);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index a58c0e4a149..e5a6c1199b1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -2685,6 +2685,9 @@ public class InternalCatalog implements 
CatalogIf<Database> {
             partitionInfo.setIsInMemory(partitionId, isInMemory);
             partitionInfo.setTabletType(partitionId, tabletType);
             partitionInfo.setIsMutable(partitionId, isMutable);
+            if (isBeingSynced) {
+                partitionInfo.refreshTableStoragePolicy("");
+            }
         }
         // check colocation properties
         try {
diff --git a/regression-test/suites/ccr_syncer_p0/test_is_being_synced.groovy 
b/regression-test/suites/ccr_syncer_p0/test_is_being_synced.groovy
new file mode 100644
index 00000000000..5f40d92b7c9
--- /dev/null
+++ b/regression-test/suites/ccr_syncer_p0/test_is_being_synced.groovy
@@ -0,0 +1,204 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_is_being_synced") {
+    def syncer = getSyncer()
+    if (!syncer.checkEnableFeatureBinlog()) {
+        logger.info("fe enable_feature_binlog is false, skip case 
test_is_being_synced")
+        return
+    }
+
+    String suiteName = "test_is_being_synced"
+    String repoName = "repo_" + UUID.randomUUID().toString().replace("-", "")
+    String dbName = "${suiteName}_db"
+    String tableName = "${suiteName}_table"
+    String resourceName = "${suiteName}_resource"
+    String policyName = "${suiteName}_policy"
+
+    sql """
+        CREATE RESOURCE IF NOT EXISTS "${resourceName}"
+        PROPERTIES(
+            "type"="s3",
+            "AWS_ENDPOINT" = "${getS3Endpoint()}",
+            "AWS_REGION" = "${getS3Region()}",
+            "AWS_ROOT_PATH" = "regression/cooldown",
+            "AWS_ACCESS_KEY" = "${getS3AK()}",
+            "AWS_SECRET_KEY" = "${getS3SK()}",
+            "AWS_MAX_CONNECTIONS" = "50",
+            "AWS_REQUEST_TIMEOUT_MS" = "3000",
+            "AWS_CONNECTION_TIMEOUT_MS" = "1000",
+            "AWS_BUCKET" = "${getS3BucketName()}",
+            "s3_validity_check" = "true"
+        );
+    """
+
+    sql """
+        CREATE STORAGE POLICY IF NOT EXISTS ${policyName}
+        PROPERTIES(
+            "storage_resource" = "${resourceName}",
+            "cooldown_ttl" = "300"
+        )
+    """
+
+    sql "DROP DATABASE IF EXISTS ${dbName} FORCE"
+    sql "CREATE DATABASE ${dbName}"
+
+    // 1. Create a table with is_being_synced property
+    sql """
+        CREATE TABLE ${dbName}.${tableName}
+        (
+            k1 INT,
+            v1 INT
+        )
+        ENGINE=OLAP
+        DUPLICATE KEY(k1)
+        PARTITION BY RANGE(k1)
+        (
+            PARTITION p1 VALUES LESS THAN (100),
+            PARTITION p2 VALUES LESS THAN (200),
+            PARTITION p3 VALUES LESS THAN (300) ("storage_policy" = 
"${policyName}")
+        )
+        DISTRIBUTED BY HASH(k1) BUCKETS 3
+        PROPERTIES (
+            "replication_num" = "1",
+            "is_being_synced" = "true",
+            "storage_policy" = "${policyName}"
+        )
+    """
+
+    def show_result = sql "SHOW CREATE TABLE ${dbName}.${tableName}"
+    def create_table_sql = show_result[0][1]
+    logger.info("${create_table_sql}")
+
+    assertFalse(create_table_sql.containsIgnoreCase("${policyName}"))
+    assertTrue(create_table_sql.containsIgnoreCase('"is_being_synced" = 
"true"'))
+
+    // 2. Create table with is_being_synced property, storage policy is not 
exists
+    sql """ DROP TABLE ${dbName}.${tableName} FORCE """
+    sql """
+        CREATE TABLE ${dbName}.${tableName}
+        (
+            k1 INT,
+            v1 INT
+        )
+        ENGINE=OLAP
+        DUPLICATE KEY(k1)
+        PARTITION BY RANGE(k1)
+        (
+            PARTITION p1 VALUES LESS THAN (100),
+            PARTITION p2 VALUES LESS THAN (200),
+            PARTITION p3 VALUES LESS THAN (300) ("storage_policy" = 
"unknown_partition_storage_policy")
+        )
+        DISTRIBUTED BY HASH(k1) BUCKETS 3
+        PROPERTIES (
+            "replication_num" = "1",
+            "is_being_synced" = "true",
+            "storage_policy" = "unknown_table_storage_policy"
+        )
+    """
+
+    show_result = sql "SHOW CREATE TABLE ${dbName}.${tableName}"
+    create_table_sql = show_result[0][1]
+    logger.info("${create_table_sql}")
+
+    
assertFalse(create_table_sql.containsIgnoreCase('unknown_partition_storage_policy'))
+    
assertFalse(create_table_sql.containsIgnoreCase('unknown_table_storage_policy'))
+
+    // 3. For list partition
+    sql """ DROP TABLE ${dbName}.${tableName} FORCE """
+    sql """
+        CREATE TABLE ${dbName}.${tableName}
+        (
+            k1 INT,
+            v1 INT
+        )
+        ENGINE=OLAP
+        DUPLICATE KEY(k1)
+        PARTITION BY LIST(k1)
+        (
+            PARTITION p1 VALUES IN ((100), (200), (300)),
+            PARTITION p2 VALUES IN ((400), (500), (600)),
+            PARTITION p3 VALUES IN ((700), (800)) ("storage_policy" = 
"unknown_partition_storage_policy")
+        )
+        DISTRIBUTED BY HASH(k1) BUCKETS 3
+        PROPERTIES (
+            "replication_num" = "1",
+            "is_being_synced" = "true",
+            "storage_policy" = "unknown_table_storage_policy"
+        )
+    """
+
+    show_result = sql "SHOW CREATE TABLE ${dbName}.${tableName}"
+    create_table_sql = show_result[0][1]
+    logger.info("${create_table_sql}")
+
+    
assertFalse(create_table_sql.containsIgnoreCase('unknown_partition_storage_policy'))
+    
assertFalse(create_table_sql.containsIgnoreCase('unknown_table_storage_policy'))
+
+    // 4. For single partition
+    sql """ DROP TABLE ${dbName}.${tableName} FORCE """
+    sql """
+        CREATE TABLE ${dbName}.${tableName}
+        (
+            k1 INT,
+            v1 INT
+        )
+        ENGINE=OLAP
+        DUPLICATE KEY(k1)
+        DISTRIBUTED BY HASH(k1) BUCKETS 3
+        PROPERTIES (
+            "replication_num" = "1",
+            "is_being_synced" = "true",
+            "storage_policy" = "unknown_table_storage_policy"
+        )
+    """
+
+    show_result = sql "SHOW CREATE TABLE ${dbName}.${tableName}"
+    create_table_sql = show_result[0][1]
+    logger.info("${create_table_sql}")
+
+    
assertFalse(create_table_sql.containsIgnoreCase('unknown_table_storage_policy'))
+
+    // 5. For auto partition
+    sql """ DROP TABLE ${dbName}.${tableName} FORCE """
+    sql """
+        CREATE TABLE ${dbName}.${tableName}
+        (
+            k1 INT,
+            v1 INT,
+            `TIME_STAMP` datev2 NOT NULL COMMENT '采集日期'
+        )
+        ENGINE=OLAP
+        DUPLICATE KEY(k1)
+        AUTO PARTITION BY RANGE (date_trunc(`TIME_STAMP`, 'month'))
+        (
+        )
+        DISTRIBUTED BY HASH(k1) BUCKETS 3
+        PROPERTIES (
+            "replication_num" = "1",
+            "is_being_synced" = "true",
+            "storage_policy" = "unknown_table_storage_policy"
+        )
+    """
+
+    show_result = sql "SHOW CREATE TABLE ${dbName}.${tableName}"
+    create_table_sql = show_result[0][1]
+    logger.info("${create_table_sql}")
+
+    
assertFalse(create_table_sql.containsIgnoreCase('unknown_table_storage_policy'))
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to