Misaki030112 opened a new issue, #503: URL: https://github.com/apache/doris-flink-connector/issues/503
### Search before asking

- [X] I had searched in the [issues](https://github.com/apache/incubator-doris/issues?q=is%3Aissue) and found no similar issues.

### Version

24.0.1

### What's Wrong?

I use flink-doris-connector to read the full data of a table and then write it to Kafka through the Flink Kafka sink. I found that if I set the Source parallelism to a value greater than 1, it occasionally loses data, sometimes reading only a small portion of the table. The following are screenshots of my experiments. Note: the table has 6,405,008 rows.

1. When I set the Source parallelism to 6, it reads only a very small portion of the data.
2. When I set the Source parallelism to 2, the result is the same.
3. It seems that only when the parallelism is 1 does it read the complete data. Why is this?

### What You Expected?

I expect that when the Source parallelism is greater than 1, it should still read the complete data. I looked at the Doris connector code carefully. The process of assigning splits and reading each split seems fine; the problem appears to be in reading the data inside a DorisSplitRecords. It seems to close before all of the tablet data in a split has been read.

### How to Reproduce?
flink version: 1.19-scala_2.12-java11
flink-doris-connector version: flink-doris-connector-1.18:24.0.1

Flink task code:

```java
import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.source.DorisSource;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.List;

public class DorisToKafka {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        env.enableCheckpointing(10000);

        DorisSource<List<?>> dorisSource = DorisSource.<List<?>>builder()
                .setDorisReadOptions(
                        DorisReadOptions.builder()
                                .setRequestTabletSize(50)
                                .setRequestRetries(10)
                                .setDeserializeArrowAsync(Boolean.TRUE)
                                .setRequestReadTimeoutMs(60 * 1000)
                                .setRequestBatchSize(4096)
                                .build())
                .setDorisOptions(
                        DorisOptions.builder()
                                .setFenodes("doris-fe-01:8030")
                                .setTableIdentifier("DB01.nyc_taxi_fare_data")
                                .setUsername("root")
                                .setPassword("")
                                .build())
                .setDeserializer(new SimpleListDeserializationSchema())
                .build();

        DataStreamSource<List<?>> dorisDataStream =
                env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris-source");
        dorisDataStream.setParallelism(1);

        KafkaSink<List<?>> kafkaSink = KafkaSink.<List<?>>builder()
                .setBootstrapServers("kafka-broker-1:9092")
                .setRecordSerializer(new KafkaRecordSerializationSchema<List<?>>() {
                    @Override
                    public ProducerRecord<byte[], byte[]> serialize(
                            List<?> rowData, KafkaSinkContext kafkaSinkContext, Long timestamp) {
                        try {
                            String jsonString =
                                    ObjectMapperSingleton.getInstance().writeValueAsString(rowData);
                            byte[] value = jsonString.getBytes();
                            return new ProducerRecord<>("doris-kafka-topic", null, value);
                        } catch (JsonProcessingException e) {
                            throw new RuntimeException(e);
                        }
                    }
                })
                .build();

        dorisDataStream.sinkTo(kafkaSink).setParallelism(4);
        env.execute("doris-to-kafka");
    }
}
```

### Anything Else?

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!
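For context on the hypothesis above: with `setRequestTabletSize(50)`, the connector groups a table's tablets into splits of at most 50 tablets each, so a split reader should only finish a split after draining every tablet in it. Below is a minimal sketch of that grouping with hypothetical tablet IDs; this is not the connector's actual code, just an illustration of how much data a single split can carry (and therefore how much would be lost if the reader closed a split early):

```java
import java.util.ArrayList;
import java.util.List;

public class SplitSketch {

    // Group tablet IDs into splits of at most `tabletSize` tablets each,
    // loosely mirroring what requestTabletSize controls in the connector.
    static List<List<Long>> groupTablets(List<Long> tabletIds, int tabletSize) {
        List<List<Long>> splits = new ArrayList<>();
        for (int i = 0; i < tabletIds.size(); i += tabletSize) {
            int end = Math.min(i + tabletSize, tabletIds.size());
            splits.add(new ArrayList<>(tabletIds.subList(i, end)));
        }
        return splits;
    }

    public static void main(String[] args) {
        // 120 hypothetical tablet IDs for illustration.
        List<Long> tablets = new ArrayList<>();
        for (long t = 0; t < 120; t++) {
            tablets.add(t);
        }
        List<List<Long>> splits = groupTablets(tablets, 50);
        // 120 tablets with tabletSize=50 -> 3 splits of sizes 50, 50, 20.
        // A correct reader must drain all tablets of a split before closing;
        // closing early would silently drop the remaining tablets' rows.
        System.out.println(splits.size() + " splits, last has "
                + splits.get(splits.size() - 1).size() + " tablets");
        // prints "3 splits, last has 20 tablets"
    }
}
```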
### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)