joker-star-l commented on code in PR #47691: URL: https://github.com/apache/doris/pull/47691#discussion_r1960068923
########## extension/beats/doris/client.go: ########## @@ -180,120 +184,302 @@ func (client *client) Close() error { } func (client *client) String() string { - return fmt.Sprintf("doris{%s, %s, %s}", client.url, client.labelPrefix, client.headers) + return fmt.Sprintf("doris{%s, %s, %s}", client.url("{table}"), client.labelPrefix, client.headers) +} + +func (client *client) url(table string) string { + return fmt.Sprintf("%s/%s/_stream_load", client.dbURL, table) +} + +func (client *client) label(table string) string { + return fmt.Sprintf("%s_%s_%s_%d_%s", client.labelPrefix, client.database, table, time.Now().UnixMilli(), uuid.New()) } -func (client *client) Publish(_ context.Context, batch publisher.Batch) error { +// Publish sends events to doris. +// batch.Events() are grouped by table first (tableEvents). +// For each tableEvents, call the http stream load api to send the tableEvents to doris. +// If a tableEvents returns an error, add a barrier to the last event of the tableEvents. +// A barrier contains a table, a stream load label, and the length of the tableEvents. +// Add all failed tableEvents to the retryEvents. +// So if the last event in the batch.Events() has a barrier, it means that this is a retry. +// In this case, we will split the batch.Events() to some tableEvents by the barrier events +// and send each tableEvents to doris again reusing the label in the barrier. 
+func (client *client) Publish(ctx context.Context, batch publisher.Batch) error { events := batch.Events() length := len(events) client.logger.Debugf("Received events: %d", length) - label := fmt.Sprintf("%s_%s_%s_%d_%s", client.labelPrefix, client.database, client.table, time.Now().UnixMilli(), uuid.New()) - rest, err := client.publishEvents(label, events) + tableEventsMap := client.makeTableEventsMap(ctx, events) + rest, err := client.publishEvents(tableEventsMap) if len(rest) == 0 { batch.ACK() - client.logger.Debugf("Success send %d events", length) Review Comment: This log duplicates the one at https://github.com/apache/doris/pull/47691/files#diff-86c3e882948c75fd362158ddaec93628d46975812e8db6f8b19bbe76afd5917aR443 -- This is an automated message from the Apache Git Service. To respond to this message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org