This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new b2c2cdb122 [feature] Support compression prop (#8923) b2c2cdb122 is described below commit b2c2cdb122b608c6391932ce5ef7b233ea9dc6bb Author: Lightman <31928846+lchangli...@users.noreply.github.com> AuthorDate: Fri May 27 21:52:05 2022 +0800 [feature] Support compression prop (#8923) --- be/src/olap/rowset/segment_v2/column_writer.cpp | 4 +-- be/src/olap/rowset/segment_v2/segment_writer.cpp | 10 ++++--- be/src/olap/rowset/segment_v2/segment_writer.h | 2 +- be/src/olap/tablet_meta.cpp | 31 ++++++++++++++++++++-- be/src/olap/tablet_meta.h | 3 ++- be/src/olap/tablet_schema.cpp | 2 ++ be/src/olap/tablet_schema.h | 4 +++ .../java/org/apache/doris/alter/RollupJobV2.java | 3 ++- .../org/apache/doris/alter/SchemaChangeJobV2.java | 3 ++- .../java/org/apache/doris/backup/RestoreJob.java | 3 ++- .../java/org/apache/doris/catalog/Catalog.java | 28 ++++++++++++++++--- .../java/org/apache/doris/catalog/OlapTable.java | 16 +++++++++++ .../org/apache/doris/catalog/TableProperty.java | 16 ++++++++++- .../apache/doris/common/util/PropertyAnalyzer.java | 31 ++++++++++++++++++++++ .../org/apache/doris/master/ReportHandler.java | 3 ++- .../org/apache/doris/task/CreateReplicaTask.java | 10 +++++-- .../org/apache/doris/catalog/CreateTableTest.java | 10 +++++++ .../java/org/apache/doris/task/AgentTaskTest.java | 3 ++- gensrc/proto/olap_file.proto | 2 ++ gensrc/thrift/AgentService.thrift | 13 +++++++++ 20 files changed, 176 insertions(+), 21 deletions(-) diff --git a/be/src/olap/rowset/segment_v2/column_writer.cpp b/be/src/olap/rowset/segment_v2/column_writer.cpp index 9a54b210c8..6c76ddff62 100644 --- a/be/src/olap/rowset/segment_v2/column_writer.cpp +++ b/be/src/olap/rowset/segment_v2/column_writer.cpp @@ -121,7 +121,7 @@ Status ColumnWriter::create(const ColumnWriterOptions& opts, const TabletColumn* length_options.meta->set_length( get_scalar_type_info<OLAP_FIELD_TYPE_UNSIGNED_INT>()->size()); length_options.meta->set_encoding(DEFAULT_ENCODING); - length_options.meta->set_compression(LZ4F); + length_options.meta->set_compression(opts.meta->compression()); length_options.need_zone_map = false; length_options.need_bloom_filter = false; @@ -149,7 +149,7 @@ Status ColumnWriter::create(const ColumnWriterOptions& opts, const TabletColumn* null_options.meta->set_length( get_scalar_type_info<OLAP_FIELD_TYPE_TINYINT>()->size()); null_options.meta->set_encoding(DEFAULT_ENCODING); - null_options.meta->set_compression(LZ4F); + null_options.meta->set_compression(opts.meta->compression()); null_options.need_zone_map = false; null_options.need_bloom_filter = false; diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index 4db2549683..f95b62c9c2 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -63,17 +63,19 @@ SegmentWriter::~SegmentWriter() { }; void SegmentWriter::init_column_meta(ColumnMetaPB* meta, uint32_t* column_id, - const TabletColumn& column) { + const TabletColumn& column, + const TabletSchema* tablet_schema) { // TODO(zc): Do we need this column_id?? meta->set_column_id((*column_id)++); meta->set_unique_id(column.unique_id()); meta->set_type(column.type()); meta->set_length(column.length()); meta->set_encoding(DEFAULT_ENCODING); - meta->set_compression(LZ4F); + meta->set_compression(tablet_schema->compression_type()); meta->set_is_nullable(column.is_nullable()); for (uint32_t i = 0; i < column.get_subtype_count(); ++i) { - init_column_meta(meta->add_children_columns(), column_id, column.get_sub_column(i)); + init_column_meta(meta->add_children_columns(), column_id, column.get_sub_column(i), + tablet_schema); } } @@ -84,7 +86,7 @@ Status SegmentWriter::init(uint32_t write_mbytes_per_sec __attribute__((unused)) ColumnWriterOptions opts; opts.meta = _footer.add_columns(); - init_column_meta(opts.meta, &column_id, column); + init_column_meta(opts.meta, &column_id, column, _tablet_schema); // now we create zone map for key columns in AGG_KEYS or all column in UNIQUE_KEYS or DUP_KEYS // and not support zone map for array type. diff --git a/be/src/olap/rowset/segment_v2/segment_writer.h b/be/src/olap/rowset/segment_v2/segment_writer.h index ab928b51e1..67fb9e02a2 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.h +++ b/be/src/olap/rowset/segment_v2/segment_writer.h @@ -80,7 +80,7 @@ public: Status finalize(uint64_t* segment_file_size, uint64_t* index_size); static void init_column_meta(ColumnMetaPB* meta, uint32_t* column_id, - const TabletColumn& column); + const TabletColumn& column, const TabletSchema* tablet_schema); private: DISALLOW_COPY_AND_ASSIGN(SegmentWriter); diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp index 0cd9202a72..e299d1a048 100644 --- a/be/src/olap/tablet_meta.cpp +++ b/be/src/olap/tablet_meta.cpp @@ -43,7 +43,7 @@ Status TabletMeta::create(const TCreateTabletReq& request, const TabletUid& tabl request.tablet_schema.schema_hash, shard_id, request.tablet_schema, next_unique_id, col_ordinal_to_unique_id, tablet_uid, request.__isset.tablet_type ? request.tablet_type : TTabletType::TABLET_TYPE_DISK, - request.storage_medium, request.storage_param.storage_name)); + request.storage_medium, request.storage_param.storage_name, request.compression_type)); return Status::OK(); } @@ -54,7 +54,8 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id uint32_t next_unique_id, const std::unordered_map<uint32_t, uint32_t>& col_ordinal_to_unique_id, TabletUid tablet_uid, TTabletType::type tabletType, - TStorageMedium::type t_storage_medium, const std::string& storage_name) + TStorageMedium::type t_storage_medium, const std::string& storage_name, + TCompressionType::type compression_type) : _tablet_uid(0, 0), _schema(new TabletSchema) { TabletMetaPB tablet_meta_pb; tablet_meta_pb.set_table_id(table_id); @@ -90,8 +91,34 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id LOG(WARNING) << "unknown tablet keys type"; break; } + // compress_kind used to compress segment files schema->set_compress_kind(COMPRESS_LZ4); + // compression_type used to compress segment page + switch (compression_type) { + case TCompressionType::NO_COMPRESSION: + schema->set_compression_type(NO_COMPRESSION); + break; + case TCompressionType::SNAPPY: + schema->set_compression_type(SNAPPY); + break; + case TCompressionType::LZ4: + schema->set_compression_type(LZ4); + break; + case TCompressionType::LZ4F: + schema->set_compression_type(LZ4F); + break; + case TCompressionType::ZLIB: + schema->set_compression_type(ZLIB); + break; + case TCompressionType::ZSTD: + schema->set_compression_type(ZSTD); + break; + default: + schema->set_compression_type(LZ4F); + break; + } + switch (tablet_schema.sort_type) { case TSortType::type::ZORDER: schema->set_sort_type(SortType::ZORDER); diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h index c0b165a7bb..c8709941e5 100644 --- a/be/src/olap/tablet_meta.h +++ b/be/src/olap/tablet_meta.h @@ -83,7 +83,8 @@ public: uint64_t shard_id, const TTabletSchema& tablet_schema, uint32_t next_unique_id, const std::unordered_map<uint32_t, uint32_t>& col_ordinal_to_unique_id, TabletUid tablet_uid, TTabletType::type tabletType, - TStorageMedium::type t_storage_medium, const std::string& remote_storage_name); + TStorageMedium::type t_storage_medium, const std::string& remote_storage_name, + TCompressionType::type compression_type); // If need add a filed in TableMeta, filed init copy in copy construct function TabletMeta(const TabletMeta& tablet_meta); TabletMeta(TabletMeta&& tablet_meta) = delete; diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp index 702fef36a5..6e9703f569 100644 --- a/be/src/olap/tablet_schema.cpp +++ b/be/src/olap/tablet_schema.cpp @@ -441,6 +441,7 @@ void TabletSchema::init_from_pb(const TabletSchemaPB& schema) { _sequence_col_idx = schema.sequence_col_idx(); _sort_type = schema.sort_type(); _sort_col_num = schema.sort_col_num(); + _compression_type = schema.compression_type(); } void TabletSchema::to_schema_pb(TabletSchemaPB* tablet_meta_pb) { @@ -461,6 +462,7 @@ void TabletSchema::to_schema_pb(TabletSchemaPB* tablet_meta_pb) { tablet_meta_pb->set_sequence_col_idx(_sequence_col_idx); tablet_meta_pb->set_sort_type(_sort_type); tablet_meta_pb->set_sort_col_num(_sort_col_num); + tablet_meta_pb->set_compression_type(_compression_type); } uint32_t TabletSchema::mem_size() const { diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h index 8554bebb3b..7c3209ee82 100644 --- a/be/src/olap/tablet_schema.h +++ b/be/src/olap/tablet_schema.h @@ -20,6 +20,7 @@ #include <vector> #include "gen_cpp/olap_file.pb.h" +#include "gen_cpp/segment_v2.pb.h" #include "olap/olap_define.h" #include "olap/types.h" @@ -145,6 +146,8 @@ public: void set_delete_sign_idx(int32_t delete_sign_idx) { _delete_sign_idx = delete_sign_idx; } bool has_sequence_col() const { return _sequence_col_idx != -1; } int32_t sequence_col_idx() const { return _sequence_col_idx; } + segment_v2::CompressionTypePB compression_type() const { return _compression_type; } + vectorized::Block create_block( const std::vector<uint32_t>& return_columns, const std::unordered_set<uint32_t>* tablet_columns_need_convert_null = nullptr) const; @@ -168,6 +171,7 @@ private: size_t _num_short_key_columns = 0; size_t _num_rows_per_row_block = 0; CompressKind _compress_kind = COMPRESS_NONE; + segment_v2::CompressionTypePB _compression_type = segment_v2::CompressionTypePB::LZ4F; size_t _next_column_unique_id = 0; bool _has_bf_fpp = false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java index 53027c2357..7f3a115ed4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java @@ -231,7 +231,8 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable { rollupSchema, tbl.getCopiedBfColumns(), tbl.getBfFpp(), countDownLatch, tbl.getCopiedIndexes(), tbl.isInMemory(), - tabletType); + tabletType, + tbl.getCompressionType()); createReplicaTask.setBaseTablet(tabletIdMap.get(rollupTabletId), baseSchemaHash); if (this.storageFormat != null) { createReplicaTask.setStorageFormat(this.storageFormat); diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index 2d55523d75..a7faa58743 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -253,7 +253,8 @@ public class SchemaChangeJobV2 extends AlterJobV2 { originKeysType, TStorageType.COLUMN, storageMedium, shadowSchema, bfColumns, bfFpp, countDownLatch, indexes, tbl.isInMemory(), - tbl.getPartitionInfo().getTabletType(partitionId)); + tbl.getPartitionInfo().getTabletType(partitionId), + tbl.getCompressionType()); createReplicaTask.setBaseTablet(partitionIndexTabletMap.get(partitionId, shadowIdxId).get(shadowTabletId), originSchemaHash); if (this.storageFormat != null) { createReplicaTask.setStorageFormat(this.storageFormat); diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java index 9445f74413..922b314a21 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -964,7 +964,8 @@ public class RestoreJob extends AbstractJob { indexMeta.getSchema(), bfColumns, bfFpp, null, localTbl.getCopiedIndexes(), localTbl.isInMemory(), - localTbl.getPartitionInfo().getTabletType(restorePart.getId())); + localTbl.getPartitionInfo().getTabletType(restorePart.getId()), + localTbl.getCompressionType()); task.setInRestoreMode(true); batchTask.addTask(task); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java index 50c3c40e78..45de5422c7 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java @@ -245,6 +245,7 @@ import org.apache.doris.task.CreateReplicaTask; import org.apache.doris.task.DropReplicaTask; import org.apache.doris.task.MasterTaskExecutor; import org.apache.doris.thrift.BackendService; +import org.apache.doris.thrift.TCompressionType; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TStorageFormat; import org.apache.doris.thrift.TStorageMedium; @@ -3026,6 +3027,7 @@ public class Catalog { * 6.2. replicationNum * 6.3. inMemory * 6.4. storageFormat + * 6.5. compressionType * 7. set index meta * 8. check colocation properties * 9. create tablet in BE @@ -3314,6 +3316,7 @@ public class Catalog { singlePartitionDesc.isInMemory(), olapTable.getStorageFormat(), singlePartitionDesc.getTabletType(), + olapTable.getCompressionType(), olapTable.getDataSortInfo() ); @@ -3545,6 +3548,7 @@ public class Catalog { boolean isInMemory, TStorageFormat storageFormat, TTabletType tabletType, + TCompressionType compressionType, DataSortInfo dataSortInfo) throws DdlException { // create base index first. Preconditions.checkArgument(baseIndexId != -1); @@ -3612,7 +3616,8 @@ public class Catalog { indexes, isInMemory, tabletType, - dataSortInfo); + dataSortInfo, + compressionType); task.setStorageFormat(storageFormat); batchTask.addTask(task); // add to AgentTaskQueue for handling finish report. @@ -3730,6 +3735,15 @@ public class Catalog { } olapTable.setStorageFormat(storageFormat); + // get compression type + TCompressionType compressionType = TCompressionType.LZ4; + try { + compressionType = PropertyAnalyzer.analyzeCompressionType(properties); + } catch (AnalysisException e) { + throw new DdlException(e.getMessage()); + } + olapTable.setCompressionType(compressionType); + // check data sort properties DataSortInfo dataSortInfo = PropertyAnalyzer.analyzeDataSortInfo(properties, keysType, keysDesc.keysColumnSize(), storageFormat); @@ -3778,6 +3792,7 @@ public class Catalog { throw new DdlException(e.getMessage()); } + if (partitionInfo.getType() == PartitionType.UNPARTITIONED) { // if this is an unpartitioned table, we should analyze data property and replication num here. // if this is a partitioned table, there properties are already analyzed in RangePartitionDesc analyze phase. @@ -3914,7 +3929,7 @@ public class Catalog { partitionInfo.getReplicaAllocation(partitionId), versionInfo, bfColumns, bfFpp, tabletIdSet, olapTable.getCopiedIndexes(), - isInMemory, storageFormat, tabletType, olapTable.getDataSortInfo()); + isInMemory, storageFormat, tabletType, compressionType, olapTable.getDataSortInfo()); olapTable.addPartition(partition); } else if (partitionInfo.getType() == PartitionType.RANGE || partitionInfo.getType() == PartitionType.LIST) { try { @@ -3965,7 +3980,8 @@ public class Catalog { versionInfo, bfColumns, bfFpp, tabletIdSet, olapTable.getCopiedIndexes(), isInMemory, storageFormat, - partitionInfo.getTabletType(entry.getValue()), olapTable.getDataSortInfo()); + partitionInfo.getTabletType(entry.getValue()), + compressionType, olapTable.getDataSortInfo()); olapTable.addPartition(partition); } } else { @@ -4339,6 +4355,11 @@ public class Catalog { sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_REMOTE_STORAGE_RESOURCE).append("\" = \""); sb.append(remoteStorageResource).append("\""); } + // compression type + if (olapTable.getCompressionType() != TCompressionType.LZ4F) { + sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_COMPRESSION).append("\" = \""); + sb.append(olapTable.getCompressionType()).append("\""); + } sb.append("\n)"); } else if (table.getType() == TableType.MYSQL) { @@ -6882,6 +6903,7 @@ public class Catalog { copiedTbl.isInMemory(), copiedTbl.getStorageFormat(), copiedTbl.getPartitionInfo().getTabletType(oldPartitionId), + copiedTbl.getCompressionType(), copiedTbl.getDataSortInfo()); newPartitions.add(newPartition); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index c7401e08ae..25eb7f65db 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -48,6 +48,7 @@ import org.apache.doris.qe.OriginStatement; import org.apache.doris.resource.Tag; import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; +import org.apache.doris.thrift.TCompressionType; import org.apache.doris.thrift.TOlapTable; import org.apache.doris.thrift.TSortType; import org.apache.doris.thrift.TStorageFormat; @@ -1679,6 +1680,14 @@ public class OlapTable extends Table { return !tempPartitions.isEmpty(); } + public void setCompressionType(TCompressionType compressionType) { + if (tableProperty == null) { + tableProperty = new TableProperty(new HashMap<>()); + } + tableProperty.modifyTableProperties(PropertyAnalyzer.PROPERTIES_COMPRESSION, compressionType.name()); + tableProperty.buildCompressionType(); + } + public void setStorageFormat(TStorageFormat storageFormat) { if (tableProperty == null) { tableProperty = new TableProperty(new HashMap<>()); @@ -1694,6 +1703,13 @@ public class OlapTable extends Table { return tableProperty.getStorageFormat(); } + public TCompressionType getCompressionType() { + if (tableProperty == null) { + return TCompressionType.LZ4F; + } + return tableProperty.getCompressionType(); + } + public DataSortInfo getDataSortInfo() { if (tableProperty == null) { return new DataSortInfo(TSortType.LEXICAL, this.getKeysNum()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java index f9d2063284..b9c3835c96 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java @@ -25,6 +25,7 @@ import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.persist.OperationType; import org.apache.doris.persist.gson.GsonUtils; +import org.apache.doris.thrift.TCompressionType; import org.apache.doris.thrift.TStorageFormat; import com.google.common.base.Strings; @@ -67,6 +68,8 @@ public class TableProperty implements Writable { */ private TStorageFormat storageFormat = TStorageFormat.DEFAULT; + private TCompressionType compressionType = TCompressionType.LZ4F; + private DataSortInfo dataSortInfo = new DataSortInfo(); // remote storage resource, for cold data @@ -147,6 +150,12 @@ public class TableProperty implements Writable { return this; } + public TableProperty buildCompressionType() { + compressionType = TCompressionType.valueOf(properties.getOrDefault(PropertyAnalyzer.PROPERTIES_COMPRESSION, + TCompressionType.LZ4F.name())); + return this; + } + public TableProperty buildStorageFormat() { storageFormat = TStorageFormat.valueOf(properties.getOrDefault(PropertyAnalyzer.PROPERTIES_STORAGE_FORMAT, TStorageFormat.DEFAULT.name())); @@ -227,6 +236,10 @@ public class TableProperty implements Writable { return remoteStorageResource; } + public TCompressionType getCompressionType() { + return compressionType; + } + public void buildReplicaAllocation() { try { // Must copy the properties because "analyzeReplicaAllocation" with remove the property @@ -251,7 +264,8 @@ public class TableProperty implements Writable { .buildInMemory() .buildStorageFormat() .buildDataSortInfo() - .buildRemoteStorageResource(); + .buildRemoteStorageResource() + .buildCompressionType(); if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_105) { // get replica num from property map and create replica allocation String repNum = tableProperty.properties.remove(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java index 20aac35971..5202122410 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java @@ -32,6 +32,7 @@ import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.resource.Tag; +import org.apache.doris.thrift.TCompressionType; import org.apache.doris.thrift.TSortType; import org.apache.doris.thrift.TStorageFormat; import org.apache.doris.thrift.TStorageMedium; @@ -75,6 +76,7 @@ public class PropertyAnalyzer { public static final String PROPERTIES_COLOCATE_WITH = "colocate_with"; public static final String PROPERTIES_TIMEOUT = "timeout"; + public static final String PROPERTIES_COMPRESSION = "compression"; public static final String PROPERTIES_DISTRIBUTION_TYPE = "distribution_type"; public static final String PROPERTIES_SEND_CLEAR_ALTER_TASK = "send_clear_alter_tasks"; @@ -433,6 +435,35 @@ public class PropertyAnalyzer { return timeout; } + // analyzeCompressionType will parse the compression type from properties + public static TCompressionType analyzeCompressionType(Map<String, String> properties) throws AnalysisException { + String compressionType = ""; + if (properties != null && properties.containsKey(PROPERTIES_COMPRESSION)) { + compressionType = properties.get(PROPERTIES_COMPRESSION); + properties.remove(PROPERTIES_COMPRESSION); + } else { + return TCompressionType.LZ4F; + } + + if (compressionType.equalsIgnoreCase("no_compression")) { + return TCompressionType.NO_COMPRESSION; + } else if (compressionType.equalsIgnoreCase("lz4")) { + return TCompressionType.LZ4; + } else if (compressionType.equalsIgnoreCase("lz4f")) { + return TCompressionType.LZ4F; + } else if (compressionType.equalsIgnoreCase("zlib")) { + return TCompressionType.ZLIB; + } else if (compressionType.equalsIgnoreCase("zstd")) { + return TCompressionType.ZSTD; + } else if (compressionType.equalsIgnoreCase("snappy")) { + return TCompressionType.SNAPPY; + } else if (compressionType.equalsIgnoreCase("default_compression")) { + return TCompressionType.LZ4F; + } else { + throw new AnalysisException("unknown compression type: " + compressionType); + } + } + // analyzeStorageFormat will parse the storage format from properties // sql: alter table tablet_name set ("storage_format" = "v2") // Use this sql to convert all tablets(base and rollup index) to a new format segment diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java index d130fbaf0a..cdca5251fa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java @@ -601,7 +601,8 @@ public class ReportHandler extends Daemon { TStorageMedium.HDD, indexMeta.getSchema(), bfColumns, bfFpp, null, olapTable.getCopiedIndexes(), olapTable.isInMemory(), - olapTable.getPartitionInfo().getTabletType(partitionId)); + olapTable.getPartitionInfo().getTabletType(partitionId), + olapTable.getCompressionType()); createReplicaTask.setIsRecoverTask(true); createReplicaBatchTask.addTask(createReplicaTask); } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java index 29068f7542..8718dd7663 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java @@ -25,6 +25,7 @@ import org.apache.doris.catalog.KeysType; import org.apache.doris.common.MarkedCountDownLatch; import org.apache.doris.common.Status; import org.apache.doris.thrift.TColumn; +import org.apache.doris.thrift.TCompressionType; import org.apache.doris.thrift.TCreateTabletReq; import org.apache.doris.thrift.TOlapTableIndex; import org.apache.doris.thrift.TStatusCode; @@ -54,6 +55,7 @@ public class CreateReplicaTask extends AgentTask { private KeysType keysType; private TStorageType storageType; private TStorageMedium storageMedium; + private TCompressionType compressionType; private List<Column> columns; @@ -93,7 +95,7 @@ public class CreateReplicaTask extends AgentTask { Set<String> bfColumns, double bfFpp, MarkedCountDownLatch<Long, Long> latch, List<Index> indexes, boolean isInMemory, - TTabletType tabletType) { + TTabletType tabletType, TCompressionType compressionType) { super(null, backendId, TTaskType.CREATE, dbId, tableId, partitionId, indexId, tabletId); this.shortKeyColumnCount = shortKeyColumnCount; @@ -104,6 +106,7 @@ public class CreateReplicaTask extends AgentTask { this.keysType = keysType; this.storageType = storageType; this.storageMedium = storageMedium; + this.compressionType = compressionType; this.columns = columns; @@ -125,7 +128,8 @@ public class CreateReplicaTask extends AgentTask { List<Index> indexes, boolean isInMemory, TTabletType tabletType, - DataSortInfo dataSortInfo) { + DataSortInfo dataSortInfo, + TCompressionType compressionType) { super(null, backendId, TTaskType.CREATE, dbId, tableId, partitionId, indexId, tabletId); this.shortKeyColumnCount = shortKeyColumnCount; @@ -136,6 +140,7 @@ public class CreateReplicaTask extends AgentTask { this.keysType = keysType; this.storageType = storageType; this.storageMedium = storageMedium; + this.compressionType = compressionType; this.columns = columns; @@ -267,6 +272,7 @@ public class CreateReplicaTask extends AgentTask { } createTabletReq.setTabletType(tabletType); + createTabletReq.setCompressionType(compressionType); return createTabletReq; } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java index 93dbf9f7ea..513b3cef1c 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java @@ -138,6 +138,16 @@ public class CreateTableTest { .expectThrowsNoException(() -> createTable("create table test.tb7(key1 int, key2 varchar(10)) \n" + "distributed by hash(key1) buckets 1 properties('replication_num' = '1', 'storage_medium' = 'ssd');")); + ExceptionChecker + .expectThrowsNoException(() -> createTable("create table test.compression1(key1 int, key2 varchar(10)) \n" + + "distributed by hash(key1) buckets 1 \n" + + "properties('replication_num' = '1', 'compression' = 'lz4f');")); + + ExceptionChecker + .expectThrowsNoException(() -> createTable("create table test.compression2(key1 int, key2 varchar(10)) \n" + + "distributed by hash(key1) buckets 1 \n" + + "properties('replication_num' = '1', 'compression' = 'snappy');")); + ExceptionChecker .expectThrowsNoException(() -> createTable("create table test.tbl8\n" + "(k1 varchar(40), k2 int, v1 int)\n" + "unique key(k1, k2)\n" diff --git a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java index db47288484..31b84fa4af 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java @@ -28,6 +28,7 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.MarkedCountDownLatch; import org.apache.doris.thrift.TAgentTaskRequest; import org.apache.doris.thrift.TBackend; +import org.apache.doris.thrift.TCompressionType; import org.apache.doris.thrift.TPriority; import org.apache.doris.thrift.TPushType; import org.apache.doris.thrift.TStorageMedium; @@ -110,7 +111,7 @@ public class AgentTaskTest { version, KeysType.AGG_KEYS, storageType, TStorageMedium.SSD, columns, null, 0, latch, null, - false, TTabletType.TABLET_TYPE_DISK); + false, TTabletType.TABLET_TYPE_DISK, TCompressionType.LZ4F); // drop dropTask = new DropReplicaTask(backendId1, tabletId1, schemaHash1); diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto index b67d9c11cd..ac7c08580b 100644 --- a/gensrc/proto/olap_file.proto +++ b/gensrc/proto/olap_file.proto @@ -23,6 +23,7 @@ option java_package = "org.apache.doris.proto"; import "olap_common.proto"; import "types.proto"; +import "segment_v2.proto"; message ZoneMap { required bytes min = 1; @@ -195,6 +196,7 @@ message TabletSchemaPB { optional int32 sequence_col_idx = 10 [default= -1]; optional SortType sort_type = 11; optional int32 sort_col_num = 12; + optional segment_v2.CompressionTypePB compression_type = 13; } enum TabletStatePB { diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift index 1eb01e8c5f..2233fae3fe 100644 --- a/gensrc/thrift/AgentService.thrift +++ b/gensrc/thrift/AgentService.thrift @@ -84,6 +84,18 @@ struct TStorageParam { 3: optional TS3StorageParam s3_storage_param } +enum TCompressionType { + UNKNOWN_COMPRESSION = 0, + DEFAULT_COMPRESSION = 1, + NO_COMPRESSION = 2, + SNAPPY = 3, + LZ4 = 4, + LZ4F = 5, + ZLIB = 6, + ZSTD = 7 +} + + struct TCreateTabletReq { 1: required Types.TTabletId tablet_id 2: required TTabletSchema tablet_schema @@ -105,6 +117,7 @@ struct TCreateTabletReq { 13: optional TStorageFormat storage_format 14: optional TTabletType tablet_type 15: optional TStorageParam storage_param + 16: optional TCompressionType compression_type = TCompressionType.LZ4F } struct TDropTabletReq { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org