megarobo-caiweimin opened a new issue, #21826: URL: https://github.com/apache/doris/issues/21826
### Search before asking - [X] I had searched in the [issues](https://github.com/apache/doris/issues?q=is%3Aissue) and found no similar issues. ### Version doris 1.2.4.1 flink 1.17.1 flink-doris-connector-1.17 1.4.0 java 1.8 ### What's Wrong? The program is used to read table from Doris and print data in console. When set read fields to "FID" (int type in doris table), it works well. But if "FPOSTDATE" (timestamp type in doris table) is added to read fields, exception will be raised. only select "FID" code `public class DorisTableReader { public static void main(String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DorisOptions.Builder builder = DorisOptions.builder() .setFenodes("192.168.X.XXX:XXXX") .setTableIdentifier("dev_db.T_AR_RECEIVEBILLENTRY") .setUsername("root") .setPassword("XXX"); DorisReadOptions dro = DorisReadOptions.builder() .setReadFields("FID") .setFilterQuery("dt='2023-07-06'") .build(); DorisSource<List<?>> dorisSource = DorisSourceBuilder.<List<?>>builder() .setDorisOptions(builder.build()) .setDorisReadOptions(dro) .setDeserializer(new SimpleListDeserializationSchema()) .build(); env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris source").print(); try { env.execute(); } catch (Exception e) { throw new RuntimeException(e); } } }` when add "FPOSTDATE" `public class DorisTableReader { public static void main(String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DorisOptions.Builder builder = DorisOptions.builder() .setFenodes("192.168.X.XXX:XXXX") .setTableIdentifier("dev_db.T_AR_RECEIVEBILLENTRY") .setUsername("root") .setPassword("XXX"); DorisReadOptions dro = DorisReadOptions.builder() .setReadFields("FID,FPOSTDATE") .setFilterQuery("dt='2023-07-06'") .build(); DorisSource<List<?>> dorisSource = DorisSourceBuilder.<List<?>>builder() .setDorisOptions(builder.build()) .setDorisReadOptions(dro) .setDeserializer(new SimpleListDeserializationSchema()) .build(); env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris source").print(); try { env.execute(); } catch (Exception e) { throw new RuntimeException(e); } } }` error log below Exception in thread "main" java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed. at tech.megarobo.DorisTableReader.main(DorisTableReader.java:75) Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed. at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141) at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:267) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1300) at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93) at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47) at akka.dispatch.OnComplete.internal(Future.scala:300) at akka.dispatch.OnComplete.internal(Future.scala:297) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64) at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65) at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72) at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288) at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288) at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:288) at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:622) at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24) at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23) at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:536) at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33) at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64) at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63) at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85) at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139) at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83) at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:258) at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:249) at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:242) at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:748) at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:725) at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:80) at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:479) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309) at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) at akka.actor.Actor.aroundReceive(Actor.scala:537) at akka.actor.Actor.aroundReceive$(Actor.scala:535) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579) at akka.actor.ActorCell.invoke(ActorCell.scala:547) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) at akka.dispatch.Mailbox.run(Mailbox.scala:231) at akka.dispatch.Mailbox.exec(Mailbox.scala:243) ... 4 more Caused by: java.lang.RuntimeException: One or more fetchers have encountered exception at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:261) at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169) at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:131) at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:417) at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) at java.lang.Thread.run(Thread.java:750) Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165) at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:114) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ... 1 more Caused by: org.apache.doris.flink.exception.ConnectedFailedException: Connect to Doris BE{host='192.168.X.XXX', port=9060}failed. at org.apache.doris.flink.backend.BackendClient.openScanner(BackendClient.java:143) at org.apache.doris.flink.source.reader.DorisValueReader.init(DorisValueReader.java:95) at org.apache.doris.flink.source.reader.DorisValueReader.<init>(DorisValueReader.java:88) at org.apache.doris.flink.source.reader.DorisSourceSplitReader.checkSplitOrStartNext(DorisSourceSplitReader.java:73) at org.apache.doris.flink.source.reader.DorisSourceSplitReader.fetch(DorisSourceSplitReader.java:56) at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162) ... 6 more Process finished with exit code 1 ### What You Expected? 1. the error log can not indicate the real reason, maybe raise "error type fileds" or other hits? 2. How to use flink connector read TIMESTAMP field in doris table? ### How to Reproduce? Use code below, maybe change FE node IP and doris user `import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; import org.apache.doris.flink.deserialization.SimpleListDeserializationSchema; import org.apache.doris.flink.source.DorisSource; import org.apache.doris.flink.source.DorisSourceBuilder; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.util.List; public class DorisTableReader { public static void main(String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DorisOptions.Builder builder = DorisOptions.builder() .setFenodes("192.168.X.XXX:8030") .setTableIdentifier("dev_db.T_AR_RECEIVEBILLENTRY") .setUsername("root") .setPassword("XXXX"); DorisReadOptions dro = DorisReadOptions.builder() .setReadFields("FID,FPOSTDATE") .setFilterQuery("dt='2023-07-06'") .build(); DorisSource<List<?>> dorisSource = DorisSourceBuilder.<List<?>>builder() .setDorisOptions(builder.build()) .setDorisReadOptions(dro) .setDeserializer(new SimpleListDeserializationSchema()) .build(); env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris source").print(); try { env.execute(); } catch (Exception e) { throw new RuntimeException(e); } } }` ### Anything Else? NO, thx ### Are you willing to submit PR? - [ ] Yes I am willing to submit a PR! ### Code of Conduct - [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org