benxiaohai061 opened a new issue, #617:
URL: https://github.com/apache/doris-flink-connector/issues/617

   我现在想实现一个将 MySQL 数据实时同步到 Doris 的方案(整库同步)。但由于网络隔离,MySQL 和 Doris 不在同一个网络中,因此我采用了如下方案:
   1、通过 Flink CDC 将 MySQL 的数据实时生成 Debezium JSON 格式的文件;
   2、然后通过 FTP 将文件传入内网;
   3、再使用 Flink 程序实时读取内网接收到的文件,并写入 Doris。
   
   但现在出现了以下问题:
   1、Doris 的表必须预先存在,不能自动建表;
   2、使用标准 Debezium JSON 格式时,Doris sink 执行会报错:
   15:51:34,285 WARN  
org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumSchemaChange
 [] - Failed to extract tableChanges. 
record={"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"student_id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"Male,Female,Other"},"field":"gender"},{"type":"string","optional":true,"field":"date_of_birth"},{"type":"string","optional":true,"field":"email"},{"type":"string","optional":true,"field":"phone_number"},{"type":"string","optional":true,"field":"address"},{"type":"string","optional":true,"field":"enrollment_date"},{"type":"int16","optional":true,"default":1,"field":"is_active"}],"optional":true,"name":"mysql_binlog_source.test.students.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional"
 
:false,"field":"student_id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"Male,Female,Other"},"field":"gender"},{"type":"string","optional":true,"field":"date_of_birth"},{"type":"string","optional":true,"field":"email"},{"type":"string","optional":true,"field":"phone_number"},{"type":"string","optional":true,"field":"address"},{"type":"string","optional":true,"field":"enrollment_date"},{"type":"int16","optional":true,"default":1,"field":"is_active"}],"optional":true,"name":"mysql_binlog_source.test.students.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.
 
Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":
 
true,"field":"transaction"}],"optional":false,"name":"mysql_binlog_source.test.students.Envelope"},"payload":{"before":null,"after":{"student_id":5,"first_name":"Charlie","last_name":"Davis","gender":"Other","date_of_birth":"2002-07-30","email":"[email protected]","phone_number":null,"address":null,"enrollment_date":"2023-09-01","is_active":0},"source":{"version":"1.9.8.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"test","sequence":null,"table":"students","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1760686786704,"transaction":null}}
   15:51:34,286 WARN  
org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumSchemaChangeImplV2
 [] - Failed to parse eventType. 
recordRoot={"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"student_id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"Male,Female,Other"},"field":"gender"},{"type":"string","optional":true,"field":"date_of_birth"},{"type":"string","optional":true,"field":"email"},{"type":"string","optional":true,"field":"phone_number"},{"type":"string","optional":true,"field":"address"},{"type":"string","optional":true,"field":"enrollment_date"},{"type":"int16","optional":true,"default":1,"field":"is_active"}],"optional":true,"name":"mysql_binlog_source.test.students.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","opti
 
onal":false,"field":"student_id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"Male,Female,Other"},"field":"gender"},{"type":"string","optional":true,"field":"date_of_birth"},{"type":"string","optional":true,"field":"email"},{"type":"string","optional":true,"field":"phone_number"},{"type":"string","optional":true,"field":"address"},{"type":"string","optional":true,"field":"enrollment_date"},{"type":"int16","optional":true,"default":1,"field":"is_active"}],"optional":true,"name":"mysql_binlog_source.test.students.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.
 
data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optio
 
