imx7 opened a new issue, #21832:
URL: https://github.com/apache/doris/issues/21832
### Search before asking

- [X] I had searched in the [issues](https://github.com/apache/doris/issues?q=is%3Aissue) and found no similar issues.

### Version

- Apache Doris 2.0-beta (latest)
- `"org.apache.doris" % "flink-doris-connector-1.17" % "1.4.0"`
- Apache Flink 1.17.1

### What's Wrong?

When Flink writes MAP<K,V> data to Doris via stream load, the write fails if K is a Timestamp type. The same data written via `insert into` succeeds, and the alpha version handled it correctly as well.
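For reference, the failure reduces to a `GenericMapData` whose keys are `java.sql.Timestamp` values in a `MAP(TIMESTAMP, INT)` field; the same values keyed by strings load fine. Below is a minimal sketch of the two constructions, distilled from the full job under "How to Reproduce?" (the object name `MapKeyRepro` is made up for illustration):

```scala
import org.apache.flink.table.data.{GenericMapData, GenericRowData}
import scala.collection.JavaConverters._

// Hypothetical minimal repro, condensed from the full job below.
object MapKeyRepro {
  def main(args: Array[String]): Unit = {
    val ts = new java.sql.Timestamp(System.currentTimeMillis())

    // Field of type MAP(TIMESTAMP, INT): rows carrying this field fail in stream load.
    val timestampKeyed = new GenericMapData(Map(ts -> 1).asJava)

    // Same data keyed by a string (MAP(VARCHAR, INT)): this variant loads fine.
    val stringKeyed = new GenericMapData(Map(ts.getTime.toString -> 1).asJava)

    val row = new GenericRowData(2)
    row.setField(0, timestampKeyed)
    row.setField(1, stringKeyed)
  }
}
```

Both maps correspond directly to `test_col_1` and `test_col_2` in the repro job; only the timestamp-keyed one trips the stream load path.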
### What You Expected?

Please fix this bug.

### How to Reproduce?

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.doris.flink.cfg.{DorisExecutionOptions, DorisOptions, DorisReadOptions}
import org.apache.doris.flink.sink.DorisSink
import org.apache.doris.flink.sink.writer.RowDataSerializer
import org.apache.flink.api.common.accumulators.LongCounter
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.configuration._
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.data.{GenericMapData, GenericRowData, RowData, StringData, TimestampData}
import org.apache.flink.table.types.DataType
import org.apache.flink.util.Collector

import java.text.SimpleDateFormat
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import java.util.{Locale, Properties, UUID}
import java.util.regex.Pattern
import scala.util.Try
import scala.collection.JavaConverters._

object StreamLoadApp {
  def main(args: Array[String]): Unit = {
    /** Parameter configuration */
    /** Kafka */
    val params = ParameterTool.fromArgs(args)
    val bootstrap = params.get("bootstrap")
    val groupId = params.get("group_id")
    val topic = params.get("topic")
    val offset = params.get("offset").toLowerCase match {
      case "earliest" => OffsetsInitializer.earliest()
      case _ => OffsetsInitializer.latest()
    }
    /** Doris */
    val fe = params.get("fe")
    val username = params.get("username")
    val password = params.get("password")
    val table_name = params.get("table_name")

    /** Environment initialization */
    val config = new org.apache.flink.configuration.Configuration()
    config.setString(RestOptions.BIND_PORT, "6062-6092")
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config)
    // val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(8000)
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(999999, 32 * 1000L))

    val logSource = KafkaSource
      .builder[String]()
      .setBootstrapServers(bootstrap)
      .setGroupId(groupId)
      .setTopicPattern(Pattern.compile(topic))
      .setValueOnlyDeserializer(new SimpleStringSchema())
      .setStartingOffsets(offset)
      .setProperty("partition.discovery.interval.ms", "10000")
      .setProperty("metadata.max.age.ms", "1000")
      .setProperty("max.request.size", "10000000")
      .setProperty("session.timeout.ms", "600000")
      .setProperty("request.timeout.ms", "600000")
      .setProperty("enable.auto.commit", "true") // added 2023-04-19
      .setProperty("auto.commit.interval.ms", "512000") // added 2023-04-19
      .build()

    val kafkaStream = env.fromSource(logSource, WatermarkStrategy.noWatermarks(), "KafkaSourceLog")

    val logInfoStream = kafkaStream.process(
      new ProcessFunction[String, RowData]() {
        val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.CHINA)
        val logCount = new LongCounter()

        override def open(parameters: Configuration): Unit = {
          super.open(parameters)
          getRuntimeContext.addAccumulator("log-count", logCount)
        }

        override def processElement(value: String, ctx: ProcessFunction[String, RowData]#Context, out: Collector[RowData]): Unit = {
          val mapper = new ObjectMapper()
          try {
            val root = mapper.readTree(value)
            val dataCode = root.path("dataCode").asText
            val sensorCode = Try {
              root.path("sensorId").asText
            }.getOrElse(root.path("deviceId").asText)
            val postTime = root.path("postTime").asText
            val p_date = LocalDate.parse(postTime, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")).toEpochDay.toInt
            val data_code = StringData.fromString(dataCode)
            val sensor_code = StringData.fromString(sensorCode)
            val post_time_tmp = new java.sql.Timestamp(dateFormat.parse(postTime).getTime)
            val post_time = TimestampData.fromTimestamp(post_time_tmp)
            // val test_col_1 = Map(post_time_tmp.asInstanceOf[AnyRef] -> value.length.asInstanceOf[AnyRef])
            val test_col_1 = Map(post_time_tmp -> value.length)
            val test_col_2 = Map(post_time_tmp.getTime.toString -> value.length)

            val genericRowData = new GenericRowData(7)
            genericRowData.setField(0, p_date)
            genericRowData.setField(1, data_code)
            genericRowData.setField(2, sensor_code)
            genericRowData.setField(3, post_time)
            /** The following line fails to execute */
            genericRowData.setField(4, new GenericMapData(test_col_1.asJava))
            genericRowData.setField(5, new GenericMapData(test_col_2.asJava))
            genericRowData.setField(6, 0)
            out.collect(genericRowData)

            /** Increment the count by 1 */
            logCount.add(1)
          } catch {
            case e: Exception =>
              println(s"exception-time: ${dateFormat.format(System.currentTimeMillis())}\n")
              println(value)
              e.printStackTrace()
          }
        }
      }
    ).name("log-process")

    val builder = DorisSink.builder[RowData]
    val dorisBuilder = DorisOptions.builder
    dorisBuilder
      .setFenodes(fe)
      .setTableIdentifier(table_name)
      .setUsername(username)
      .setPassword(password)

    // JSON format for stream load
    val properties = new Properties()
    properties.setProperty("format", "json")
    properties.setProperty("read_json_by_line", "true")

    val executionBuilder = DorisExecutionOptions.builder
    executionBuilder
      .setLabelPrefix(bootstrap + "_" + UUID.randomUUID()) // stream load label prefix
      .setCheckInterval(2)
      .setBufferCount(32)
      .setMaxRetries(4)
      .setStreamLoadProp(properties)

    val fieldsInfo: Array[(Int, String, DataType)] = Array(
      (0, "p_date", DataTypes.DATE()),
      (1, "data_code", DataTypes.VARCHAR(32)),
      (2, "sensor_code", DataTypes.VARCHAR(128)),
      (3, "post_time", DataTypes.TIMESTAMP()),
      (4, "test_col_1", DataTypes.MAP(DataTypes.TIMESTAMP(), DataTypes.INT())),
      (5, "test_col_2", DataTypes.MAP(DataTypes.VARCHAR(32), DataTypes.INT())),
      (6, "__DORIS_DELETE_SIGN__", DataTypes.INT())
    )
    val fields = fieldsInfo.map(_._2)
    val types = fieldsInfo.map(_._3)

    logInfoStream.sinkTo(
      builder
        .setDorisReadOptions(DorisReadOptions.builder.build)
        .setDorisExecutionOptions(executionBuilder.build)
        .setSerializer(
          RowDataSerializer
            .builder
            .setFieldNames(fields)
            .setType("json")
            .setFieldType(types)
            .build)
        .setDorisOptions(dorisBuilder.build)
        .build()
    ).name("log-to-doris")

    env.execute("log-sink")
  }
}
```

A sample message from the Kafka topic:

```json
{"data":{"wsLat":30.918466},"dataCode":"FOH8U310XA4EORK","userCode":"user_gis","sensorId":"dvc_20220712_1633","postTime":"2023-05-01 00:00:00","commissionCode":"WB510100000001"}
```

### Anything Else?

Nothing else.

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)