JNSimba commented on code in PR #279:
URL: https://github.com/apache/doris-flink-connector/pull/279#discussion_r1436699500


##########
flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumDataChange.java:
##########
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer.serializer.jsondebezium;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.StringUtils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.NullNode;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.sink.writer.serializer.DorisRecord;
+import org.apache.doris.flink.tools.cdc.SourceSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.doris.flink.sink.util.DeleteOperation.addDeleteSign;
+
+/**
+ * Convert the data change record of the upstream data source into a byte array that can be imported
+ * into doris through stream load.<br>
+ * Supported data changes include: read, insert, update, delete.
+ */
+public class JsonDebeziumDataChange implements Serializable {
+    private static final Logger LOG = LoggerFactory.getLogger(JsonDebeziumDataChange.class);
+
+    private static final String OP_READ = "r"; // snapshot read
+    private static final String OP_CREATE = "c"; // insert
+    private static final String OP_UPDATE = "u"; // update
+    private static final String OP_DELETE = "d"; // delete
+    private final ObjectMapper objectMapper;
+    private final DorisOptions dorisOptions;
+    private Map<String, String> tableMapping;
+    private final boolean ignoreUpdateBefore;
+    private final String lineDelimiter;
+
+    public JsonDebeziumDataChange(
+            DorisOptions dorisOptions,
+            Map<String, String> tableMapping,
+            boolean ignoreUpdateBefore,
+            String lineDelimiter,
+            ObjectMapper objectMapper) {
+        this.dorisOptions = dorisOptions;
+        this.objectMapper = objectMapper;
+        this.tableMapping = tableMapping;
+        this.ignoreUpdateBefore = ignoreUpdateBefore;
+        this.lineDelimiter = lineDelimiter;
+    }
+
+    public DorisRecord serialize(String record, JsonNode recordRoot, String op) throws IOException {
+        // Filter out table records that are not in tableMapping
+        String cdcTableIdentifier = getCdcTableIdentifier(recordRoot);
+        String dorisTableIdentifier = getDorisTableIdentifier(cdcTableIdentifier);
+        if (StringUtils.isNullOrWhitespaceOnly(dorisTableIdentifier)) {
+            LOG.warn(
+                    "filter table {}, because it is not listened, record detail is {}",
+                    cdcTableIdentifier,
+                    record);
+            return null;
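
For readers following along in the archive: below is a standalone sketch of the Debezium change-event envelope that the `op` constants and `recordRoot` above refer to. The event payload, values, and class name are made up for illustration; this is not the connector code, only a minimal Jackson example of reading the `op` and `source` fields.

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class DebeziumEnvelopeSketch {
    public static void main(String[] args) throws Exception {
        // Trimmed, hypothetical Debezium change event; "op" is one of r/c/u/d,
        // matching the OP_* constants in the class above.
        String record =
                "{\"op\":\"u\","
                        + "\"before\":{\"id\":1,\"name\":\"old\"},"
                        + "\"after\":{\"id\":1,\"name\":\"new\"},"
                        + "\"source\":{\"db\":\"test_db\",\"table\":\"orders\"}}";

        ObjectMapper objectMapper = new ObjectMapper();
        JsonNode recordRoot = objectMapper.readTree(record);

        String op = recordRoot.get("op").asText();                      // "u" -> update
        String db = recordRoot.get("source").get("db").asText();        // "test_db"
        String table = recordRoot.get("source").get("table").asText();  // "orders"
        System.out.println(op + " on " + db + "." + table);
    }
}
```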

Review Comment:
   When tables are added dynamically, won't this `tableMapping` stay unchanged? If so, data from the newly added tables cannot be captured.
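
To make the concern concrete, here is a minimal standalone sketch; the class, database, and table names are invented, and this is not the connector's API. It only illustrates that a mapping captured once at construction time resolves records from later-created tables to null, so they would be filtered out.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Illustration of the question above: if the mapping from CDC table
 * identifiers to Doris tables is fixed at startup, records from a table
 * created after the job starts resolve to null and are dropped.
 */
public class StaleTableMappingSketch {
    private final Map<String, String> tableMapping = new HashMap<>();

    public StaleTableMappingSketch() {
        // only the tables known at startup are registered
        tableMapping.put("test_db.orders", "doris_db.orders");
    }

    /** Returns the Doris target table, or null when the source table is not listened. */
    public String resolve(String cdcTableIdentifier) {
        return tableMapping.get(cdcTableIdentifier);
    }

    public static void main(String[] args) {
        StaleTableMappingSketch sketch = new StaleTableMappingSketch();
        System.out.println(sketch.resolve("test_db.orders"));    // doris_db.orders
        System.out.println(sketch.resolve("test_db.new_table")); // null -> record would be dropped
    }
}
```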



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