nal":true,"field":"transaction"}],"optional":false,"name":"mysql_binlog_source.test.students.Envelope"},"payload":{"before":null,"after":{"student_id":5,"first_name":"Charlie","last_name":"Davis","gender":"Other","date_of_birth":"2002-07-30","email":"[email protected]","phone_number":null,"address":null,"enrollment_date":"2023-09-01","is_active":0},"source":{"version":"1.9.8.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"test","sequence":null,"table":"students","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1760686786704,"transaction":null}}
   15:51:34,286 W
   
   
   
   相关代码如下:
   1、实时生成文件(MySQL CDC 写入文件)
   // Job 1: capture MySQL changes with Flink CDC and write each change record as one JSON line
   // into rolling files under D:/data/mysql_cdc/.
   Map<String, Object> customConverterConfigs = new HashMap<>();
           // Set the JSON converter's decimal format option to "numeric".
           customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric");
           // NOTE(review): the first argument (true) presumably is includeSchema. The records
           // logged in this report carry top-level "schema" and "payload" fields, which is
           // consistent with that. Confirm whether the Doris JsonDebeziumSchemaSerializer
           // expects records without this envelope (i.e. false here).
           JsonDebeziumDeserializationSchema schema =
                   new JsonDebeziumDeserializationSchema(true, customConverterConfigs);

           MySqlSource<String> mySqlSource =
                   MySqlSource.<String>builder()
                           .hostname("127.0.0.1")
                           .port(23306)
                           .databaseList("test") // set captured database
                           .tableList("test\\..*") // set captured table
                           .username("root")
                           .password("123456")
                           .debeziumProperties(DateToStringConverter.DEFAULT_PROPS)
                           .deserializer(schema) // converts SourceRecord to JSON String
                           .serverTimeZone("Asia/Shanghai")
                           .includeSchemaChanges(true)
                           .build();

           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

           // Use a 3 s checkpoint interval.
           env.enableCheckpointing(3000);

           DataStream<String> cdcStream = env
                   .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                   .setParallelism(2);

           // 3. File system sink
           FileSink<String> fileSink = FileSink
                   .forRowFormat(
                           new Path("D:/data/mysql_cdc/"), // output path
                           new SimpleStringEncoder<String>("UTF-8"))
                   .withRollingPolicy(
                           DefaultRollingPolicy.builder()
                                   // roll every minute
                                   .withRolloverInterval(Duration.ofMinutes(1))
                                   // roll after more than 30 s of inactivity
                                   .withInactivityInterval(Duration.ofSeconds(30))
                                   // 128 MB
                                   .withMaxPartSize(1024 * 1024 * 128)
                                   .build()
                   )
                   .build();

           // 4. Write to files
           cdcStream.sinkTo(fileSink).setParallelism(1);

           env.execute("MySQL CDC to File");
   
   2、写入doris
   /**
    * Job 2: continuously reads the text files under D:/data/mysql_cdc/ line by line and
    * sinks every line to Doris through {@code JsonDebeziumSchemaSerializer}.
    *
    * @param args command-line arguments (unused)
    * @throws Exception if building or executing the Flink job fails
    */
   public static void main(String[] args) throws Exception {

           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

           // Enable checkpointing.
           env.enableCheckpointing(10000);

           // ----------------------
           // 1. FileSource reading the MySQL CDC JSON files
           // ----------------------
           FileSource<String> cdcFileSource = FileSource
                   .forRecordStreamFormat(new TextLineInputFormat(), new Path("D:/data/mysql_cdc/"))
                   // scan for new files every 5 seconds
                   .monitorContinuously(Duration.ofSeconds(5))
                   .build();

           DataStream<String> cdcLines =
                   env.fromSource(cdcFileSource, WatermarkStrategy.noWatermarks(), "File Source");

           // ----------------------
           // 2. Doris sink configuration
           // ----------------------
           Properties streamLoadProps = new Properties();
           streamLoadProps.setProperty("format", "json");
           streamLoadProps.setProperty("read_json_by_line", "true");

           DorisOptions dorisOptions = DorisOptions.builder()
                   .setFenodes("127.0.0.1:8030")
                   .setTableIdentifier("")
                   .setUsername("root")
                   .setPassword("123456")
                   .build();

           DorisExecutionOptions executionOptions = DorisExecutionOptions.builder()
                   .setLabelPrefix("label-doris" + UUID.randomUUID())
                   .setStreamLoadProp(streamLoadProps)
                   .setDeletable(true)
                   .build();

           JsonDebeziumSchemaSerializer debeziumSerializer = JsonDebeziumSchemaSerializer.builder()
                   .setDorisOptions(dorisOptions)
                   .setNewSchemaChange(true)
                   .build();

           DorisSink<String> dorisSink = DorisSink.<String>builder()
                   .setDorisReadOptions(DorisReadOptions.builder().build())
                   .setDorisExecutionOptions(executionOptions)
                   .setDorisOptions(dorisOptions)
                   .setSerializer(debeziumSerializer)
                   .build();

           // ----------------------
           // 3. Write to Doris
           // ----------------------
           // A pre-mapping step is left disabled here: .map(new DebeziumToDorisJsonMapper())
           cdcLines.sinkTo(dorisSink);

           env.execute("File to Doris");
       }


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to