This is an automated email from the ASF dual-hosted git repository.

yangbowen pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 70e822461f2 [branch-2.0](schema change) opt cooldown data schema 
change (#40963)
70e822461f2 is described below

commit 70e822461f25cf63a67b82813abb1cf2d2523771
Author: Xujian Duan <50550370+darvend...@users.noreply.github.com>
AuthorDate: Fri Sep 20 11:01:44 2024 +0800

    [branch-2.0](schema change) opt cooldown data schema change (#40963)
    
    ## Proposed changes
    
    This PR solves 3 problems of schema change for cooldown data:
    
    1. Schema change will build a new tablet for base tablet, and all
    replicas do this separately, so the cooldown data SC writes all replicas
    to the remote storage, But the cooldown rowsets just need one master
    replica to cooldown, the other replicas follow the master. To optimize
    this issue, Doris can just write the new rowsets to the local storage,
    and the tablet will cooldown this rowsets automatically for just one
    mater replica.
    
    3. In schema change job and rollup job, FE generates createReplicaTask,
    which specifies the storage policy of the tablets to create, table's
    storage policy may be empty if we just set the partitions' storage
    policy, so we use partition's storage policy instead of table.
    
    5. checks the target tablets in the loop of data conversion, and
    terminates the data conversion thread if the tablet is been dropped.
---
 be/src/olap/schema_change.cpp                            | 16 +++++++++++++++-
 .../main/java/org/apache/doris/alter/RollupJobV2.java    |  3 ++-
 .../java/org/apache/doris/alter/SchemaChangeJobV2.java   |  3 ++-
 3 files changed, 19 insertions(+), 3 deletions(-)

diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index 7d867d89460..7e6b9a3ff35 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -498,6 +498,13 @@ Status 
VSchemaChangeDirectly::_inner_process(RowsetReaderSharedPtr rowset_reader
                                              TabletSchemaSPtr 
base_tablet_schema) {
     bool eof = false;
     do {
+        // tablet may be dropped due to user cancel, schema change thread 
should fast fail
+        // and release tablet lock.
+        if (new_tablet->tablet_state() == TABLET_SHUTDOWN) {
+            return Status::Error<TABLE_ALREADY_DELETED_ERROR>(
+                    "fail to process tablet because it is to be deleted. 
tablet_id={}",
+                    new_tablet->tablet_id());
+        }
         auto new_block =
                 
vectorized::Block::create_unique(new_tablet->tablet_schema()->create_block());
         auto ref_block = 
vectorized::Block::create_unique(base_tablet_schema->create_block());
@@ -580,6 +587,13 @@ Status 
VSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_rea
 
     bool eof = false;
     do {
+        // tablet may be dropped due to user cancel, schema change thread 
should fast fail
+        // and release tablet lock.
+        if (new_tablet->tablet_state() == TABLET_SHUTDOWN) {
+            return Status::Error<TABLE_ALREADY_DELETED_ERROR>(
+                    "fail to process tablet because it is to be deleted. 
tablet_id={}",
+                    new_tablet->tablet_id());
+        }
         auto ref_block = 
vectorized::Block::create_unique(base_tablet_schema->create_block());
         auto st = rowset_reader->next_block(ref_block.get());
         if (!st) {
@@ -1103,7 +1117,7 @@ Status 
SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams
         context.segments_overlap = 
rs_reader->rowset()->rowset_meta()->segments_overlap();
         context.tablet_schema = new_tablet->tablet_schema();
         context.newest_write_timestamp = rs_reader->newest_write_timestamp();
-        context.fs = rs_reader->rowset()->rowset_meta()->fs();
+        context.fs = io::global_local_filesystem();
         context.write_type = DataWriteType::TYPE_SCHEMA_CHANGE;
         Status status = new_tablet->create_rowset_writer(context, 
&rowset_writer);
         if (!status.ok()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
index 19d8dc13d75..7bb59269a33 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
@@ -282,7 +282,8 @@ public class RollupJobV2 extends AlterJobV2 implements 
GsonPostProcessable {
                                 tabletType,
                                 null,
                                 tbl.getCompressionType(),
-                                tbl.getEnableUniqueKeyMergeOnWrite(), 
tbl.getStoragePolicy(),
+                                tbl.getEnableUniqueKeyMergeOnWrite(),
+                                
tbl.getPartitionInfo().getDataProperty(partitionId).getStoragePolicy(),
                                 tbl.disableAutoCompaction(),
                                 tbl.enableSingleReplicaCompaction(),
                                 tbl.skipWriteIndexOnLoad(),
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index 9b7c127676e..2591d3106e0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -277,7 +277,8 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
                                     
tbl.getPartitionInfo().getTabletType(partitionId),
                                     null,
                                     tbl.getCompressionType(),
-                                    tbl.getEnableUniqueKeyMergeOnWrite(), 
tbl.getStoragePolicy(),
+                                    tbl.getEnableUniqueKeyMergeOnWrite(),
+                                    
tbl.getPartitionInfo().getDataProperty(partitionId).getStoragePolicy(),
                                     tbl.disableAutoCompaction(),
                                     tbl.enableSingleReplicaCompaction(),
                                     tbl.skipWriteIndexOnLoad(),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to