megarobo-caiweimin opened a new issue, #21826:
URL: https://github.com/apache/doris/issues/21826

   ### Search before asking
   
   - [X] I had searched in the 
[issues](https://github.com/apache/doris/issues?q=is%3Aissue) and found no 
similar issues.
   
   
   ### Version
   
   doris 1.2.4.1
   flink 1.17.1
   flink-doris-connector-1.17  1.4.0
   java 1.8
   
   ### What's Wrong?
   
   The program reads a table from Doris and prints the rows to the console. When
the read fields are set to "FID" (INT type in the Doris table), it works well. But
when "FPOSTDATE" (TIMESTAMP type in the Doris table) is added to the read fields,
an exception is thrown.
   
   Code that selects only "FID":
   ```java
   public class DorisTableReader {
       public static void main(String[] args) {
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
           env.setParallelism(1);
   
           DorisOptions.Builder builder = DorisOptions.builder()
                   .setFenodes("192.168.X.XXX:XXXX")
                   .setTableIdentifier("dev_db.T_AR_RECEIVEBILLENTRY")
                   .setUsername("root")
                   .setPassword("XXX");
   
           DorisReadOptions dro = DorisReadOptions.builder()
                   .setReadFields("FID")
                   .setFilterQuery("dt='2023-07-06'")
                   .build();
   
           DorisSource<List<?>> dorisSource = DorisSourceBuilder.<List<?>>builder()
                   .setDorisOptions(builder.build())
                   .setDorisReadOptions(dro)
                   .setDeserializer(new SimpleListDeserializationSchema())
                   .build();
   
           env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris source").print();
           try {
               env.execute();
           } catch (Exception e) {
               throw new RuntimeException(e);
           }
       }
   }
   ```
   
   Code after adding "FPOSTDATE":
   ```java
   public class DorisTableReader {
       public static void main(String[] args) {
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
           env.setParallelism(1);
   
           DorisOptions.Builder builder = DorisOptions.builder()
                   .setFenodes("192.168.X.XXX:XXXX")
                   .setTableIdentifier("dev_db.T_AR_RECEIVEBILLENTRY")
                   .setUsername("root")
                   .setPassword("XXX");
   
           DorisReadOptions dro = DorisReadOptions.builder()
                   .setReadFields("FID,FPOSTDATE")
                   .setFilterQuery("dt='2023-07-06'")
                   .build();
   
           DorisSource<List<?>> dorisSource = DorisSourceBuilder.<List<?>>builder()
                   .setDorisOptions(builder.build())
                   .setDorisReadOptions(dro)
                   .setDeserializer(new SimpleListDeserializationSchema())
                   .build();
   
           env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris source").print();
           try {
               env.execute();
           } catch (Exception e) {
               throw new RuntimeException(e);
           }
       }
   }
   ```
   
   Error log:
   
   Exception in thread "main" java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at tech.megarobo.DorisTableReader.main(DorisTableReader.java:75)
   Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
        at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
        at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:267)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1300)
        at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
        at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
        at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
        at akka.dispatch.OnComplete.internal(Future.scala:300)
        at akka.dispatch.OnComplete.internal(Future.scala:297)
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
        at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
        at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72)
        at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288)
        at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288)
        at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:288)
        at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:622)
        at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24)
        at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
        at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:536)
        at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
        at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
        at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
        at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85)
        at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
   Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:258)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:249)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:242)
        at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:748)
        at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:725)
        at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:80)
        at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:479)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309)
        at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
        at akka.actor.Actor.aroundReceive(Actor.scala:537)
        at akka.actor.Actor.aroundReceive$(Actor.scala:535)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579)
        at akka.actor.ActorCell.invoke(ActorCell.scala:547)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
        at akka.dispatch.Mailbox.run(Mailbox.scala:231)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
        ... 4 more
   Caused by: java.lang.RuntimeException: One or more fetchers have encountered exception
        at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:261)
        at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
        at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:131)
        at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:417)
        at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
        at java.lang.Thread.run(Thread.java:750)
   Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
        at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
        at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:114)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more
   Caused by: org.apache.doris.flink.exception.ConnectedFailedException: Connect to Doris BE{host='192.168.X.XXX', port=9060}failed.
        at org.apache.doris.flink.backend.BackendClient.openScanner(BackendClient.java:143)
        at org.apache.doris.flink.source.reader.DorisValueReader.init(DorisValueReader.java:95)
        at org.apache.doris.flink.source.reader.DorisValueReader.<init>(DorisValueReader.java:88)
        at org.apache.doris.flink.source.reader.DorisSourceSplitReader.checkSplitOrStartNext(DorisSourceSplitReader.java:73)
        at org.apache.doris.flink.source.reader.DorisSourceSplitReader.fetch(DorisSourceSplitReader.java:56)
        at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
        at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162)
        ... 6 more
   
   Process finished with exit code 1
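   
   The innermost exception above reports a failed connection to the BE at port 9060. Because the same job succeeds when only "FID" is read, plain network reachability is probably not the cause, but it is cheap to rule out. The following is a minimal sketch using only JDK classes; `PortCheck` and `canConnect` are hypothetical names for illustration and are not part of Flink or the Doris connector, and the host passed in must be the real BE address (it is masked in this report).
   
   ```java
   import java.io.IOException;
   import java.net.InetSocketAddress;
   import java.net.Socket;
   
   /** Hypothetical helper (not part of Flink or the Doris connector). */
   final class PortCheck {
       private PortCheck() {}
   
       /** Returns true if a TCP connection to host:port opens within the timeout. */
       static boolean canConnect(String host, int port, int timeoutMillis) {
           try (Socket socket = new Socket()) {
               socket.connect(new InetSocketAddress(host, port), timeoutMillis);
               return true;
           } catch (IOException e) {
               // Covers refused connections, timeouts, and unresolvable hosts.
               return false;
           }
       }
   }
   ```
   
   If a call such as `PortCheck.canConnect(beHost, 9060, 3000)` returns true from the machine running the job, the port is open, which would suggest that the "Connect to Doris BE ... failed" message is masking a different error raised while opening the scanner.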
   
   ### What You Expected?
   
   1. The error log does not indicate the real cause. Could the connector raise a
more specific error, such as "unsupported field type", or give some other hint?
   2. How can the Flink connector be used to read a TIMESTAMP field from a Doris table?
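   
   On point 1, the reproducer wraps every failure in a new `RuntimeException`, so the first line printed is the generic "Job execution failed." A minimal sketch of how the caller could surface the innermost cause instead, using only JDK classes; `CauseChain` is a hypothetical helper name, not part of Flink or the Doris connector:
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   /** Hypothetical helper (not part of Flink or the Doris connector). */
   final class CauseChain {
       private CauseChain() {}
   
       /** Returns the throwable followed by each nested cause, outermost first. */
       static List<Throwable> of(Throwable t) {
           List<Throwable> chain = new ArrayList<>();
           // Stop on null or on a throwable already seen (guards against cycles).
           while (t != null && !chain.contains(t)) {
               chain.add(t);
               t = t.getCause();
           }
           return chain;
       }
   
       /** Returns the innermost cause, or null if the argument is null. */
       static Throwable root(Throwable t) {
           List<Throwable> chain = of(t);
           return chain.isEmpty() ? null : chain.get(chain.size() - 1);
       }
   }
   ```
   
   Calling `System.err.println(CauseChain.root(e))` in the `catch` block would print the innermost exception first. For the trace in this report that is the `ConnectedFailedException`, which still does not name the offending column, so this only shortens the search and does not replace a clearer error from the connector.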
   
   ### How to Reproduce?
   
   Use the code below; you may need to change the FE node IP and the Doris user.
   
   ```java
   import org.apache.doris.flink.cfg.DorisOptions;
   import org.apache.doris.flink.cfg.DorisReadOptions;
   import org.apache.doris.flink.deserialization.SimpleListDeserializationSchema;
   import org.apache.doris.flink.source.DorisSource;
   import org.apache.doris.flink.source.DorisSourceBuilder;
   import org.apache.flink.api.common.eventtime.WatermarkStrategy;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   import java.util.List;
   
   public class DorisTableReader {
       public static void main(String[] args) {
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
           env.setParallelism(1);
   
           DorisOptions.Builder builder = DorisOptions.builder()
                   .setFenodes("192.168.X.XXX:8030")
                   .setTableIdentifier("dev_db.T_AR_RECEIVEBILLENTRY")
                   .setUsername("root")
                   .setPassword("XXXX");
   
           DorisReadOptions dro = DorisReadOptions.builder()
                   .setReadFields("FID,FPOSTDATE")
                   .setFilterQuery("dt='2023-07-06'")
                   .build();
   
           DorisSource<List<?>> dorisSource = DorisSourceBuilder.<List<?>>builder()
                   .setDorisOptions(builder.build())
                   .setDorisReadOptions(dro)
                   .setDeserializer(new SimpleListDeserializationSchema())
                   .build();
   
           env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris source").print();
           try {
               env.execute();
           } catch (Exception e) {
               throw new RuntimeException(e);
           }
       }
   }
   ```
   
   ### Anything Else?
   
   No, thanks.
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

