gnehil opened a new pull request, #140: URL: https://github.com/apache/doris-spark-connector/pull/140
# Proposed changes

## Optimizations

1. Optimize data transmission
   - Before: after the batch data was formatted, it was packaged into a `StringEntity` and sent in one piece.
   - After: the Stream Load request transmits data with HTTP chunked transfer encoding through an `InputStream`, which avoids the memory consumed by converting the whole batch when building the entity (see the first sketch under "Implementation sketches" below).
2. Optimize partitioned data iteration
   - Before: before batch splitting, `iterator.grouped` was used to group the partition iterator by batch size. Each group materializes that many records in memory, so a large batch size drives memory usage up and can easily lead to OOM.
   - After: the partition's iterator is consumed directly through an `InputStream` implemented on top of `Iterator<Row>`. The stream reads one row at a time and maintains a row counter; once the number of rows read reaches the batch size, the stream ends and the Stream Load request is submitted. During the whole iteration the source side only holds the minimum amount of data, there is no need to cache the entire batch being written, and memory usage is reduced (see the second sketch under "Implementation sketches" below).

## Test result

Environment information

- Single record size: about 8 KB
- Spark resources:
  - executor instances: 1
  - executor cores: 1
  - executor memory: test variable
- Job configuration
  - read
    - doris.batch.size = 10000
    - doris.request.tablet.size = 4
  - write
    - sink.properties.parallelism = 5

### Test 1

- Spark executor memory: 1GB
- sink.batch.size = 100000
- Before optimization: (screenshots attached)
- After optimization: (screenshot attached)

### Test 2

- Spark executor memory: 1GB
- sink.batch.size = 200000
- Before optimization: not performed
- After optimization: (screenshot attached)

### Test 3

- Spark executor memory: 1GB
- sink.batch.size = 500000
- Before optimization: not performed
- After optimization: (screenshot attached)

### Test 4

- Spark executor memory: 2GB
- sink.batch.size = 100000
- Before optimization: (screenshots attached)
- After optimization: not performed

### Test 5

- Spark executor memory: 4GB
- sink.batch.size = 100000
- Before optimization: (screenshots attached)
- After optimization: not performed

### Test 6

- Spark executor memory: 16GB
- sink.batch.size = 100000
- Before optimization: (screenshots attached)
- After optimization: not performed

## Test summary

According to the test results, when the source-side read batch size stays the same, the memory usage of the optimized connector is relatively stable, and the write batch size has little effect on it. The optimization also alleviates the slow data processing caused by high CPU usage from GC when memory is insufficient.

## Checklist (Required)

1. Does it affect the original behavior: (Yes/No/I Don't know)
2. Has unit tests been added: (Yes/No/No Need)
3. Has document been added or modified: (Yes/No/No Need)
4. Does it need to update dependencies: (Yes/No)
5. Are there any changes that cannot be rolled back: (Yes/No)

## Further comments

If this is a relatively large or complex change, kick off the discussion at [d...@doris.apache.org](mailto:d...@doris.apache.org) by explaining why you chose the solution you did and what alternatives you considered, etc...
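## Implementation sketches

For illustration only: a minimal sketch of the first optimization, assuming Apache HttpClient 4.x. The endpoint, database, table, credentials, headers, and the class name `ChunkedStreamLoadExample` are placeholders, not the connector's actual code. The point is that an `InputStreamEntity` with unknown length is sent with `Transfer-Encoding: chunked`, so rows are streamed as they are read instead of being buffered into one `StringEntity`.

```java
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.InputStreamEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

public class ChunkedStreamLoadExample {

    public static String streamLoad(InputStream batch, String label) throws Exception {
        // Placeholder FE endpoint, database, table and credentials.
        String loadUrl = "http://fe_host:8030/api/example_db/example_table/_stream_load";
        String auth = Base64.getEncoder()
                .encodeToString("user:password".getBytes(StandardCharsets.UTF_8));

        HttpPut put = new HttpPut(loadUrl);
        put.setHeader(HttpHeaders.AUTHORIZATION, "Basic " + auth);
        put.setHeader(HttpHeaders.EXPECT, "100-continue");
        put.setHeader("label", label);
        put.setHeader("column_separator", "\t");

        // Length -1 plus setChunked(true): the body is sent with
        // Transfer-Encoding: chunked, so rows flow from the InputStream
        // as they are produced instead of being buffered up front.
        InputStreamEntity entity = new InputStreamEntity(batch, -1);
        entity.setChunked(true);
        put.setEntity(entity);

        try (CloseableHttpClient client = HttpClients.custom()
                .setRedirectStrategy(new DefaultRedirectStrategy() {
                    @Override
                    protected boolean isRedirectable(String method) {
                        // allow the FE's redirect to a BE to be followed for PUT
                        return true;
                    }
                })
                .build();
             CloseableHttpResponse response = client.execute(put)) {
            return EntityUtils.toString(response.getEntity());
        }
    }
}
```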
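The second optimization can be sketched as an `InputStream` backed by the partition's `Iterator<Row>`. The class and method names below (`RowBatchInputStream`, `serialize`) are hypothetical and only illustrate the counter-based batching described above, not the connector's actual implementation.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

import org.apache.spark.sql.Row;

public class RowBatchInputStream extends InputStream {

    private final Iterator<Row> rows;   // the partition iterator, consumed lazily
    private final int batchSize;        // maximum rows per Stream Load request
    private int rowsRead = 0;           // counter of rows already handed out
    private InputStream current = new ByteArrayInputStream(new byte[0]);

    public RowBatchInputStream(Iterator<Row> rows, int batchSize) {
        this.rows = rows;
        this.batchSize = batchSize;
    }

    // A real implementation would also override read(byte[], int, int) for
    // efficiency; byte-by-byte reads are enough to show the idea.
    @Override
    public int read() throws IOException {
        int b = current.read();
        if (b != -1) {
            return b;
        }
        // Current row is exhausted; end the stream once batchSize rows were
        // emitted, so the caller can submit one Stream Load per batch.
        if (rowsRead >= batchSize || !rows.hasNext()) {
            return -1;
        }
        current = new ByteArrayInputStream(
                serialize(rows.next()).getBytes(StandardCharsets.UTF_8));
        rowsRead++;
        return current.read();
    }

    public boolean hasMoreRows() {
        return rows.hasNext();
    }

    // Illustrative row formatting: tab-separated columns plus a newline terminator.
    private String serialize(Row row) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < row.size(); i++) {
            if (i > 0) {
                sb.append('\t');
            }
            sb.append(String.valueOf(row.get(i)));
        }
        return sb.append('\n').toString();
    }
}
```

A writer could then loop over the partition, constructing a new stream per batch and passing it to the chunked request in the first sketch, so only one row at a time needs to be serialized in memory.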