This is an automated email from the ASF dual-hosted git repository. liaoxin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-streamloader.git
The following commit(s) were added to refs/heads/master by this push: new fcc6652 [Feature] Support for Custom Line Break Characters (#27) fcc6652 is described below commit fcc6652c7183b9b0171ff3673936bb333989fe92 Author: cfbber <cfb...@users.noreply.github.com> AuthorDate: Wed Feb 26 22:21:34 2025 +0800 [Feature] Support for Custom Line Break Characters (#27) --- main.go | 38 +++++++++++++++++++++++++++++++++++--- reader/reader.go | 11 ++++++++--- 2 files changed, 43 insertions(+), 6 deletions(-) diff --git a/main.go b/main.go index ecff344..38d8799 100644 --- a/main.go +++ b/main.go @@ -22,6 +22,7 @@ import ( "fmt" "math" "os" + "regexp" "strconv" "strings" "sync" @@ -81,8 +82,8 @@ var ( retryInfo map[int]int showVersion bool queueSize int - - bufferPool = sync.Pool{ + lineDelimiter byte = '\n' + bufferPool = sync.Pool{ New: func() interface{} { return make([]byte, 0, bufferSize) }, @@ -191,6 +192,24 @@ func initFlags() { utils.InitLog(logLevel) } +// Restore hex escape sequences like \xNN to their corresponding characters +func restoreHexEscapes(s1 string) (string, error) { + if s1 == `\n` { + return "\n", nil + } + + re := regexp.MustCompile(`\\x([0-9A-Fa-f]{2})`) + + return re.ReplaceAllStringFunc(s1, func(match string) string { + hexValue := match[2:] // Remove the \x prefix + decValue, err := strconv.ParseInt(hexValue, 16, 0) + if err != nil { + return match + } + return string(rune(decValue)) + }), nil +} + //go:generate go run gen_version.go func paramCheck() { if showVersion { @@ -253,6 +272,19 @@ func paramCheck() { if strings.ToLower(kv[0]) == "format" && strings.ToLower(kv[1]) != "csv" { enableConcurrency = false } + + if strings.ToLower(kv[0]) == "line_delimiter" { + + restored, err := restoreHexEscapes(kv[1]) + if err != nil || len(restored) != 1 { + log.Errorf("line_delimiter invalid: %s", kv[1]) + os.Exit(1) + } else { + lineDelimiter = restored[0] + } + + } + if len(kv) > 2 { headers[kv[0]] = strings.Join(kv[1:], ":") } else { @@ -369,7 +401,7 @@ func main() { streamLoad.Load(workers, maxRowsPerTask, maxBytesPerTask, &retryInfo) reporter.Report() defer reporter.CloseWait() - reader.Read(reporter, workers, maxBytesPerTask, &retryInfo, loadResp, retryCount) + reader.Read(reporter, workers, maxBytesPerTask, &retryInfo, loadResp, retryCount, lineDelimiter) reader.Close() streamLoad.Wait(loadInfo, retryCount, &retryInfo, startTime) diff --git a/reader/reader.go b/reader/reader.go index 2dcd2a5..fa6563f 100644 --- a/reader/reader.go +++ b/reader/reader.go @@ -108,7 +108,8 @@ func NewFileReader(filePaths string, batchRows int, batchBytes int, bufferSize i } // Read File -func (f *FileReader) Read(reporter *report.Reporter, workers int, maxBytesPerTask int, retryInfo *map[int]int, loadResp *loader.Resp, retryCount int) { +func (f *FileReader) Read(reporter *report.Reporter, workers int, maxBytesPerTask int, retryInfo *map[int]int, + loadResp *loader.Resp, retryCount int, lineDelimiter byte) { index := 0 data := f.pool.Get().([]byte) count := f.batchRows @@ -125,16 +126,20 @@ func (f *FileReader) Read(reporter *report.Reporter, workers int, maxBytesPerTas for _, file := range f.files { loadResp.LoadFiles = append(loadResp.LoadFiles, file.Name()) reader := bufio.NewReaderSize(file, f.bufferSize) + for { if atomic.LoadUint64(&reporter.FinishedWorkers) == atomic.LoadUint64(&reporter.TotalWorkers) { return } - line, err := reader.ReadBytes('\n') - if err == io.EOF { + line, err := reader.ReadBytes(lineDelimiter) + if err == io.EOF && len(line) == 0 { file.Close() break } else if err != nil { log.Errorf("Read file failed, error message: %v, before retrying, we suggest:\n1.Check the input data files and fix if there is any problem.\n2.Do select count(*) to check whether data is partially loaded.\n3.If the data is partially loaded and duplication is unacceptable, consider dropping the table (with caution that all data in the table will be lost) and retry.\n4.Otherwise, just retry.\n", err) + if len(line) != 0 { + log.Error("5.When using a specified line delimiter, the file must end with that delimiter.") + } os.Exit(1) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org