yangzhg opened a new issue #4051:
URL: https://github.com/apache/incubator-doris/issues/4051


   # <center>Doris supports batch delete </center>
   
   ## BACKGROUND
   
   At present, Doris supports multiple import methods such as broker load, 
routine load, and stream load, but data can only be deleted through the DELETE 
statement. Each execution of DELETE generates a new data version, so frequent 
deletions seriously affect query performance. Moreover, DELETE is implemented 
by generating an empty rowset that records the deletion conditions; every read 
must then filter against these conditions, which also hurts performance when 
there are many of them. Compared with other systems, Greenplum's implementation 
is more like that of a traditional database product, while Snowflake implements 
this through its MERGE syntax.
   
   For scenarios such as importing CDC data, insert and delete operations are 
generally interspersed in the data. Our current import methods cannot satisfy 
this scenario: even if we separate the inserts from the deletes, the deletion 
problem still cannot be solved.
   
   ## Design goals
   
   ### Functionality
   
   Enhance the import function so that it can support the following scenarios:
   
   * ~~Simple batch import, currently supported~~
   * Batch point deletion
   * Mixed import of inserted and deleted data
   
   ### Ease of use
   
   Minimize changes to the import syntax and remain compatible with the current 
import syntax.
   
   ### Performance
   
   Import and read performance should be basically the same as with the current 
import method, without significant performance loss.
   
   ## Detailed design
   
   The import syntax adds a column that indicates whether the current row is 
to be inserted or deleted; when the column is absent, the default behavior is 
to insert the row. This upgrade is implemented only on segment V2 (V1 is not 
considered for the time being). In the `IndexRegion` of the segment file, a 
bitmap index similar to the null bitmap is added to mark the rows to be 
deleted.
   
   ### Data structure design
   
   A bitmap index (delete_index_page) needs to be added to the segment 
structure to indicate which rows are marked for deletion. The `PagePointerPB` 
structure is the same as previously defined; a bitmap is used as the index.
   
   ```
   message SegmentFooterPB {
       optional uint32 version = 1 [default = 1]; // file version
       repeated ColumnMetaPB columns = 2; // tablet schema
       optional uint32 num_rows = 3; // number of values
       optional uint64 index_footprint = 4; // total index footprint of all columns
       optional uint64 data_footprint = 5; // total data footprint of all columns
       optional uint64 raw_data_footprint = 6; // raw data footprint

       optional CompressionTypePB compress_type = 7 [default = LZ4F]; // default compression type for file columns
       repeated MetadataPairPB file_meta_datas = 8; // meta data of file

       // Short key index's page
       optional PagePointerPB short_key_index_page = 9;
       // Use a bitmap index to indicate which rows are marked for deletion
       optional PagePointerPB delete_index_page = 10;
   }
   
   ```
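
   As a rough illustration of how the delete index could be produced, here is a 
minimal C++ sketch, assuming a plain byte-array bitmap (a real implementation 
would likely use a compressed bitmap such as roaring). `DeleteIndexBuilder` and 
its methods are hypothetical names for illustration, not existing Doris code:

   ```cpp
   #include <cstdint>
   #include <vector>

   // Hypothetical builder for delete_index_page: collects the ordinals of rows
   // marked for deletion while the segment is written, then serializes them as
   // a bitmap page referenced from SegmentFooterPB.
   class DeleteIndexBuilder {
   public:
       // Called once for every row appended to the segment.
       void add_row(bool delete_flag) {
           if (delete_flag) {
               _deleted_rows.push_back(_next_row_id);
           }
           ++_next_row_id;
       }

       // Serialize to a naive bitmap with one bit per row in the segment.
       std::vector<uint8_t> finish() const {
           std::vector<uint8_t> bitmap((_next_row_id + 7) / 8, 0);
           for (uint32_t row : _deleted_rows) {
               bitmap[row / 8] |= static_cast<uint8_t>(1u << (row % 8));
           }
           return bitmap;
       }

   private:
       uint32_t _next_row_id = 0;
       std::vector<uint32_t> _deleted_rows;
   };
   ```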
   
   ### Import syntax
   
   The main change to the import syntax is an added column mapping that 
specifies which field is the delete-marker column, and this column needs to be 
present in the imported data. How to set it for each import method is described 
below.
   
   #### stream load
   
   In stream load, the delete-marker column is added to the `columns` field of 
the header, and the load type and delete condition are set through additional 
header fields, for example:
   `-H "columns: k1, k2, label_c3" -H "load_type: [MIXED|INSERT|DELETE]" -H 
"delete_condition: label_c3=1"`
   
   #### broker load
   
   Broker load sets the delete-marker column in `PROPERTIES`:
   
   ```
   LOAD LABEL db1.label1
   (
       DATA INFILE("hdfs://abc.com:8888/user/palo/test/ml/file1")
       INTO TABLE tbl1
       COLUMNS TERMINATED BY ","
       (tmp_c1, tmp_c2, label_c3)
       SET
       (
           id=tmp_c2,
           name=tmp_c1
       )
   )
   WITH BROKER 'broker'
   (
       "username"="user",
       "password"="pass"
   )
   PROPERTIES
   (
       "timeout" = "3600",
       "load_type"="[MIXED|LOAD|DELETE]",
       "delete_condition"="label=true"
   );
   
   ```
   
   #### routine load
   
   Routine load adds the mapping in the `COLUMNS` field. The mapping method is 
the same as above; an example follows:
   
   ```
   CREATE ROUTINE LOAD example_db.test1 ON example_tbl
   COLUMNS(k1, k2, k3, v1, v2, label),
   WHERE k1 > 100 and k2 like "%doris%"
   PROPERTIES
   (
       "desired_concurrent_number"="3",
       "max_batch_interval" = "20",
       "max_batch_rows" = "300000",
       "max_batch_size" = "209715200",
       "strict_mode" = "false",
       "load_type"="[MIXED|LOAD|DELETE]",
       "delete_condition"="label=true"
   )
   FROM KAFKA
   (
       "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
       "kafka_topic" = "my_topic",
       "kafka_partitions" = "0,1,2,3",
       "kafka_offsets" = "101,0,0,200"
   );
   ```
   
   ### Import
   
   The process of data import is as follows:
   
   When the imported data contains a delete mark and the mark is true, write 
the row, record its row number within the segment, and set that row in the 
delete index; otherwise, write the row directly. There is a possible 
optimization here: for a row marked as deleted, each value column can be set to 
the value that occupies the least space for its type. For example, for the 
VARCHAR type the value can be set to an empty string to save space.
   
   ```flow
   st=>start: Start Load
   flag_cond=>condition: delete flag is true
   write_rowdata=>operation: write data
   write_rowdata_opt=>operation: write Data with minimum values
   write_delete_index=>operation: write delete index
   e=>end
   
   st->flag_cond
   flag_cond(yes)->write_rowdata_opt->write_delete_index->e
   flag_cond(no)->write_rowdata->e
   ```
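
   A minimal sketch of this write path, assuming an in-memory row with 
string-typed value columns; `Row` and `append_row` are hypothetical names used 
only for illustration:

   ```cpp
   #include <string>
   #include <vector>

   // Hypothetical in-memory row: value columns plus the delete flag parsed
   // from the extra marker column of the imported data.
   struct Row {
       std::vector<std::string> values;
       bool delete_flag = false;
   };

   // Rows marked for deletion are still written (the merge has to see them),
   // but their value columns are replaced with the cheapest value of the type,
   // and their position is recorded in the delete index.
   void append_row(Row row, std::vector<bool>* delete_index) {
       if (row.delete_flag) {
           for (auto& v : row.values) {
               v.clear(); // e.g. empty string for VARCHAR to save space
           }
       }
       delete_index->push_back(row.delete_flag);
       // ... append key and value columns to the segment writer as usual ...
   }
   ```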
   
   Suppose there is a table
   
   ```
   +-------+-------------+------+-------+---------+---------+
   | Field | Type        | Null | Key   | Default | Extra   |
   +-------+-------------+------+-------+---------+---------+
   | k1    | INT         | Yes  | true  | 0       |         |
   | k2    | SMALLINT    | Yes  | true  | NULL    |         |
   | k3    | VARCHAR(32) | Yes  | true  |         |         |
   | v1    | BIGINT      | Yes  | false | 0       | REPLACE |
   +-------+-------------+------+-------+---------+---------+
   ```
   
   The imported data is:
   
   ```
   0,1,foo,3,false
   0,1,foo,2,true
   1,2,bar,3,true
   0,1,foo,5,false
   ```
   
   If the table is AGG_KEYS and the aggregation function is SUM, aggregation 
happens when the data is imported: since the second row deletes the first, the 
data in the first row is meaningless. But because the delete relationship needs 
to be recorded, a deleted row must still be kept. So when the same key goes 
through multiple delete/insert operations, two records (the delete record and 
the newly inserted data) are generated in each segment, and the data recorded 
in the data area of the rowset is:
   
   ```
   0,1,foo,2
   1,2,bar,3
   0,1,foo,5
   ```
   
   The delete_index recorded in the bitmap is [1,1,0], indicating that the 
first two rows written in this batch are marked for deletion.
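
   The batch aggregation just described can be sketched as follows, assuming 
SUM on `v1` (all names are illustrative). On the example batch above this toy 
version produces exactly the three output rows and the [1,1,0] delete index:

   ```cpp
   #include <map>
   #include <string>
   #include <tuple>
   #include <vector>

   using Key = std::tuple<int, int, std::string>; // k1, k2, k3

   struct InputRow {
       Key key;
       long v1;
       bool delete_flag;
   };

   // Rows written to the segment, in arrival order: a delete mark drops the
   // data aggregated so far for its key but is itself kept, so the delete
   // relationship survives into the segment.
   std::vector<InputRow> aggregate_batch(const std::vector<InputRow>& batch) {
       std::vector<InputRow> out;
       std::map<Key, size_t> open; // key -> index of its open (non-deleted) record
       for (const auto& row : batch) {
           auto it = open.find(row.key);
           if (row.delete_flag) {
               if (it != open.end()) {
                   out[it->second] = row; // delete record replaces the aggregate
                   open.erase(it);
               } else {
                   out.push_back(row);
               }
           } else if (it != open.end()) {
               out[it->second].v1 += row.v1; // SUM aggregation within the batch
           } else {
               open[row.key] = out.size();
               out.push_back(row);
           }
       }
       return out;
   }
   ```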
   
   If the table is UNIQUE_KEYS, only the latest result can be recorded, so the 
first and second rows are meaningless and are ignored during the import 
aggregation. The data recorded in the data area of the rowset is then:
   
   ```
   1,2,bar,3
   0,1,foo,5
   ```
   
   The delete_index recorded in the bitmap is [1,0], indicating that the first 
row written in this batch is marked for deletion.
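
   For UNIQUE_KEYS the same batch can be deduplicated by keeping only the 
newest record per key, as in this sketch (again, the names are illustrative); 
on the example batch it yields the two rows and the [1,0] delete index above:

   ```cpp
   #include <algorithm>
   #include <set>
   #include <string>
   #include <tuple>
   #include <vector>

   using Key = std::tuple<int, int, std::string>; // k1, k2, k3

   struct InputRow {
       Key key;
       long v1;
       bool delete_flag;
   };

   // Scan the batch backwards, keep the first (i.e. newest) record seen for
   // each key, then restore the arrival order of the survivors.
   std::vector<InputRow> dedup_batch(const std::vector<InputRow>& batch) {
       std::vector<InputRow> out;
       std::set<Key> seen;
       for (auto it = batch.rbegin(); it != batch.rend(); ++it) {
           if (seen.insert(it->key).second) {
               out.push_back(*it);
           }
       }
       std::reverse(out.begin(), out.end());
       return out;
   }
   ```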
   
   ### Read
   
   There are two cases when reading: AGG_KEYS and UNIQUE_KEYS.
   
   #### AGG_KEYS table
   
   At present, when reading an AGG_KEYS table, data is merged from the lower 
version to the higher version. When a row with a delete mark is encountered, 
the data previously merged for that key is cleared and the remaining data 
continues to be merged; if no data remains, reading skips ahead to the next 
key. If no delete mark is found, `agg_update` is called according to the 
existing process.
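
   The merge rule can be sketched as follows for the SUM case, assuming rows 
arrive ordered from the lowest version to the highest (the names are 
illustrative, not the actual reader code):

   ```cpp
   #include <map>
   #include <string>
   #include <tuple>
   #include <vector>

   using Key = std::tuple<int, int, std::string>; // k1, k2, k3

   struct MergeRow {
       Key key;
       long v1;
       bool delete_flag; // looked up in the rowset's delete index
   };

   // Merge rows from low to high version: a delete mark clears everything
   // aggregated so far for its key; otherwise agg_update (here: SUM) applies.
   std::map<Key, long> merge(const std::vector<MergeRow>& rows_low_to_high) {
       std::map<Key, long> result;
       for (const auto& row : rows_low_to_high) {
           if (row.delete_flag) {
               result.erase(row.key);
           } else {
               result[row.key] += row.v1;
           }
       }
       return result;
   }
   ```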
   
   Take the above table as an example, and suppose there are two rowset 
versions, n and n+1.
   
   The data in n is as follows, with delete_index [0,0,0]:
   
   ```
   0,1,foo,3
   1,2,bar,3
   0,1,foo,5
   ```
   
   The data in n+1 is as follows, with delete_index [1,1,0]:
   
   ```
   0,1,foo,2
   1,2,bar,3
   ```
   
   When the two versions are merged, both keys (0,1,foo) and (1,2,bar) hit a 
delete mark in n+1, so the data merged for them from version n is cleared and 
the read returns no rows for either key.

