imx7 opened a new issue, #21832:
URL: https://github.com/apache/doris/issues/21832

   ### Search before asking
   
   - [X] I had searched in the [issues](https://github.com/apache/doris/issues?q=is%3Aissue) and found no similar issues.
   
   
   ### Version
   
   Apache Doris 2.0-beta (Latest)
   "org.apache.doris" % "flink-doris-connector-1.17" % "1.4.0" 
   Apache Flink 1.17.1
   
   ### What's Wrong?
   
   When Flink writes `MAP<K,V>` data to Doris via Stream Load, the write fails whenever K is a Timestamp type.
   Writing the same data with `insert into` works.
   The alpha version also worked.
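
   Since the reproduction below suggests that a string-keyed map (`test_col_2`) is accepted while the Timestamp-keyed one (`test_col_1`) is not, one possible interim workaround is to render the Timestamp key as text and declare the Doris column as `MAP<VARCHAR,INT>`. A minimal sketch of that conversion (the helper object and method names are hypothetical, not part of the original job):

   ```scala
   import java.text.SimpleDateFormat
   import java.util.Locale
   import org.apache.flink.table.data.{GenericMapData, StringData}
   import scala.collection.JavaConverters._

   object TimestampKeyWorkaround {
     // Render the java.sql.Timestamp key as "yyyy-MM-dd HH:mm:ss" text so the
     // map key is a plain string instead of a Timestamp.
     private val keyFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.CHINA)

     def stringKeyed(ts: java.sql.Timestamp, count: Int): GenericMapData =
       new GenericMapData(
         Map(StringData.fromString(keyFormat.format(ts)) -> Int.box(count)).asJava
       )
   }
   ```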
   
   
   ### What You Expected?
   
   Fix this bug so that MAP values with Timestamp keys can be written via Stream Load.
   
   ### How to Reproduce?
   
   
   ```scala
   import com.fasterxml.jackson.databind.ObjectMapper
   import org.apache.doris.flink.cfg.{DorisExecutionOptions, DorisOptions, DorisReadOptions}
   import org.apache.doris.flink.sink.DorisSink
   import org.apache.doris.flink.sink.writer.RowDataSerializer
   import org.apache.flink.api.common.accumulators.LongCounter
   import org.apache.flink.api.common.eventtime.WatermarkStrategy
   import org.apache.flink.api.common.restartstrategy.RestartStrategies
   import org.apache.flink.api.common.serialization.SimpleStringSchema
   import org.apache.flink.api.java.utils.ParameterTool
   import org.apache.flink.configuration._
   import org.apache.flink.connector.kafka.source.KafkaSource
   import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
   import org.apache.flink.streaming.api.functions.ProcessFunction
   import org.apache.flink.streaming.api.scala._
   import org.apache.flink.table.api.DataTypes
   import org.apache.flink.table.data.{GenericMapData, GenericRowData, RowData, StringData, TimestampData}
   import org.apache.flink.table.types.DataType
   import org.apache.flink.util.Collector
   
   import java.text.SimpleDateFormat
   import java.time.LocalDate
   import java.time.format.DateTimeFormatter
   import java.util.{Locale, Properties, UUID}
   import java.util.regex.Pattern
   import scala.util.Try
   import scala.collection.JavaConverters._
   
   import java.util
   
   
   
   
   object StreamLoadApp {
     def main(args: Array[String]): Unit = {
       /** Parameter configuration */
       /** Kafka */
       val params = ParameterTool.fromArgs(args)
       val bootstrap=params.get("bootstrap")
       val groupId=params.get("group_id")
       val topic=params.get("topic")
       val offset=params.get("offset").toLowerCase match {
         case "earliest"=>OffsetsInitializer.earliest()
         case _ =>OffsetsInitializer.latest()
       }
   
       /** Doris */
       val fe=params.get("fe")
       val username=params.get("username")
       val password=params.get("password")
       val table_name=params.get("table_name")
   
   
       /** Environment initialization and logging setup */
       val config = new org.apache.flink.configuration.Configuration()
       config.setString(RestOptions.BIND_PORT, "6062-6092")
       val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config)
   //    val env = StreamExecutionEnvironment.getExecutionEnvironment
       env.enableCheckpointing(8000)
       env.setRestartStrategy(RestartStrategies.fixedDelayRestart(999999, 32 * 1000L))
   
   
       val logSource = KafkaSource
         .builder[String]()
         .setBootstrapServers(bootstrap)
         .setGroupId(groupId)
         .setTopicPattern(Pattern.compile(topic))
         .setValueOnlyDeserializer(new SimpleStringSchema())
         .setStartingOffsets(offset)
         .setProperty("partition.discovery.interval.ms", "10000")
         .setProperty("metadata.max.age.ms", "1000")
         .setProperty("max,request.size", "10000000")
         .setProperty("session.timeout.ms","600000")
         .setProperty("request.timeout.ms","600000")
         .setProperty("enable.auto.commit","true")//add at 2023-0419
         .setProperty("auto.commit.interval.ms","512000")//add at 2023-0419
         .build()
       val kafkaStream = env.fromSource(logSource, WatermarkStrategy.noWatermarks(), "KafkaSourceLog")
   
       val logInfoStream = kafkaStream.process(
         new ProcessFunction[String,RowData]() {
           val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.CHINA)
   
           val logCount = new LongCounter()
           override def open(parameters: Configuration): Unit = {
             super.open(parameters)
             getRuntimeContext.addAccumulator("log-count",logCount)
           }
   
           override def processElement(value: String, ctx: ProcessFunction[String, RowData]#Context, out: Collector[RowData]): Unit = {
             val mapper = new ObjectMapper()
             try {
               val root = mapper.readTree(value)
               val dataCode = root.path("dataCode").asText
               val sensorCode = Try{
                 root.path("sensorId").asText
               }.getOrElse(root.path("deviceId").asText)
               val postTime = root.path("postTime").asText
   
   
               val p_date = LocalDate.parse(postTime, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")).toEpochDay.toInt
               val data_code = StringData.fromString(dataCode)
               val sensor_code = StringData.fromString(sensorCode)
               val post_time_tmp = new java.sql.Timestamp(dateFormat.parse(postTime).getTime)
               val post_time = TimestampData.fromTimestamp(post_time_tmp)
               // val test_col_1 = Map(post_time_tmp.asInstanceOf[AnyRef] -> value.length.asInstanceOf[AnyRef])
               val test_col_1 = Map(post_time_tmp -> value.length)
               val test_col_2 = Map(post_time_tmp.getTime.toString -> value.length)

               val genericRowData = new GenericRowData(7)
               genericRowData.setField(0, p_date)
               genericRowData.setField(1, data_code)
               genericRowData.setField(2, sensor_code)
               genericRowData.setField(3, post_time)
               /** The line below is the one that fails */
               genericRowData.setField(4, new GenericMapData(test_col_1.asJava))
               genericRowData.setField(5, new GenericMapData(test_col_2.asJava))
               genericRowData.setField(6, 0)

               out.collect(genericRowData)
               /** Increment the counter */
               logCount.add(1)
             } catch{
               case e: Exception =>
                 println(s"exception-time: 
${dateFormat.format(System.currentTimeMillis())}\n")
                 println(value)
                 e.printStackTrace()
             }
   
           }
         }
       ).name("log-process")
   
   
       val builder = DorisSink.builder[RowData]
       val dorisBuilder = DorisOptions.builder
       dorisBuilder
         .setFenodes(fe)
         .setTableIdentifier(table_name)
         .setUsername(username)
         .setPassword(password)
   
       // json format to streamload
       val properties = new Properties()
       properties.setProperty("format", "json")
       properties.setProperty("read_json_by_line", "true")
       val executionBuilder = DorisExecutionOptions.builder
       executionBuilder
         .setLabelPrefix(bootstrap+"_"+UUID.randomUUID())
         .setCheckInterval(2)
         .setBufferCount(32)
         .setMaxRetries(4)
         .setStreamLoadProp(properties) // stream load properties (JSON format)
   
   
       val fieldsInfo:Array[(Int,String,DataType)] = Array(
         (0,"p_date",DataTypes.DATE),
         (1,"data_code",DataTypes.VARCHAR(32)),
         (2,"sensor_code",DataTypes.VARCHAR(128)),
         (3,"post_time",DataTypes.TIMESTAMP),
         (4,"test_col_1",DataTypes.MAP(DataTypes.TIMESTAMP,DataTypes.INT())),
         (5,"test_col_1",DataTypes.MAP(DataTypes.VARCHAR(32),DataTypes.INT())),
         (6,"__DORIS_DELETE_SIGN__",DataTypes.INT)
       )
   
       val fields=fieldsInfo.map(_._2)
       val types=fieldsInfo.map(_._3)
   
   
   
       
       logInfoStream.sinkTo(builder
         .setDorisReadOptions(DorisReadOptions.builder.build)
         .setDorisExecutionOptions(executionBuilder.build)
         .setSerializer(
           RowDataSerializer
             .builder
             .setFieldNames(fields)
             .setType("json").setFieldType(types).build)
         .setDorisOptions(dorisBuilder.build)
         .build()
   
       ).name("log-to-doris")
   
       env.execute("log-sink")
   
     }
   }
   ```
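
   The job reads its configuration from the command line via `ParameterTool`; a hypothetical invocation (all values are placeholders) would pass:

   ```
   --bootstrap broker1:9092 --group_id doris-test --topic 'log-.*' --offset earliest \
   --fe fe_host:8030 --username root --password '***' --table_name example_db.example_table
   ```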
   
   Sample message from the topic:

   ```json
   {"data":{"wsLat":30.918466},"dataCode":"FOH8U310XA4EORK","userCode":"user_gis","sensorId":"dvc_20220712_1633","postTime":"2023-05-01 00:00:00","commissionCode":"WB510100000001"}
   ```
   
   ### Anything Else?
   
   Nothing else.
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
   

