xiaokang commented on code in PR #47691: URL: https://github.com/apache/doris/pull/47691#discussion_r1957455480
########## extension/beats/doris/config.go: ########## @@ -29,16 +29,17 @@ import ( ) type config struct { - Hosts []string `config:"fenodes"` - HttpHosts []string `config:"http_hosts"` - User string `config:"user" validate:"required"` - Password string `config:"password"` - Database string `config:"database" validate:"required"` - Table string `config:"table" validate:"required"` - LabelPrefix string `config:"label_prefix"` - LineDelimiter string `config:"line_delimiter"` - LogRequest bool `config:"log_request"` - LogProgressInterval int `config:"log_progress_interval"` + Hosts []string `config:"fenodes"` + HttpHosts []string `config:"http_hosts"` + User string `config:"user" validate:"required"` + Password string `config:"password"` + Database string `config:"database" validate:"required"` + Table string `config:"table"` + Tables []map[string]any `config:"tables"` Review Comment: In the Elasticsearch output plugin, the `tables` config is used to specify multiple tables that are selected by filters. In our case, we can simply use two config options: `table` and `default_table`. ########## extension/beats/doris/client.go: ########## @@ -39,14 +41,14 @@ import ( ) type client struct { - url string + dbURL string Review Comment: Is dbURL the same as database? If so, just remove it. 
########## extension/beats/doris/client.go: ########## @@ -180,120 +184,302 @@ func (client *client) Close() error { } func (client *client) String() string { - return fmt.Sprintf("doris{%s, %s, %s}", client.url, client.labelPrefix, client.headers) + return fmt.Sprintf("doris{%s, %s, %s}", client.url("{table}"), client.labelPrefix, client.headers) +} + +func (client *client) url(table string) string { + return fmt.Sprintf("%s/%s/_stream_load", client.dbURL, table) +} + +func (client *client) label(table string) string { + return fmt.Sprintf("%s_%s_%s_%d_%s", client.labelPrefix, client.database, table, time.Now().UnixMilli(), uuid.New()) } -func (client *client) Publish(_ context.Context, batch publisher.Batch) error { +// Publish sends events to doris. +// batch.Events() are grouped by table first (tableEvents). +// For each tableEvents, call the http stream load api to send the tableEvents to doris. +// If a tableEvents returns an error, add a barrier to the last event of the tableEvents. +// A barrier contains a table, a stream load label, and the length of the tableEvents. +// Add all failed tableEvents to the retryEvents. +// So if the last event in the batch.Events() has a barrier, it means that this is a retry. +// In this case, we will split the batch.Events() to some tableEvents by the barrier events +// and send each tableEvents to doris again reusing the label in the barrier. 
+func (client *client) Publish(ctx context.Context, batch publisher.Batch) error { events := batch.Events() length := len(events) client.logger.Debugf("Received events: %d", length) - label := fmt.Sprintf("%s_%s_%s_%d_%s", client.labelPrefix, client.database, client.table, time.Now().UnixMilli(), uuid.New()) - rest, err := client.publishEvents(label, events) + tableEventsMap := client.makeTableEventsMap(ctx, events) + rest, err := client.publishEvents(tableEventsMap) if len(rest) == 0 { batch.ACK() - client.logger.Debugf("Success send %d events", length) } else { - client.observer.Failed(length) batch.RetryEvents(rest) - client.logger.Warnf("Retry send %d events", length) + client.logger.Warnf("Retry send %d events", len(rest)) } return err } -func (client *client) publishEvents(lable string, events []publisher.Event) ([]publisher.Event, error) { +const nilTable = "" + +type Events struct { + Label string + Events []publisher.Event + + // used in publishEvents + serialization string + dropped int64 + request *http.Request + response *http.Response + err error +} + +func (client *client) makeTableEventsMap(_ context.Context, events []publisher.Event) map[string]*Events { + tableEventsMap := make(map[string]*Events) + if len(events) == 0 { + return tableEventsMap + } + + barrier, err := getBarrierFromEvent(&events[len(events)-1]) + if err == nil { // retry + if client.tableSelector.Sel.IsConst() { // table is const + removeBarrierFromEvent(&events[len(events)-1]) + tableEventsMap[barrier.Table] = &Events{ + Label: barrier.Label, + Events: events, + } + } else { // split events by barrier + for end := len(events); end > 0; { + barrier, _ := getBarrierFromEvent(&events[end-1]) + removeBarrierFromEvent(&events[end-1]) + start := end - barrier.Length + + tableEventsMap[barrier.Table] = &Events{ + Label: barrier.Label, + Events: events[start:end], // should not do any append to the array, because here is a slice of the original array + } + + end = start + } + } + } else { 
// first time + if client.tableSelector.Sel.IsConst() { // table is const + table, _ := client.tableSelector.Sel.Select(&events[0].Content) + label := client.label(table) + tableEventsMap[table] = &Events{ + Label: label, + Events: events, + } + } else { // select table for each event + for _, e := range events { + table, err := client.tableSelector.Sel.Select(&e.Content) + if err != nil { + client.logger.Errorf("Failed to select table: %+v", err) + table = nilTable + } + _, ok := tableEventsMap[table] + if !ok { + tableEventsMap[table] = &Events{ + Label: client.label(table), + Events: []publisher.Event{e}, + } + } else { + tableEventsMap[table].Events = append(tableEventsMap[table].Events, e) + } + } + } + } + + return tableEventsMap +} + +func (client *client) publishEvents(tableEventsMap map[string]*Events) ([]publisher.Event, error) { begin := time.Now() - var logFirstEvent []byte - var stringBuilder strings.Builder + for table, tableEvents := range tableEventsMap { + events := tableEvents.Events + + if table == nilTable { + client.logger.Errorf("Invalid table for %v events", len(events)) + tableEvents.dropped = int64(len(events)) + tableEvents.err = fmt.Errorf("invalid table for %v events", len(events)) + continue + } - dropped := 0 - for i := range events { - event := &events[i] - serializedEvent, err := client.codec.Encode(client.beat.Beat, &event.Content) + var stringBuilder strings.Builder - if err != nil { - if event.Guaranteed() { - client.logger.Errorf("Failed to serialize the event: %+v", err) - } else { - client.logger.Warnf("Failed to serialize the event: %+v", err) + for i := range events { + event := &events[i] + serializedEvent, err := client.codec.Encode(client.beat.Beat, &event.Content) + + if err != nil { + if event.Guaranteed() { + client.logger.Errorf("Failed to serialize the event: %+v", err) + } else { + client.logger.Warnf("Failed to serialize the event: %+v", err) + } + client.logger.Debugf("Failed event: %v", event) + + 
tableEvents.dropped++ + continue } - client.logger.Debugf("Failed event: %v", event) - dropped++ - client.reporter.IncrFailedRows(1) + stringBuilder.Write(serializedEvent) + stringBuilder.WriteString(client.lineDelimiter) + } + + tableEvents.serialization = stringBuilder.String() + + var requestErr error + tableEvents.request, requestErr = http.NewRequest(http.MethodPut, client.url(table), strings.NewReader(tableEvents.serialization)) + if requestErr != nil { + client.logger.Errorf("Failed to create request: %v", requestErr) continue } - if logFirstEvent == nil { - logFirstEvent = serializedEvent + var groupCommit bool = false + for k, v := range client.headers { + tableEvents.request.Header.Set(k, v) + if k == "group_commit" && v != "off_mode" { + groupCommit = true + } + } + if !groupCommit { + tableEvents.request.Header.Set("label", tableEvents.Label) } - stringBuilder.Write(serializedEvent) - stringBuilder.WriteString(client.lineDelimiter) - } - request, requestErr := http.NewRequest(http.MethodPut, client.url, strings.NewReader(stringBuilder.String())) - if requestErr != nil { - client.logger.Errorf("Failed to create request: %s", requestErr) - return events, requestErr } - var groupCommit bool = false - for k, v := range client.headers { - request.Header.Set(k, v) - if k == "group_commit" && v != "off_mode" { - groupCommit = true + wg := sync.WaitGroup{} + for _, tableEvents := range tableEventsMap { + request := tableEvents.request + if request != nil { + wg.Add(1) + go func(e *Events) { + e.response, e.err = client.httpClient.Do(request) + wg.Done() + }(tableEvents) Review Comment: What's the argument `tableEvents` used for? 
########## extension/beats/doris/client.go: ########## @@ -180,120 +184,302 @@ func (client *client) Close() error { } func (client *client) String() string { - return fmt.Sprintf("doris{%s, %s, %s}", client.url, client.labelPrefix, client.headers) + return fmt.Sprintf("doris{%s, %s, %s}", client.url("{table}"), client.labelPrefix, client.headers) +} + +func (client *client) url(table string) string { + return fmt.Sprintf("%s/%s/_stream_load", client.dbURL, table) +} + +func (client *client) label(table string) string { + return fmt.Sprintf("%s_%s_%s_%d_%s", client.labelPrefix, client.database, table, time.Now().UnixMilli(), uuid.New()) } -func (client *client) Publish(_ context.Context, batch publisher.Batch) error { +// Publish sends events to doris. +// batch.Events() are grouped by table first (tableEvents). +// For each tableEvents, call the http stream load api to send the tableEvents to doris. +// If a tableEvents returns an error, add a barrier to the last event of the tableEvents. +// A barrier contains a table, a stream load label, and the length of the tableEvents. +// Add all failed tableEvents to the retryEvents. +// So if the last event in the batch.Events() has a barrier, it means that this is a retry. +// In this case, we will split the batch.Events() to some tableEvents by the barrier events +// and send each tableEvents to doris again reusing the label in the barrier. 
+func (client *client) Publish(ctx context.Context, batch publisher.Batch) error { events := batch.Events() length := len(events) client.logger.Debugf("Received events: %d", length) - label := fmt.Sprintf("%s_%s_%s_%d_%s", client.labelPrefix, client.database, client.table, time.Now().UnixMilli(), uuid.New()) - rest, err := client.publishEvents(label, events) + tableEventsMap := client.makeTableEventsMap(ctx, events) + rest, err := client.publishEvents(tableEventsMap) if len(rest) == 0 { batch.ACK() - client.logger.Debugf("Success send %d events", length) Review Comment: Why delete this log? ########## extension/beats/doris/client.go: ########## @@ -180,120 +184,302 @@ func (client *client) Close() error { } func (client *client) String() string { - return fmt.Sprintf("doris{%s, %s, %s}", client.url, client.labelPrefix, client.headers) + return fmt.Sprintf("doris{%s, %s, %s}", client.url("{table}"), client.labelPrefix, client.headers) +} + +func (client *client) url(table string) string { + return fmt.Sprintf("%s/%s/_stream_load", client.dbURL, table) +} + +func (client *client) label(table string) string { + return fmt.Sprintf("%s_%s_%s_%d_%s", client.labelPrefix, client.database, table, time.Now().UnixMilli(), uuid.New()) } -func (client *client) Publish(_ context.Context, batch publisher.Batch) error { +// Publish sends events to doris. +// batch.Events() are grouped by table first (tableEvents). +// For each tableEvents, call the http stream load api to send the tableEvents to doris. +// If a tableEvents returns an error, add a barrier to the last event of the tableEvents. +// A barrier contains a table, a stream load label, and the length of the tableEvents. +// Add all failed tableEvents to the retryEvents. +// So if the last event in the batch.Events() has a barrier, it means that this is a retry. 
+// In this case, we will split the batch.Events() to some tableEvents by the barrier events +// and send each tableEvents to doris again reusing the label in the barrier. +func (client *client) Publish(ctx context.Context, batch publisher.Batch) error { events := batch.Events() length := len(events) client.logger.Debugf("Received events: %d", length) - label := fmt.Sprintf("%s_%s_%s_%d_%s", client.labelPrefix, client.database, client.table, time.Now().UnixMilli(), uuid.New()) - rest, err := client.publishEvents(label, events) + tableEventsMap := client.makeTableEventsMap(ctx, events) + rest, err := client.publishEvents(tableEventsMap) if len(rest) == 0 { batch.ACK() - client.logger.Debugf("Success send %d events", length) } else { - client.observer.Failed(length) batch.RetryEvents(rest) - client.logger.Warnf("Retry send %d events", length) + client.logger.Warnf("Retry send %d events", len(rest)) } return err } -func (client *client) publishEvents(lable string, events []publisher.Event) ([]publisher.Event, error) { +const nilTable = "" + +type Events struct { + Label string + Events []publisher.Event + + // used in publishEvents + serialization string + dropped int64 + request *http.Request + response *http.Response + err error +} + +func (client *client) makeTableEventsMap(_ context.Context, events []publisher.Event) map[string]*Events { + tableEventsMap := make(map[string]*Events) + if len(events) == 0 { + return tableEventsMap + } + + barrier, err := getBarrierFromEvent(&events[len(events)-1]) + if err == nil { // retry + if client.tableSelector.Sel.IsConst() { // table is const + removeBarrierFromEvent(&events[len(events)-1]) + tableEventsMap[barrier.Table] = &Events{ + Label: barrier.Label, + Events: events, + } + } else { // split events by barrier + for end := len(events); end > 0; { + barrier, _ := getBarrierFromEvent(&events[end-1]) + removeBarrierFromEvent(&events[end-1]) + start := end - barrier.Length + + tableEventsMap[barrier.Table] = &Events{ + 
Label: barrier.Label, + Events: events[start:end], // should not do any append to the array, because here is a slice of the original array + } + + end = start + } + } + } else { // first time + if client.tableSelector.Sel.IsConst() { // table is const + table, _ := client.tableSelector.Sel.Select(&events[0].Content) + label := client.label(table) + tableEventsMap[table] = &Events{ + Label: label, + Events: events, + } + } else { // select table for each event + for _, e := range events { + table, err := client.tableSelector.Sel.Select(&e.Content) + if err != nil { + client.logger.Errorf("Failed to select table: %+v", err) + table = nilTable + } + _, ok := tableEventsMap[table] + if !ok { + tableEventsMap[table] = &Events{ + Label: client.label(table), + Events: []publisher.Event{e}, + } + } else { + tableEventsMap[table].Events = append(tableEventsMap[table].Events, e) + } + } + } + } + + return tableEventsMap +} + +func (client *client) publishEvents(tableEventsMap map[string]*Events) ([]publisher.Event, error) { begin := time.Now() - var logFirstEvent []byte - var stringBuilder strings.Builder + for table, tableEvents := range tableEventsMap { + events := tableEvents.Events + + if table == nilTable { + client.logger.Errorf("Invalid table for %v events", len(events)) + tableEvents.dropped = int64(len(events)) + tableEvents.err = fmt.Errorf("invalid table for %v events", len(events)) + continue + } - dropped := 0 - for i := range events { - event := &events[i] - serializedEvent, err := client.codec.Encode(client.beat.Beat, &event.Content) + var stringBuilder strings.Builder - if err != nil { - if event.Guaranteed() { - client.logger.Errorf("Failed to serialize the event: %+v", err) - } else { - client.logger.Warnf("Failed to serialize the event: %+v", err) + for i := range events { + event := &events[i] + serializedEvent, err := client.codec.Encode(client.beat.Beat, &event.Content) + + if err != nil { + if event.Guaranteed() { + client.logger.Errorf("Failed to 
serialize the event: %+v", err) + } else { + client.logger.Warnf("Failed to serialize the event: %+v", err) + } + client.logger.Debugf("Failed event: %v", event) + + tableEvents.dropped++ + continue } - client.logger.Debugf("Failed event: %v", event) - dropped++ - client.reporter.IncrFailedRows(1) + stringBuilder.Write(serializedEvent) + stringBuilder.WriteString(client.lineDelimiter) + } + + tableEvents.serialization = stringBuilder.String() + + var requestErr error + tableEvents.request, requestErr = http.NewRequest(http.MethodPut, client.url(table), strings.NewReader(tableEvents.serialization)) + if requestErr != nil { + client.logger.Errorf("Failed to create request: %v", requestErr) continue } - if logFirstEvent == nil { - logFirstEvent = serializedEvent + var groupCommit bool = false + for k, v := range client.headers { + tableEvents.request.Header.Set(k, v) + if k == "group_commit" && v != "off_mode" { + groupCommit = true + } + } + if !groupCommit { + tableEvents.request.Header.Set("label", tableEvents.Label) } - stringBuilder.Write(serializedEvent) - stringBuilder.WriteString(client.lineDelimiter) - } - request, requestErr := http.NewRequest(http.MethodPut, client.url, strings.NewReader(stringBuilder.String())) - if requestErr != nil { - client.logger.Errorf("Failed to create request: %s", requestErr) - return events, requestErr } - var groupCommit bool = false - for k, v := range client.headers { - request.Header.Set(k, v) - if k == "group_commit" && v != "off_mode" { - groupCommit = true + wg := sync.WaitGroup{} + for _, tableEvents := range tableEventsMap { + request := tableEvents.request + if request != nil { + wg.Add(1) + go func(e *Events) { + e.response, e.err = client.httpClient.Do(request) + wg.Done() + }(tableEvents) } } - if !groupCommit { - request.Header.Set("label", lable) - } + wg.Wait() - response, responseErr := client.httpClient.Do(request) - if responseErr != nil { - client.logger.Errorf("Failed to stream-load request: %v", responseErr) 
- return events, responseErr - } + for table, tableEvents := range tableEventsMap { + if table == nilTable { + continue + } - defer response.Body.Close() + response := tableEvents.response - responseBytes, responseErr := httputil.DumpResponse(response, true) - if responseErr != nil { - client.logger.Errorf("Failed to dump doris stream load response: %v, error: %v", response, responseErr) - return events, responseErr - } + if tableEvents.err != nil { + client.logger.Errorf("Failed to stream-load request: %v", tableEvents.err) + continue + } - if client.logRequest { - client.logger.Infof("doris stream load response response:\n%s", string(responseBytes)) - } + defer response.Body.Close() - body, bodyErr := ioutil.ReadAll(response.Body) - if bodyErr != nil { - client.logger.Errorf("Failed to read doris stream load response body, error: %v, response:\n%v", bodyErr, string(responseBytes)) - return events, bodyErr - } + var responseBytes []byte + responseBytes, tableEvents.err = httputil.DumpResponse(response, true) + if tableEvents.err != nil { + client.logger.Errorf("Failed to dump doris stream load response: %v, error: %v", response, tableEvents.err) + continue + } + + if client.logRequest { + client.logger.Infof("doris stream load response response:\n%s", string(responseBytes)) + } + + var body []byte + body, tableEvents.err = ioutil.ReadAll(response.Body) + if tableEvents.err != nil { + client.logger.Errorf("Failed to read doris stream load response body, error: %v, response:\n%v", tableEvents.err, string(responseBytes)) + continue + } + + var status ResponseStatus + tableEvents.err = json.Unmarshal(body, &status) + if tableEvents.err != nil { + client.logger.Errorf("Failed to parse doris stream load response to JSON, error: %v, response:\n%v", tableEvents.err, string(responseBytes)) + continue + } - var status ResponseStatus - parseErr := json.Unmarshal(body, &status) - if parseErr != nil { - client.logger.Errorf("Failed to parse doris stream load response to JSON, 
error: %v, response:\n%v", parseErr, string(responseBytes)) - return events, parseErr + if status.Status != "Success" && status.Status != "Publish Timeout" && status.Status != "Label Already Exists" { + client.logger.Errorf("doris stream load status: '%v' is not 'Success', full response: %v", status.Status, string(responseBytes)) + tableEvents.err = errors.New("doris stream load status: " + status.Status) + continue + } + + if status.Status == "Label Already Exists" { + client.logger.Warnf("doris stream load status: '%v', %v events skipped", status.Status, int64(len(tableEvents.Events))-tableEvents.dropped) + } } - if status.Status != "Success" { - client.logger.Errorf("doris stream load status: '%v' is not 'Success', full response: %v", status.Status, string(responseBytes)) - return events, &status + var errs error + var retryEvents []publisher.Event + var retryRows int64 = 0 + var droppedRows int64 = 0 + var successRows int64 = 0 + var successBytes int64 = 0 + + for table, tableEvents := range tableEventsMap { + if table == nilTable { + errs = errors.Join(errs, tableEvents.err) + droppedRows += tableEvents.dropped + continue + } + + if tableEvents.err != nil { + errs = errors.Join(errs, tableEvents.err) + retryRows += int64(len(tableEvents.Events)) + addBarrier(table, tableEvents) + retryEvents = append(retryEvents, tableEvents.Events...) 
+ continue + } + + droppedRows += tableEvents.dropped + successRows += int64(len(tableEvents.Events)) - tableEvents.dropped + successBytes += int64(len(tableEvents.serialization)) } - client.logger.Debugf("Stream-Load publish events: %d events have been published to doris in %v.", - len(events)-dropped, - time.Now().Sub(begin)) + client.logger.Debugf("Stream-Load publish events: %d events have been published to doris in %v.", successRows, time.Since(begin)) + + client.observer.Dropped(int(droppedRows)) + client.observer.Acked(int(successRows)) + client.observer.Failed(int(retryRows)) + + client.reporter.IncrTotalBytes(successBytes) + client.reporter.IncrTotalRows(successRows) + + return retryEvents, errs +} + +const barrierKey = "__#BARRIER#__" - client.observer.Dropped(dropped) - client.observer.Acked(len(events) - dropped) +type barrierT struct { + Table string `json:"table"` + Label string `json:"label"` + Length int `json:"length"` +} - client.reporter.IncrTotalBytes(int64(stringBuilder.Len())) - client.reporter.IncrTotalRows(int64(len(events) - dropped)) +func addBarrier(table string, events *Events) { + events.Events[len(events.Events)-1].Content.Fields[barrierKey] = &barrierT{ Review Comment: I think it's more intuitive to add the barrier field to the first event. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org