This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-streamloader.git


The following commit(s) were added to refs/heads/master by this push:
     new fcc6652   [Feature] Support for Custom Line Break Characters (#27)
fcc6652 is described below

commit fcc6652c7183b9b0171ff3673936bb333989fe92
Author: cfbber <cfb...@users.noreply.github.com>
AuthorDate: Wed Feb 26 22:21:34 2025 +0800

     [Feature] Support for Custom Line Break Characters (#27)
---
 main.go          | 38 +++++++++++++++++++++++++++++++++++---
 reader/reader.go | 11 ++++++++---
 2 files changed, 43 insertions(+), 6 deletions(-)

diff --git a/main.go b/main.go
index ecff344..38d8799 100644
--- a/main.go
+++ b/main.go
@@ -22,6 +22,7 @@ import (
        "fmt"
        "math"
        "os"
+       "regexp"
        "strconv"
        "strings"
        "sync"
@@ -81,8 +82,8 @@ var (
        retryInfo            map[int]int
        showVersion          bool
        queueSize            int
-
-       bufferPool = sync.Pool{
+       lineDelimiter        byte = '\n'
+       bufferPool           = sync.Pool{
                New: func() interface{} {
                        return make([]byte, 0, bufferSize)
                },
@@ -191,6 +192,24 @@ func initFlags() {
        utils.InitLog(logLevel)
 }
 
+// Restore hex escape sequences like \xNN to their corresponding characters
+func restoreHexEscapes(s1 string) (string, error) {
+       if s1 == `\n` {
+               return "\n", nil
+       }
+
+       re := regexp.MustCompile(`\\x([0-9A-Fa-f]{2})`)
+
+       return re.ReplaceAllStringFunc(s1, func(match string) string {
+               hexValue := match[2:] // Remove the \x prefix
+               decValue, err := strconv.ParseInt(hexValue, 16, 0)
+               if err != nil {
+                       return match
+               }
+               return string(rune(decValue))
+       }), nil
+}
+
 //go:generate go run gen_version.go
 func paramCheck() {
        if showVersion {
@@ -253,6 +272,19 @@ func paramCheck() {
                        if strings.ToLower(kv[0]) == "format" && strings.ToLower(kv[1]) != "csv" {
                                enableConcurrency = false
                        }
+
+                       if strings.ToLower(kv[0]) == "line_delimiter" {
+
+                               restored, err := restoreHexEscapes(kv[1])
+                               if err != nil || len(restored) != 1 {
+                                       log.Errorf("line_delimiter invalid: %s", kv[1])
+                                       os.Exit(1)
+                               } else {
+                                       lineDelimiter = restored[0]
+                               }
+
+                       }
+
                        if len(kv) > 2 {
                                headers[kv[0]] = strings.Join(kv[1:], ":")
                        } else {
@@ -369,7 +401,7 @@ func main() {
                streamLoad.Load(workers, maxRowsPerTask, maxBytesPerTask, &retryInfo)
                reporter.Report()
                defer reporter.CloseWait()
-               reader.Read(reporter, workers, maxBytesPerTask, &retryInfo, loadResp, retryCount)
+               reader.Read(reporter, workers, maxBytesPerTask, &retryInfo, loadResp, retryCount, lineDelimiter)
                reader.Close()
 
                streamLoad.Wait(loadInfo, retryCount, &retryInfo, startTime)
diff --git a/reader/reader.go b/reader/reader.go
index 2dcd2a5..fa6563f 100644
--- a/reader/reader.go
+++ b/reader/reader.go
@@ -108,7 +108,8 @@ func NewFileReader(filePaths string, batchRows int, batchBytes int, bufferSize i
 }
 
 // Read File
-func (f *FileReader) Read(reporter *report.Reporter, workers int, maxBytesPerTask int, retryInfo *map[int]int, loadResp *loader.Resp, retryCount int) {
+func (f *FileReader) Read(reporter *report.Reporter, workers int, maxBytesPerTask int, retryInfo *map[int]int,
+       loadResp *loader.Resp, retryCount int, lineDelimiter byte) {
        index := 0
        data := f.pool.Get().([]byte)
        count := f.batchRows
@@ -125,16 +126,20 @@ func (f *FileReader) Read(reporter *report.Reporter, workers int, maxBytesPerTas
        for _, file := range f.files {
                loadResp.LoadFiles = append(loadResp.LoadFiles, file.Name())
                reader := bufio.NewReaderSize(file, f.bufferSize)
+
                for {
                        if atomic.LoadUint64(&reporter.FinishedWorkers) == atomic.LoadUint64(&reporter.TotalWorkers) {
                                return
                        }
-                       line, err := reader.ReadBytes('\n')
-                       if err == io.EOF {
+                       line, err := reader.ReadBytes(lineDelimiter)
+                       if err == io.EOF && len(line) == 0 {
                                file.Close()
                                break
                        } else if err != nil {
                                log.Errorf("Read file failed, error message: %v, before retrying, we suggest:\n1.Check the input data files and fix if there is any problem.\n2.Do select count(*) to check whether data is partially loaded.\n3.If the data is partially loaded and duplication is unacceptable, consider dropping the table (with caution that all data in the table will be lost) and retry.\n4.Otherwise, just retry.\n", err)
+                               if len(line) != 0 {
+                                       log.Error("5.When using a specified line delimiter, the file must end with that delimiter.")
+                               }
                                os.Exit(1)
                        }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to