This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 323b872  [Improve] schema change idempotent with add and drop column 
(#250)
323b872 is described below

commit 323b872385505f2d718d5c817fb294c2021b2deb
Author: wudi <676366...@qq.com>
AuthorDate: Thu Nov 30 15:09:13 2023 +0800

    [Improve] schema change idempotent with add and drop column (#250)
---
 .../doris/flink/catalog/doris/DorisSystem.java     |  20 +++-
 .../exception/DorisSchemaChangeException.java      |  45 +++++++++
 .../flink/sink/schema/SchemaChangeHelper.java      |   5 +
 .../flink/sink/schema/SchemaChangeManager.java     |  57 +++++++++--
 .../serializer/JsonDebeziumSchemaSerializer.java   |   2 +-
 .../doris/flink/sink/schema/SchemaManagerTest.java | 107 +++++++++++++++++++++
 6 files changed, 225 insertions(+), 11 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
index 1f0a09f..7be9a5f 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
@@ -29,6 +29,7 @@ import org.apache.flink.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Serializable;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.Statement;
@@ -45,7 +46,8 @@ import static 
org.apache.flink.util.Preconditions.checkArgument;
  * Doris System Operate
  */
 @Public
-public class DorisSystem {
+public class DorisSystem implements Serializable {
+    private static final long serialVersionUID = 1L;
     private static final Logger LOG = 
LoggerFactory.getLogger(DorisSystem.class);
     private final JdbcConnectionProvider jdbcConnectionProvider;
     private static final List<String> builtinDatabases = 
Collections.singletonList("information_schema");
@@ -81,6 +83,22 @@ public class DorisSystem {
                 && listTables(database).contains(table);
     }
 
+    /**
+     * Returns true when the table exists and information_schema lists {@code columnName} for it.
+     */
+    public boolean columnExists(String database, String table, String columnName) {
+        if (!tableExists(database, table)) {
+            return false;
+        }
+        // The identifiers are passed as ? placeholder values rather than formatted into the SQL.
+        List<String> columns = extractColumnValuesBySQL(
+                "SELECT COLUMN_NAME FROM information_schema.`COLUMNS` WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? AND COLUMN_NAME = ?",
+                1,
+                null,
+                database, table, columnName);
+        return columns != null && !columns.isEmpty();
+    }
+
     public List<String> listTables(String databaseName) {
         if (!databaseExists(databaseName)) {
             throw new DorisRuntimeException("database" + databaseName + " is 
not exists");
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/DorisSchemaChangeException.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/DorisSchemaChangeException.java
new file mode 100644
index 0000000..52c67b1
--- /dev/null
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/DorisSchemaChangeException.java
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.exception;
+
+/**
+ * Unchecked exception raised when a Doris schema change request fails.
+ */
+public class DorisSchemaChangeException extends RuntimeException {
+    private static final long serialVersionUID = 1L;
+
+    public DorisSchemaChangeException() {
+        super();
+    }
+
+    public DorisSchemaChangeException(String message) {
+        super(message);
+    }
+
+    public DorisSchemaChangeException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public DorisSchemaChangeException(Throwable cause) {
+        super(cause);
+    }
+
+    protected DorisSchemaChangeException(String message, Throwable cause,
+            boolean enableSuppression, boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+}
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java
index c7a3384..6f1f2a9 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java
@@ -35,6 +35,7 @@ public class SchemaChangeHelper {
     private static final String ADD_DDL = "ALTER TABLE %s ADD COLUMN %s %s";
     private static final String DROP_DDL = "ALTER TABLE %s DROP COLUMN %s";
     private static final String RENAME_DDL = "ALTER TABLE %s RENAME COLUMN %s 
%s";
+    private static final String CHECK_COLUMN_EXISTS = "SELECT COLUMN_NAME FROM 
information_schema.`COLUMNS` WHERE TABLE_SCHEMA = '%s' AND TABLE_NAME = '%s' 
AND COLUMN_NAME = '%s'";
 
     public static void compareSchema(Map<String, FieldSchema> 
updateFiledSchemaMap,
             Map<String, FieldSchema> originFieldSchemaMap) {
@@ -115,6 +116,10 @@ public class SchemaChangeHelper {
         return String.format(RENAME_DDL, tableIdentifier, oldColumnName, 
newColumnName);
     }
 
+    public static String buildColumnExistsQuery(String database, String table, 
String column){
+        return String.format(CHECK_COLUMN_EXISTS, database, table, column);
+    }
+
     public static List<DDLSchema> getDdlSchemas() {
         return ddlSchemas;
     }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java
index 296fb2f..569d440 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java
@@ -17,12 +17,14 @@
 
 package org.apache.doris.flink.sink.schema;
 
+import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.doris.flink.catalog.doris.DorisSystem;
 import org.apache.doris.flink.catalog.doris.FieldSchema;
 import org.apache.doris.flink.catalog.doris.TableSchema;
 import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.exception.DorisSchemaChangeException;
 import org.apache.doris.flink.exception.IllegalArgumentException;
 import org.apache.doris.flink.rest.RestService;
 import org.apache.doris.flink.sink.HttpGetWithEntity;
@@ -63,12 +65,20 @@ public class SchemaChangeManager implements Serializable {
     }
 
     public boolean addColumn(String database, String table, FieldSchema field) 
throws IOException, IllegalArgumentException {
+        if(checkColumnExists(database, table, field.getName())){ // Idempotent: an existing column is treated as already added.
+            LOG.warn("The column {} already exists in table {}, no need to add 
it again", field.getName(), table);
+            return true; // No ALTER request is built or sent in this case.
+        }
         String tableIdentifier = getTableIdentifier(database, table); // Also reached when the existence check itself failed (it then reports false).
         String addColumnDDL = 
SchemaChangeHelper.buildAddColumnDDL(tableIdentifier, field);
         return schemaChange(database, table, buildRequestParam(false, 
field.getName()), addColumnDDL);
     }
 
     public boolean dropColumn(String database, String table, String 
columnName) throws IOException, IllegalArgumentException {
+        if(!checkColumnExists(database, table, columnName)){
+            LOG.warn("The column {} not exists in table {}, no need to drop", 
columnName, table);
+            return true;
+        }
         String tableIdentifier = getTableIdentifier(database, table);
         String dropColumnDDL = 
SchemaChangeHelper.buildDropColumnDDL(tableIdentifier, columnName);
         return schemaChange(database, table, buildRequestParam(true, 
columnName), dropColumnDDL);
@@ -106,11 +116,7 @@ public class SchemaChangeManager implements Serializable {
         HttpGetWithEntity httpGet = new HttpGetWithEntity(requestUrl);
         httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
         httpGet.setEntity(new 
StringEntity(objectMapper.writeValueAsString(params)));
-        boolean success = handleResponse(httpGet);
-        if (!success) {
-            LOG.warn("schema change can not do table {}.{}", database, table);
-        }
-        return success;
+        return handleResponse(httpGet);
     }
 
     /**
@@ -121,6 +127,11 @@ public class SchemaChangeManager implements Serializable {
             return false;
         }
         LOG.info("Execute SQL: {}", ddl);
+        HttpPost httpPost = buildHttpPost(ddl, database);
+        return handleResponse(httpPost);
+    }
+
+    public HttpPost buildHttpPost(String ddl, String database) throws 
IllegalArgumentException, IOException {
         Map<String, String> param = new HashMap<>();
         param.put("stmt", ddl);
         String requestUrl = String.format(SCHEMA_CHANGE_API,
@@ -129,14 +140,14 @@ public class SchemaChangeManager implements Serializable {
         httpPost.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
         httpPost.setHeader(HttpHeaders.CONTENT_TYPE, "application/json");
         httpPost.setEntity(new 
StringEntity(objectMapper.writeValueAsString(param)));
-        boolean success = handleResponse(httpPost);
-        return success;
+        return httpPost;
     }
 
     private boolean handleResponse(HttpUriRequest request) {
         try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
             CloseableHttpResponse response = httpclient.execute(request);
             final int statusCode = response.getStatusLine().getStatusCode();
+            final String reasonPhrase = 
response.getStatusLine().getReasonPhrase();
             if (statusCode == 200 && response.getEntity() != null) {
                 String loadResult = EntityUtils.toString(response.getEntity());
                 Map<String, Object> responseMap = 
objectMapper.readValue(loadResult, Map.class);
@@ -144,11 +155,50 @@ public class SchemaChangeManager implements Serializable {
                 if (code.equals("0")) {
                     return true;
                 } else {
-                    LOG.error("schema change response:{}", loadResult);
+                    throw new DorisSchemaChangeException("Failed to schemaChange, response: " + loadResult);
+                }
+            } else {
+                throw new DorisSchemaChangeException("Failed to schemaChange, status: " + statusCode + ", reason: " + reasonPhrase);
+            }
+        } catch (Exception e) {
+            LOG.error("SchemaChange request error,", e);
+            // Attach e as the cause so callers keep the root stack trace, not just its message.
+            throw new DorisSchemaChangeException("SchemaChange request error with " + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * When processing a column, determine whether it exists and be idempotent.
+     *
+     * <p>The query is built by {@link SchemaChangeHelper#buildColumnExistsQuery} and sent with
+     * {@link #buildHttpPost}.
+     *
+     * @return true only if the request succeeds and the result set has at least one row; false when
+     *     the column is absent, and also when the check itself fails (non-200 status, non-zero code,
+     *     or an exception), so false does not prove that the column is missing
+     */
+    public boolean checkColumnExists(String database, String table, String columnName) throws IllegalArgumentException, IOException {
+        String existsQuery = SchemaChangeHelper.buildColumnExistsQuery(database, table, columnName);
+        HttpPost httpPost = buildHttpPost(existsQuery, database);
+        // Close the response as well as the client once the check is done.
+        try (CloseableHttpClient httpclient = HttpClients.createDefault();
+                CloseableHttpResponse response = httpclient.execute(httpPost)) {
+            final int statusCode = response.getStatusLine().getStatusCode();
+            if (statusCode == 200 && response.getEntity() != null) {
+                String loadResult = EntityUtils.toString(response.getEntity());
+                JsonNode responseNode = objectMapper.readTree(loadResult);
+                // path() yields a missing node instead of null, so absent fields fall through to false.
+                String code = responseNode.path("code").asText("-1");
+                if (code.equals("0")) {
+                    JsonNode data = responseNode.path("data").path("data");
+                    return !data.isEmpty();
                 }
+                LOG.warn("check column exist request failed, response: {}", loadResult);
+            } else {
+                LOG.warn("check column exist request failed, status: {}", statusCode);
             }
         } catch (Exception e) {
-            LOG.error("http request error,", e);
+            LOG.error("check column exist request error {}, default return false", e.getMessage(), e);
         }
         return false;
     }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
index d87da4c..0c4309b 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
@@ -308,7 +308,7 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
                 return false;
             }
 
-            boolean doSchemaChange = checkSchemaChange(ddl, tuple.f0, 
tuple.f1);
+            boolean doSchemaChange = checkSchemaChange(tuple.f0, tuple.f1, 
ddl);
             status = doSchemaChange && schemaChangeManager.execute(ddl, 
tuple.f0);
             LOG.info("schema change status:{}", status);
         } catch (Exception ex) {
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java
new file mode 100644
index 0000000..ec80e92
--- /dev/null
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.schema;
+
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.exception.IllegalArgumentException;
+import org.apache.doris.flink.sink.HttpEntityMock;
+import org.apache.doris.flink.sink.OptionUtils;
+import org.apache.http.ProtocolVersion;
+import org.apache.http.StatusLine;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.message.BasicStatusLine;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.MockedStatic;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.when;
+
+/** Tests for {@link SchemaChangeManager#checkColumnExists} against canned query responses. */
+public class SchemaManagerTest {
+
+    private static final String QUERY_RESPONSE = "{\n" +
+            "    \"data\": {\n" +
+            "        \"type\": \"result_set\",\n" +
+            "        \"meta\": [{\"name\":\"COLUMN_NAME\",\"type\":\"CHAR\"}],\n" +
+            "        \"data\": [\n" +
+            "            [\"age\"]\n" +
+            "        ],\n" +
+            "        \"time\": 15\n" +
+            "    },\n" +
+            "    \"msg\": \"success\",\n" +
+            "    \"code\": 0\n" +
+            "}";
+
+    private static final String QUERY_NO_EXISTS_RESPONSE = "{\n" +
+            "    \"data\": {\n" +
+            "        \"type\": \"result_set\",\n" +
+            "        \"meta\": [{\"name\":\"COLUMN_NAME\",\"type\":\"CHAR\"}],\n" +
+            "        \"data\": [],\n" +
+            "        \"time\": 0\n" +
+            "    },\n" +
+            "    \"msg\": \"success\",\n" +
+            "    \"code\": 0\n" +
+            "}";
+
+    HttpEntityMock entityMock;
+    SchemaChangeManager schemaChangeManager;
+    // Opened per test and closed in tearDown so the static mock cannot leak into other tests.
+    MockedStatic<HttpClients> httpClientMockedStatic;
+
+    @Before
+    public void setUp() throws IOException {
+        httpClientMockedStatic = mockStatic(HttpClients.class);
+        DorisOptions dorisOptions = OptionUtils.buildDorisOptions();
+        schemaChangeManager = new SchemaChangeManager(dorisOptions);
+        CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
+        entityMock = new HttpEntityMock();
+
+        CloseableHttpResponse httpResponse = mock(CloseableHttpResponse.class);
+        StatusLine normalLine = new BasicStatusLine(new ProtocolVersion("http", 1, 0), 200, "");
+
+        when(httpClient.execute(any())).thenReturn(httpResponse);
+        when(httpResponse.getStatusLine()).thenReturn(normalLine);
+        when(httpResponse.getEntity()).thenReturn(entityMock);
+
+        httpClientMockedStatic.when(() -> HttpClients.createDefault())
+                .thenReturn(httpClient);
+    }
+
+    @After
+    public void tearDown() {
+        if (httpClientMockedStatic != null) {
+            httpClientMockedStatic.close();
+        }
+    }
+
+    @Test
+    public void testColumnExists() throws IOException, IllegalArgumentException {
+        entityMock.setValue(QUERY_RESPONSE);
+        boolean columnExists = schemaChangeManager.checkColumnExists("test", "test_flink", "age");
+        assertTrue(columnExists);
+    }
+
+    @Test
+    public void testColumnNotExists() throws IOException, IllegalArgumentException {
+        entityMock.setValue(QUERY_NO_EXISTS_RESPONSE);
+        boolean columnExists = schemaChangeManager.checkColumnExists("test", "test_flink", "age1");
+        assertFalse(columnExists);
+    }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to