Misaki030112 opened a new issue, #503:
URL: https://github.com/apache/doris-flink-connector/issues/503

   ### Search before asking
   
   - [X] I had searched in the 
[issues](https://github.com/apache/incubator-doris/issues?q=is%3Aissue) and 
found no similar issues.
   
   
   ### Version
   
   24.0.1
   
   ### What's Wrong?
   
   I use flink-doris-connector to read the full data of a table, and then write 
it to kafka through flink kafka sink
   
   I found that if I set the parallelism level to be greater than 1 for the 
Source, It will occasionally lose data, and sometimes only read a small portion 
of the data.
   
   The following are screenshots of my experiment
   PS:  The number of rows of my data is 6405008
   
![image](https://github.com/user-attachments/assets/92012402-3ead-45d2-b4ce-8cd6cee2d58f)
   
   1. When I set my Source parallelism to 6, it only reads a very small portion 
of the data
   
![image](https://github.com/user-attachments/assets/7e6d42e9-4926-4cb9-b535-bd1e1c4ac4a5)
   
   2. When I set my Source parallelism to 2, the result is the same
   
![image](https://github.com/user-attachments/assets/98ee9eed-a246-449b-bde8-0cc7a47134a0)
   
   3. It seems that only when the parallelism is 1 can he read the complete 
data. Why is this?
   
![image](https://github.com/user-attachments/assets/50eae649-70d4-4d56-80e5-e8c2379da827)
   
   
   
   ### What You Expected?
   
   I expect that when the Source parallelism is greater than 1, it should be 
able to read the complete data.
   
   I looked at the Doris connector code carefully. I guess the process of 
assigning splits to read each split is fine. The problem is reading the data 
inside a DorisSplitRecords. It seems that it closes before reading all the 
tablets data in a split.
   
   ### How to Reproduce?
   
   flink version :    1.19-scala_2.12-java11
   flink-doris-connector version:  flink-doris-connector-1.18:24.0.1
   
   flink task code
   ```java
   public class DorisToKafka {
   
       public static void main(String[] args) throws Exception {
   
           StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
           env.setRuntimeMode(RuntimeExecutionMode.BATCH);
           env.enableCheckpointing(10000);
   
   
           DorisSource<List<?>> dorisSource = DorisSource.<List<?>>builder()
                   .setDorisReadOptions(
                           DorisReadOptions.builder()
                                   .setRequestTabletSize(50)
                                   .setRequestRetries(10)
                                   .setDeserializeArrowAsync(Boolean.TRUE)
                                   .setRequestReadTimeoutMs(60 * 1000)
                                   .setRequestBatchSize(4096)
                                   .build()
                   )
                   .setDorisOptions(
                           DorisOptions.builder()
                                   .setFenodes("doris-fe-01:8030")
                                   
.setTableIdentifier("DB01.nyc_taxi_fare_data")
                                   .setUsername("root")
                                   .setPassword("")
                                   .build()
                   )
                   .setDeserializer(
                           new SimpleListDeserializationSchema()
                   )
                   .build();
   
           DataStreamSource<List<?>> dorisDataStream = 
env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris-source");
           dorisDataStream.setParallelism(1);
   
           KafkaSink<List<?>> kafkaSink = KafkaSink.<List<?>>builder()
                   .setBootstrapServers("kafka-broker-1:9092")
                   .setRecordSerializer(new 
KafkaRecordSerializationSchema<List<?>>() {
                       @Override
                       public ProducerRecord<byte[], byte[]> serialize(List<?> 
rowData, KafkaSinkContext kafkaSinkContext, Long aLong) {
                           try {
                               String jsonString = null;
                               jsonString = 
ObjectMapperSingleton.getInstance().writeValueAsString(rowData);
                               byte[] value = jsonString.getBytes();
                               return new ProducerRecord<>("doris-kafka-topic", 
null, value);
                           } catch (JsonProcessingException e) {
                               e.printStackTrace();
                               throw new RuntimeException(e);
                           }
                       }
                   })
                   .build();
   
   
           dorisDataStream.sinkTo(kafkaSink).setParallelism(4);
           env.execute("doris-to-kafka");
       }
   }
   ```
   
   
   
   ### Anything Else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to