yangzhg opened a new issue #4051: URL: https://github.com/apache/incubator-doris/issues/4051
# <center>Doris supports batch delete</center>

## Background

At present, Doris supports multiple import methods such as broker load, routine load, and stream load, but data can only be deleted through the DELETE statement. Each execution of DELETE generates a new data version, so frequent deletions seriously affect query performance. Moreover, DELETE is implemented by generating an empty rowset that records the deletion conditions, and every read must filter rows against those conditions, which also hurts performance when there are many of them. Compared with other systems, Greenplum's implementation is more like a traditional database product, while Snowflake implements this through MERGE syntax.

For scenarios such as importing CDC data, inserts and deletes are generally interleaved in the stream. Our current import methods cannot satisfy this scenario: even if we separate the inserts from the deletes, the deletes still cannot be expressed.

## Design goals

### Functional level

Enhance the import function so that it can support the following scenarios:

* ~~Simple batch import, currently supported~~
* Batch point deletion
* Mixed import of inserted and deleted data

### Ease of use

Minimize changes to the import syntax and stay compatible with the current import syntax.

### Performance

Import and read performance should be basically the same as with the current import method, without significant performance loss.

## Detailed design

The import syntax adds a column indicating whether the current row is to be inserted or deleted; if the column is absent, the default behavior is to insert the row. This feature is implemented only on segment V2 (V1 is not considered for the time being). In the `IndexRegion` of the segment file, a bitmap index similar to the null bitmap is added to mark the rows to be deleted.

### Data structure design

A bitmap index (`delete_index_page`) needs to be added to the segment structure to indicate which rows are marked for deletion. The `PagePointerPB` structure is the same as previously defined, using a bitmap as the index.

```
message SegmentFooterPB {
    optional uint32 version = 1 [default = 1]; // file version
    repeated ColumnMetaPB columns = 2; // tablet schema
    optional uint32 num_rows = 3; // number of values
    optional uint64 index_footprint = 4; // total index footprint of all columns
    optional uint64 data_footprint = 5; // total data footprint of all columns
    optional uint64 raw_data_footprint = 6; // raw data footprint
    optional CompressionTypePB compress_type = 7 [default = LZ4F]; // default compression type for file columns
    repeated MetadataPairPB file_meta_datas = 8; // meta data of file
    // Short key index's page
    optional PagePointerPB short_key_index_page = 9;
    // Use bitmap index to indicate which rows are marked for deletion
    optional PagePointerPB delete_index_page = 10;
}
```
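As a rough illustration of how a writer could build the page referenced by `delete_index_page`, here is a minimal sketch in C++ assuming the CRoaring bitmap library; `DeleteIndexBuilder` and its methods are hypothetical names for this design, not existing Doris code.

```cpp
#include <cstdint>
#include <string>

#include "roaring/roaring.hh" // CRoaring's C++ wrapper (assumption)

// Hypothetical builder that collects deleted row ordinals while a segment is
// written, then serializes them for the delete_index_page.
class DeleteIndexBuilder {
public:
    // Called once per appended row; `deleted` comes from the delete-mark column.
    void add_row(uint32_t row_ordinal, bool deleted) {
        if (deleted) {
            _deleted_rows.add(row_ordinal);
        }
    }

    // Serialize the bitmap into a buffer that the segment writer would store
    // as the page pointed to by SegmentFooterPB.delete_index_page.
    std::string finish() {
        _deleted_rows.runOptimize(); // compact consecutive runs of deleted rows
        std::string buf(_deleted_rows.getSizeInBytes(), '\0');
        _deleted_rows.write(buf.data()); // mutable data() requires C++17
        return buf;
    }

private:
    roaring::Roaring _deleted_rows; // one bit per row ordinal in the segment
};
```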
### Import syntax

The import syntax mainly adds a column mapping that specifies which field is the delete-mark column; this column needs to be present in the imported data. The way to set it for each import method is as follows.

#### Stream load

Stream load adds a field for the delete-mark column in the `columns` field of the header, for example:

`-H "columns: k1, k2, label_c3" -H "load_type: [MIXED|INSERT|DELETE]" -H "delete_condition: label_c3=1"`

#### Broker load

Set the delete-mark field in `PROPERTIES`:

```
LOAD LABEL db1.label1
(
    DATA INFILE("hdfs://abc.com:8888/user/palo/test/ml/file1")
    INTO TABLE tbl1
    COLUMNS TERMINATED BY ","
    (tmp_c1, tmp_c2, label_c3)
    SET
    (
        id=tmp_c2,
        name=tmp_c1
    )
)
WITH BROKER 'broker'
(
    "username"="user",
    "password"="pass"
)
PROPERTIES
(
    "timeout" = "3600",
    "load_type"="[MIXED|LOAD|DELETE]",
    "delete_condition"="label=true"
);
```

#### Routine load

Routine load adds the mapping in the `columns` field. The mapping method is the same as above; an example is as follows:

```
CREATE ROUTINE LOAD example_db.test1 ON example_tbl
COLUMNS(k1, k2, k3, v1, v2, label),
WHERE k1 > 100 and k2 like "%doris%"
PROPERTIES
(
    "desired_concurrent_number"="3",
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
    "strict_mode" = "false",
    "load_type"="[MIXED|LOAD|DELETE]",
    "delete_condition"="label=true"
)
FROM KAFKA
(
    "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
    "kafka_topic" = "my_topic",
    "kafka_partitions" = "0,1,2,3",
    "kafka_offsets" = "101,0,0,200"
);
```

### Import

The data import process is as follows: when an imported row carries a delete mark that is true, write the row, record its ordinal in the segment, and set that row in the delete index; otherwise, write the row directly. There is an optimization here: when a row is marked deleted, its value columns can be set to the value that occupies the least space for the corresponding type. For example, for the VARCHAR type we can set the value to an empty string to save space. A sketch of this write path follows the flowchart below.

```flow
st=>start: Start Load
flag_cond=>condition: delete flag is true
write_rowdata=>operation: write data
write_rowdata_opt=>operation: write data with minimum values
write_delete_index=>operation: write delete index
e=>end
st->flag_cond
flag_cond(yes)->write_rowdata_opt->write_delete_index->e
flag_cond(no)->write_rowdata->e
```
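The branch in this flowchart can be sketched as follows; it is a minimal illustration under the same CRoaring assumption, and `Row`, `SegmentRowWriter`, and `append_row` are hypothetical placeholders rather than actual Doris classes.

```cpp
#include <cstdint>
#include <string>
#include <vector>

#include "roaring/roaring.hh" // CRoaring's C++ wrapper (assumption)

struct Row {
    std::vector<std::string> keys;   // key columns
    std::vector<std::string> values; // value columns
    bool delete_mark = false;        // parsed from the delete-mark column
};

struct SegmentRowWriter { // hypothetical stand-in for the real writer
    void append(const Row& row) { /* write one row into the data region */ }
};

void append_row(Row row, uint32_t row_ordinal,
                SegmentRowWriter& writer, roaring::Roaring& delete_index) {
    if (row.delete_mark) {
        // Optimization from the design: value columns of a deleted row can be
        // shrunk to the cheapest value of their type, e.g. "" for VARCHAR.
        for (auto& v : row.values) {
            v.clear();
        }
        delete_index.add(row_ordinal); // record the row in the delete index
    }
    // The row is always written, so the key and the deletion relationship
    // survive in the segment even when the row is marked deleted.
    writer.append(row);
}
```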
Suppose there is a table:

```
+-------+-------------+------+-------+---------+---------+
| Field | Type        | Null | Key   | Default | Extra   |
+-------+-------------+------+-------+---------+---------+
| k1    | INT         | Yes  | true  | 0       |         |
| k2    | SMALLINT    | Yes  | true  | NULL    |         |
| k3    | VARCHAR(32) | Yes  | true  |         |         |
| v1    | BIGINT      | Yes  | false | 0       | REPLACE |
+-------+-------------+------+-------+---------+---------+
```

The imported data is:

```
0,1,foo,3,false
0,1,foo,2,true
1,2,bar,3,true
0,1,foo,5,false
```

If the table is AGG_KEYS and the aggregation function is SUM, aggregation already happens at import time. Since the second row deletes the first, the data in the first row is meaningless; but because the deletion relationship must be recorded, we still need to keep one deleted row. So if there are multiple delete/insert operations on the same key, two records must be generated per segment. The data recorded in the data area of the rowset is:

```
0,1,foo,2
1,2,bar,3
0,1,foo,5
```

delete_index records [1,1,0] in the bitmap, indicating that the first two rows in this batch of imported data are marked for deletion.

If the table is UNIQUE_KEYS, only the latest result is kept, so the first and second rows are meaningless and are dropped during import-time aggregation. The data recorded in the data area of the rowset is then:

```
1,2,bar,3
0,1,foo,5
```

delete_index records [1,0] in the bitmap, indicating that the first row in this batch of imported data is marked for deletion.

### Read

There are two cases when reading: AGG_KEYS and UNIQUE_KEYS.

#### AGG_KEYS table

At present, when reading an AGG_KEYS table, data is merged from lower versions to higher versions. When a row with a delete mark is encountered, the data accumulated so far for that key is cleared and merging continues with the remaining data; if no data remains, reading skips to the next key. If no delete mark is found, agg_update is called according to the existing process.

Take the above table as an example and suppose there are two rowset versions, n and n+1.

The data in n is as follows, with delete_index [0,0,0]:

```
0,1,foo,3
1,2,bar,3
0,1,foo,5
```

The data in n+1 is as follows, with delete_index [1,1,0]:

```
0,1,foo,2
1,2,bar,3
```
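To make the merge rule concrete, here is a minimal sketch of how one key could be merged across versions with SUM aggregation; `VersionedRow` and `merge_one_key` are hypothetical simplifications of the real reader, not Doris code.

```cpp
#include <cstdint>
#include <optional>
#include <vector>

struct VersionedRow {
    int64_t value;    // the single SUM value column from the example
    bool delete_mark; // the row's bit in its segment's delete index
};

// Merge all rows of one key, ordered from the lowest version to the highest.
// Returns the aggregated value, or nothing if the key ends up deleted.
std::optional<int64_t> merge_one_key(const std::vector<VersionedRow>& rows) {
    std::optional<int64_t> acc;
    for (const VersionedRow& row : rows) {
        if (row.delete_mark) {
            acc.reset(); // a delete mark clears everything merged so far
            continue;
        }
        if (!acc) {
            acc = row.value;   // first row after a (re)start
        } else {
            *acc += row.value; // agg_update for SUM
        }
    }
    return acc; // empty result => skip to the next key
}
```

Under this sketch, the key (1,2,bar) accumulates 3 from version n and is then cleared by its delete-marked row in n+1, so the reader would skip it and move on to the next key.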