joker-star-l commented on code in PR #47691:
URL: https://github.com/apache/doris/pull/47691#discussion_r1960068923


##########
extension/beats/doris/client.go:
##########
@@ -180,120 +184,302 @@ func (client *client) Close() error {
 }
 
 func (client *client) String() string {
-       return fmt.Sprintf("doris{%s, %s, %s}", client.url, client.labelPrefix, 
client.headers)
+       return fmt.Sprintf("doris{%s, %s, %s}", client.url("{table}"), 
client.labelPrefix, client.headers)
+}
+
+func (client *client) url(table string) string {
+       return fmt.Sprintf("%s/%s/_stream_load", client.dbURL, table)
+}
+
+func (client *client) label(table string) string {
+       return fmt.Sprintf("%s_%s_%s_%d_%s", client.labelPrefix, 
client.database, table, time.Now().UnixMilli(), uuid.New())
 }
 
-func (client *client) Publish(_ context.Context, batch publisher.Batch) error {
+// Publish sends events to doris.
+// batch.Events() are grouped by table first (tableEvents).
+// For each tableEvents, call the http stream load api to send the tableEvents 
to doris.
+// If a tableEvents returns an error, add a barrier to the last event of the 
tableEvents.
+// A barrier contains a table, a stream load label, and the length of the 
tableEvents.
+// Add all failed tableEvents to the retryEvents.
+// So if the last event in the batch.Events() has a barrier, it means that 
this is a retry.
+// In this case, we will split the batch.Events() to some tableEvents by the 
barrier events
+// and send each tableEvents to doris again reusing the label in the barrier.
+func (client *client) Publish(ctx context.Context, batch publisher.Batch) 
error {
        events := batch.Events()
        length := len(events)
        client.logger.Debugf("Received events: %d", length)
 
-       label := fmt.Sprintf("%s_%s_%s_%d_%s", client.labelPrefix, 
client.database, client.table, time.Now().UnixMilli(), uuid.New())
-       rest, err := client.publishEvents(label, events)
+       tableEventsMap := client.makeTableEventsMap(ctx, events)
+       rest, err := client.publishEvents(tableEventsMap)
 
        if len(rest) == 0 {
                batch.ACK()
-               client.logger.Debugf("Success send %d events", length)

Review Comment:
   This log line duplicates the one at 
https://github.com/apache/doris/pull/47691/files#diff-86c3e882948c75fd362158ddaec93628d46975812e8db6f8b19bbe76afd5917aR443



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to