xiaokang commented on code in PR #47691:
URL: https://github.com/apache/doris/pull/47691#discussion_r1957455480


##########
extension/beats/doris/config.go:
##########
@@ -29,16 +29,17 @@ import (
 )
 
 type config struct {
-       Hosts               []string `config:"fenodes"`
-       HttpHosts           []string `config:"http_hosts"`
-       User                string   `config:"user" validate:"required"`
-       Password            string   `config:"password"`
-       Database            string   `config:"database" validate:"required"`
-       Table               string   `config:"table" validate:"required"`
-       LabelPrefix         string   `config:"label_prefix"`
-       LineDelimiter       string   `config:"line_delimiter"`
-       LogRequest          bool     `config:"log_request"`
-       LogProgressInterval int      `config:"log_progress_interval"`
+       Hosts               []string         `config:"fenodes"`
+       HttpHosts           []string         `config:"http_hosts"`
+       User                string           `config:"user" validate:"required"`
+       Password            string           `config:"password"`
+       Database            string           `config:"database" 
validate:"required"`
+       Table               string           `config:"table"`
+       Tables              []map[string]any `config:"tables"`

Review Comment:
   In elasticsearch output plugin, `tables` config is used to specify multiple 
tables to select by filters. In our case, we can simply use two config: `table` 
and `default_table`.



##########
extension/beats/doris/client.go:
##########
@@ -39,14 +41,14 @@ import (
 )
 
 type client struct {
-       url        string
+       dbURL      string

Review Comment:
   Is dbURL the same as database? If yes, just remove it.



##########
extension/beats/doris/client.go:
##########
@@ -180,120 +184,302 @@ func (client *client) Close() error {
 }
 
 func (client *client) String() string {
-       return fmt.Sprintf("doris{%s, %s, %s}", client.url, client.labelPrefix, 
client.headers)
+       return fmt.Sprintf("doris{%s, %s, %s}", client.url("{table}"), 
client.labelPrefix, client.headers)
+}
+
+func (client *client) url(table string) string {
+       return fmt.Sprintf("%s/%s/_stream_load", client.dbURL, table)
+}
+
+func (client *client) label(table string) string {
+       return fmt.Sprintf("%s_%s_%s_%d_%s", client.labelPrefix, 
client.database, table, time.Now().UnixMilli(), uuid.New())
 }
 
-func (client *client) Publish(_ context.Context, batch publisher.Batch) error {
+// Publish sends events to doris.
+// batch.Events() are grouped by table first (tableEvents).
+// For each tableEvents, call the http stream load api to send the tableEvents 
to doris.
+// If a tableEvents returns an error, add a barrier to the last event of the 
tableEvents.
+// A barrier contains a table, a stream load label, and the length of the 
tableEvents.
+// Add all failed tableEvents to the retryEvents.
+// So if the last event in the batch.Events() has a barrier, it means that 
this is a retry.
+// In this case, we will split the batch.Events() to some tableEvents by the 
barrier events
+// and send each tableEvents to doris again reusing the label in the barrier.
+func (client *client) Publish(ctx context.Context, batch publisher.Batch) 
error {
        events := batch.Events()
        length := len(events)
        client.logger.Debugf("Received events: %d", length)
 
-       label := fmt.Sprintf("%s_%s_%s_%d_%s", client.labelPrefix, 
client.database, client.table, time.Now().UnixMilli(), uuid.New())
-       rest, err := client.publishEvents(label, events)
+       tableEventsMap := client.makeTableEventsMap(ctx, events)
+       rest, err := client.publishEvents(tableEventsMap)
 
        if len(rest) == 0 {
                batch.ACK()
-               client.logger.Debugf("Success send %d events", length)
        } else {
-               client.observer.Failed(length)
                batch.RetryEvents(rest)
-               client.logger.Warnf("Retry send %d events", length)
+               client.logger.Warnf("Retry send %d events", len(rest))
        }
        return err
 }
 
-func (client *client) publishEvents(lable string, events []publisher.Event) 
([]publisher.Event, error) {
+const nilTable = ""
+
+type Events struct {
+       Label  string
+       Events []publisher.Event
+
+       // used in publishEvents
+       serialization string
+       dropped       int64
+       request       *http.Request
+       response      *http.Response
+       err           error
+}
+
+func (client *client) makeTableEventsMap(_ context.Context, events 
[]publisher.Event) map[string]*Events {
+       tableEventsMap := make(map[string]*Events)
+       if len(events) == 0 {
+               return tableEventsMap
+       }
+
+       barrier, err := getBarrierFromEvent(&events[len(events)-1])
+       if err == nil { // retry
+               if client.tableSelector.Sel.IsConst() { // table is const
+                       removeBarrierFromEvent(&events[len(events)-1])
+                       tableEventsMap[barrier.Table] = &Events{
+                               Label:  barrier.Label,
+                               Events: events,
+                       }
+               } else { // split events by barrier
+                       for end := len(events); end > 0; {
+                               barrier, _ := 
getBarrierFromEvent(&events[end-1])
+                               removeBarrierFromEvent(&events[end-1])
+                               start := end - barrier.Length
+
+                               tableEventsMap[barrier.Table] = &Events{
+                                       Label:  barrier.Label,
+                                       Events: events[start:end], // should 
not do any append to the array, because here is a slice of the original array
+                               }
+
+                               end = start
+                       }
+               }
+       } else { // first time
+               if client.tableSelector.Sel.IsConst() { // table is const
+                       table, _ := 
client.tableSelector.Sel.Select(&events[0].Content)
+                       label := client.label(table)
+                       tableEventsMap[table] = &Events{
+                               Label:  label,
+                               Events: events,
+                       }
+               } else { // select table for each event
+                       for _, e := range events {
+                               table, err := 
client.tableSelector.Sel.Select(&e.Content)
+                               if err != nil {
+                                       client.logger.Errorf("Failed to select 
table: %+v", err)
+                                       table = nilTable
+                               }
+                               _, ok := tableEventsMap[table]
+                               if !ok {
+                                       tableEventsMap[table] = &Events{
+                                               Label:  client.label(table),
+                                               Events: []publisher.Event{e},
+                                       }
+                               } else {
+                                       tableEventsMap[table].Events = 
append(tableEventsMap[table].Events, e)
+                               }
+                       }
+               }
+       }
+
+       return tableEventsMap
+}
+
+func (client *client) publishEvents(tableEventsMap map[string]*Events) 
([]publisher.Event, error) {
        begin := time.Now()
 
-       var logFirstEvent []byte
-       var stringBuilder strings.Builder
+       for table, tableEvents := range tableEventsMap {
+               events := tableEvents.Events
+
+               if table == nilTable {
+                       client.logger.Errorf("Invalid table for %v events", 
len(events))
+                       tableEvents.dropped = int64(len(events))
+                       tableEvents.err = fmt.Errorf("invalid table for %v 
events", len(events))
+                       continue
+               }
 
-       dropped := 0
-       for i := range events {
-               event := &events[i]
-               serializedEvent, err := client.codec.Encode(client.beat.Beat, 
&event.Content)
+               var stringBuilder strings.Builder
 
-               if err != nil {
-                       if event.Guaranteed() {
-                               client.logger.Errorf("Failed to serialize the 
event: %+v", err)
-                       } else {
-                               client.logger.Warnf("Failed to serialize the 
event: %+v", err)
+               for i := range events {
+                       event := &events[i]
+                       serializedEvent, err := 
client.codec.Encode(client.beat.Beat, &event.Content)
+
+                       if err != nil {
+                               if event.Guaranteed() {
+                                       client.logger.Errorf("Failed to 
serialize the event: %+v", err)
+                               } else {
+                                       client.logger.Warnf("Failed to 
serialize the event: %+v", err)
+                               }
+                               client.logger.Debugf("Failed event: %v", event)
+
+                               tableEvents.dropped++
+                               continue
                        }
-                       client.logger.Debugf("Failed event: %v", event)
 
-                       dropped++
-                       client.reporter.IncrFailedRows(1)
+                       stringBuilder.Write(serializedEvent)
+                       stringBuilder.WriteString(client.lineDelimiter)
+               }
+
+               tableEvents.serialization = stringBuilder.String()
+
+               var requestErr error
+               tableEvents.request, requestErr = 
http.NewRequest(http.MethodPut, client.url(table), 
strings.NewReader(tableEvents.serialization))
+               if requestErr != nil {
+                       client.logger.Errorf("Failed to create request: %v", 
requestErr)
                        continue
                }
 
-               if logFirstEvent == nil {
-                       logFirstEvent = serializedEvent
+               var groupCommit bool = false
+               for k, v := range client.headers {
+                       tableEvents.request.Header.Set(k, v)
+                       if k == "group_commit" && v != "off_mode" {
+                               groupCommit = true
+                       }
+               }
+               if !groupCommit {
+                       tableEvents.request.Header.Set("label", 
tableEvents.Label)
                }
-               stringBuilder.Write(serializedEvent)
-               stringBuilder.WriteString(client.lineDelimiter)
-       }
-       request, requestErr := http.NewRequest(http.MethodPut, client.url, 
strings.NewReader(stringBuilder.String()))
-       if requestErr != nil {
-               client.logger.Errorf("Failed to create request: %s", requestErr)
-               return events, requestErr
        }
 
-       var groupCommit bool = false
-       for k, v := range client.headers {
-               request.Header.Set(k, v)
-               if k == "group_commit" && v != "off_mode" {
-                       groupCommit = true
+       wg := sync.WaitGroup{}
+       for _, tableEvents := range tableEventsMap {
+               request := tableEvents.request
+               if request != nil {
+                       wg.Add(1)
+                       go func(e *Events) {
+                               e.response, e.err = 
client.httpClient.Do(request)
+                               wg.Done()
+                       }(tableEvents)

Review Comment:
   What's the argument `tableEvents` used for?



##########
extension/beats/doris/client.go:
##########
@@ -180,120 +184,302 @@ func (client *client) Close() error {
 }
 
 func (client *client) String() string {
-       return fmt.Sprintf("doris{%s, %s, %s}", client.url, client.labelPrefix, 
client.headers)
+       return fmt.Sprintf("doris{%s, %s, %s}", client.url("{table}"), 
client.labelPrefix, client.headers)
+}
+
+func (client *client) url(table string) string {
+       return fmt.Sprintf("%s/%s/_stream_load", client.dbURL, table)
+}
+
+func (client *client) label(table string) string {
+       return fmt.Sprintf("%s_%s_%s_%d_%s", client.labelPrefix, 
client.database, table, time.Now().UnixMilli(), uuid.New())
 }
 
-func (client *client) Publish(_ context.Context, batch publisher.Batch) error {
+// Publish sends events to doris.
+// batch.Events() are grouped by table first (tableEvents).
+// For each tableEvents, call the http stream load api to send the tableEvents 
to doris.
+// If a tableEvents returns an error, add a barrier to the last event of the 
tableEvents.
+// A barrier contains a table, a stream load label, and the length of the 
tableEvents.
+// Add all failed tableEvents to the retryEvents.
+// So if the last event in the batch.Events() has a barrier, it means that 
this is a retry.
+// In this case, we will split the batch.Events() to some tableEvents by the 
barrier events
+// and send each tableEvents to doris again reusing the label in the barrier.
+func (client *client) Publish(ctx context.Context, batch publisher.Batch) 
error {
        events := batch.Events()
        length := len(events)
        client.logger.Debugf("Received events: %d", length)
 
-       label := fmt.Sprintf("%s_%s_%s_%d_%s", client.labelPrefix, 
client.database, client.table, time.Now().UnixMilli(), uuid.New())
-       rest, err := client.publishEvents(label, events)
+       tableEventsMap := client.makeTableEventsMap(ctx, events)
+       rest, err := client.publishEvents(tableEventsMap)
 
        if len(rest) == 0 {
                batch.ACK()
-               client.logger.Debugf("Success send %d events", length)

Review Comment:
   Why delete this log?



##########
extension/beats/doris/client.go:
##########
@@ -180,120 +184,302 @@ func (client *client) Close() error {
 }
 
 func (client *client) String() string {
-       return fmt.Sprintf("doris{%s, %s, %s}", client.url, client.labelPrefix, 
client.headers)
+       return fmt.Sprintf("doris{%s, %s, %s}", client.url("{table}"), 
client.labelPrefix, client.headers)
+}
+
+func (client *client) url(table string) string {
+       return fmt.Sprintf("%s/%s/_stream_load", client.dbURL, table)
+}
+
+func (client *client) label(table string) string {
+       return fmt.Sprintf("%s_%s_%s_%d_%s", client.labelPrefix, 
client.database, table, time.Now().UnixMilli(), uuid.New())
 }
 
-func (client *client) Publish(_ context.Context, batch publisher.Batch) error {
+// Publish sends events to doris.
+// batch.Events() are grouped by table first (tableEvents).
+// For each tableEvents, call the http stream load api to send the tableEvents 
to doris.
+// If a tableEvents returns an error, add a barrier to the last event of the 
tableEvents.
+// A barrier contains a table, a stream load label, and the length of the 
tableEvents.
+// Add all failed tableEvents to the retryEvents.
+// So if the last event in the batch.Events() has a barrier, it means that 
this is a retry.
+// In this case, we will split the batch.Events() to some tableEvents by the 
barrier events
+// and send each tableEvents to doris again reusing the label in the barrier.
+func (client *client) Publish(ctx context.Context, batch publisher.Batch) 
error {
        events := batch.Events()
        length := len(events)
        client.logger.Debugf("Received events: %d", length)
 
-       label := fmt.Sprintf("%s_%s_%s_%d_%s", client.labelPrefix, 
client.database, client.table, time.Now().UnixMilli(), uuid.New())
-       rest, err := client.publishEvents(label, events)
+       tableEventsMap := client.makeTableEventsMap(ctx, events)
+       rest, err := client.publishEvents(tableEventsMap)
 
        if len(rest) == 0 {
                batch.ACK()
-               client.logger.Debugf("Success send %d events", length)
        } else {
-               client.observer.Failed(length)
                batch.RetryEvents(rest)
-               client.logger.Warnf("Retry send %d events", length)
+               client.logger.Warnf("Retry send %d events", len(rest))
        }
        return err
 }
 
-func (client *client) publishEvents(lable string, events []publisher.Event) 
([]publisher.Event, error) {
+const nilTable = ""
+
+type Events struct {
+       Label  string
+       Events []publisher.Event
+
+       // used in publishEvents
+       serialization string
+       dropped       int64
+       request       *http.Request
+       response      *http.Response
+       err           error
+}
+
+func (client *client) makeTableEventsMap(_ context.Context, events 
[]publisher.Event) map[string]*Events {
+       tableEventsMap := make(map[string]*Events)
+       if len(events) == 0 {
+               return tableEventsMap
+       }
+
+       barrier, err := getBarrierFromEvent(&events[len(events)-1])
+       if err == nil { // retry
+               if client.tableSelector.Sel.IsConst() { // table is const
+                       removeBarrierFromEvent(&events[len(events)-1])
+                       tableEventsMap[barrier.Table] = &Events{
+                               Label:  barrier.Label,
+                               Events: events,
+                       }
+               } else { // split events by barrier
+                       for end := len(events); end > 0; {
+                               barrier, _ := 
getBarrierFromEvent(&events[end-1])
+                               removeBarrierFromEvent(&events[end-1])
+                               start := end - barrier.Length
+
+                               tableEventsMap[barrier.Table] = &Events{
+                                       Label:  barrier.Label,
+                                       Events: events[start:end], // should 
not do any append to the array, because here is a slice of the original array
+                               }
+
+                               end = start
+                       }
+               }
+       } else { // first time
+               if client.tableSelector.Sel.IsConst() { // table is const
+                       table, _ := 
client.tableSelector.Sel.Select(&events[0].Content)
+                       label := client.label(table)
+                       tableEventsMap[table] = &Events{
+                               Label:  label,
+                               Events: events,
+                       }
+               } else { // select table for each event
+                       for _, e := range events {
+                               table, err := 
client.tableSelector.Sel.Select(&e.Content)
+                               if err != nil {
+                                       client.logger.Errorf("Failed to select 
table: %+v", err)
+                                       table = nilTable
+                               }
+                               _, ok := tableEventsMap[table]
+                               if !ok {
+                                       tableEventsMap[table] = &Events{
+                                               Label:  client.label(table),
+                                               Events: []publisher.Event{e},
+                                       }
+                               } else {
+                                       tableEventsMap[table].Events = 
append(tableEventsMap[table].Events, e)
+                               }
+                       }
+               }
+       }
+
+       return tableEventsMap
+}
+
+func (client *client) publishEvents(tableEventsMap map[string]*Events) 
([]publisher.Event, error) {
        begin := time.Now()
 
-       var logFirstEvent []byte
-       var stringBuilder strings.Builder
+       for table, tableEvents := range tableEventsMap {
+               events := tableEvents.Events
+
+               if table == nilTable {
+                       client.logger.Errorf("Invalid table for %v events", 
len(events))
+                       tableEvents.dropped = int64(len(events))
+                       tableEvents.err = fmt.Errorf("invalid table for %v 
events", len(events))
+                       continue
+               }
 
-       dropped := 0
-       for i := range events {
-               event := &events[i]
-               serializedEvent, err := client.codec.Encode(client.beat.Beat, 
&event.Content)
+               var stringBuilder strings.Builder
 
-               if err != nil {
-                       if event.Guaranteed() {
-                               client.logger.Errorf("Failed to serialize the 
event: %+v", err)
-                       } else {
-                               client.logger.Warnf("Failed to serialize the 
event: %+v", err)
+               for i := range events {
+                       event := &events[i]
+                       serializedEvent, err := 
client.codec.Encode(client.beat.Beat, &event.Content)
+
+                       if err != nil {
+                               if event.Guaranteed() {
+                                       client.logger.Errorf("Failed to 
serialize the event: %+v", err)
+                               } else {
+                                       client.logger.Warnf("Failed to 
serialize the event: %+v", err)
+                               }
+                               client.logger.Debugf("Failed event: %v", event)
+
+                               tableEvents.dropped++
+                               continue
                        }
-                       client.logger.Debugf("Failed event: %v", event)
 
-                       dropped++
-                       client.reporter.IncrFailedRows(1)
+                       stringBuilder.Write(serializedEvent)
+                       stringBuilder.WriteString(client.lineDelimiter)
+               }
+
+               tableEvents.serialization = stringBuilder.String()
+
+               var requestErr error
+               tableEvents.request, requestErr = 
http.NewRequest(http.MethodPut, client.url(table), 
strings.NewReader(tableEvents.serialization))
+               if requestErr != nil {
+                       client.logger.Errorf("Failed to create request: %v", 
requestErr)
                        continue
                }
 
-               if logFirstEvent == nil {
-                       logFirstEvent = serializedEvent
+               var groupCommit bool = false
+               for k, v := range client.headers {
+                       tableEvents.request.Header.Set(k, v)
+                       if k == "group_commit" && v != "off_mode" {
+                               groupCommit = true
+                       }
+               }
+               if !groupCommit {
+                       tableEvents.request.Header.Set("label", 
tableEvents.Label)
                }
-               stringBuilder.Write(serializedEvent)
-               stringBuilder.WriteString(client.lineDelimiter)
-       }
-       request, requestErr := http.NewRequest(http.MethodPut, client.url, 
strings.NewReader(stringBuilder.String()))
-       if requestErr != nil {
-               client.logger.Errorf("Failed to create request: %s", requestErr)
-               return events, requestErr
        }
 
-       var groupCommit bool = false
-       for k, v := range client.headers {
-               request.Header.Set(k, v)
-               if k == "group_commit" && v != "off_mode" {
-                       groupCommit = true
+       wg := sync.WaitGroup{}
+       for _, tableEvents := range tableEventsMap {
+               request := tableEvents.request
+               if request != nil {
+                       wg.Add(1)
+                       go func(e *Events) {
+                               e.response, e.err = 
client.httpClient.Do(request)
+                               wg.Done()
+                       }(tableEvents)
                }
        }
-       if !groupCommit {
-               request.Header.Set("label", lable)
-       }
+       wg.Wait()
 
-       response, responseErr := client.httpClient.Do(request)
-       if responseErr != nil {
-               client.logger.Errorf("Failed to stream-load request: %v", 
responseErr)
-               return events, responseErr
-       }
+       for table, tableEvents := range tableEventsMap {
+               if table == nilTable {
+                       continue
+               }
 
-       defer response.Body.Close()
+               response := tableEvents.response
 
-       responseBytes, responseErr := httputil.DumpResponse(response, true)
-       if responseErr != nil {
-               client.logger.Errorf("Failed to dump doris stream load 
response: %v, error: %v", response, responseErr)
-               return events, responseErr
-       }
+               if tableEvents.err != nil {
+                       client.logger.Errorf("Failed to stream-load request: 
%v", tableEvents.err)
+                       continue
+               }
 
-       if client.logRequest {
-               client.logger.Infof("doris stream load response response:\n%s", 
string(responseBytes))
-       }
+               defer response.Body.Close()
 
-       body, bodyErr := ioutil.ReadAll(response.Body)
-       if bodyErr != nil {
-               client.logger.Errorf("Failed to read doris stream load response 
body, error: %v, response:\n%v", bodyErr, string(responseBytes))
-               return events, bodyErr
-       }
+               var responseBytes []byte
+               responseBytes, tableEvents.err = 
httputil.DumpResponse(response, true)
+               if tableEvents.err != nil {
+                       client.logger.Errorf("Failed to dump doris stream load 
response: %v, error: %v", response, tableEvents.err)
+                       continue
+               }
+
+               if client.logRequest {
+                       client.logger.Infof("doris stream load response 
response:\n%s", string(responseBytes))
+               }
+
+               var body []byte
+               body, tableEvents.err = ioutil.ReadAll(response.Body)
+               if tableEvents.err != nil {
+                       client.logger.Errorf("Failed to read doris stream load 
response body, error: %v, response:\n%v", tableEvents.err, 
string(responseBytes))
+                       continue
+               }
+
+               var status ResponseStatus
+               tableEvents.err = json.Unmarshal(body, &status)
+               if tableEvents.err != nil {
+                       client.logger.Errorf("Failed to parse doris stream load 
response to JSON, error: %v, response:\n%v", tableEvents.err, 
string(responseBytes))
+                       continue
+               }
 
-       var status ResponseStatus
-       parseErr := json.Unmarshal(body, &status)
-       if parseErr != nil {
-               client.logger.Errorf("Failed to parse doris stream load 
response to JSON, error: %v, response:\n%v", parseErr, string(responseBytes))
-               return events, parseErr
+               if status.Status != "Success" && status.Status != "Publish 
Timeout" && status.Status != "Label Already Exists" {
+                       client.logger.Errorf("doris stream load status: '%v' is 
not 'Success', full response: %v", status.Status, string(responseBytes))
+                       tableEvents.err = errors.New("doris stream load status: 
" + status.Status)
+                       continue
+               }
+
+               if status.Status == "Label Already Exists" {
+                       client.logger.Warnf("doris stream load status: '%v', %v 
events skipped", status.Status, 
int64(len(tableEvents.Events))-tableEvents.dropped)
+               }
        }
 
-       if status.Status != "Success" {
-               client.logger.Errorf("doris stream load status: '%v' is not 
'Success', full response: %v", status.Status, string(responseBytes))
-               return events, &status
+       var errs error
+       var retryEvents []publisher.Event
+       var retryRows int64 = 0
+       var droppedRows int64 = 0
+       var successRows int64 = 0
+       var successBytes int64 = 0
+
+       for table, tableEvents := range tableEventsMap {
+               if table == nilTable {
+                       errs = errors.Join(errs, tableEvents.err)
+                       droppedRows += tableEvents.dropped
+                       continue
+               }
+
+               if tableEvents.err != nil {
+                       errs = errors.Join(errs, tableEvents.err)
+                       retryRows += int64(len(tableEvents.Events))
+                       addBarrier(table, tableEvents)
+                       retryEvents = append(retryEvents, tableEvents.Events...)
+                       continue
+               }
+
+               droppedRows += tableEvents.dropped
+               successRows += int64(len(tableEvents.Events)) - 
tableEvents.dropped
+               successBytes += int64(len(tableEvents.serialization))
        }
 
-       client.logger.Debugf("Stream-Load publish events: %d events have been 
published to doris in %v.",
-               len(events)-dropped,
-               time.Now().Sub(begin))
+       client.logger.Debugf("Stream-Load publish events: %d events have been 
published to doris in %v.", successRows, time.Since(begin))
+
+       client.observer.Dropped(int(droppedRows))
+       client.observer.Acked(int(successRows))
+       client.observer.Failed(int(retryRows))
+
+       client.reporter.IncrTotalBytes(successBytes)
+       client.reporter.IncrTotalRows(successRows)
+
+       return retryEvents, errs
+}
+
+const barrierKey = "__#BARRIER#__"
 
-       client.observer.Dropped(dropped)
-       client.observer.Acked(len(events) - dropped)
+type barrierT struct {
+       Table  string `json:"table"`
+       Label  string `json:"label"`
+       Length int    `json:"length"`
+}
 
-       client.reporter.IncrTotalBytes(int64(stringBuilder.Len()))
-       client.reporter.IncrTotalRows(int64(len(events) - dropped))
+func addBarrier(table string, events *Events) {
+       events.Events[len(events.Events)-1].Content.Fields[barrierKey] = 
&barrierT{

Review Comment:
   I think it's more intuitive to add barrier field in the first event.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to