This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new 323b872 [Improve] schema change idempotent with add and drop column (#250) 323b872 is described below commit 323b872385505f2d718d5c817fb294c2021b2deb Author: wudi <676366...@qq.com> AuthorDate: Thu Nov 30 15:09:13 2023 +0800 [Improve] schema change idempotent with add and drop column (#250) --- .../doris/flink/catalog/doris/DorisSystem.java | 20 +++- .../exception/DorisSchemaChangeException.java | 45 +++++++++ .../flink/sink/schema/SchemaChangeHelper.java | 5 + .../flink/sink/schema/SchemaChangeManager.java | 57 +++++++++-- .../serializer/JsonDebeziumSchemaSerializer.java | 2 +- .../doris/flink/sink/schema/SchemaManagerTest.java | 107 +++++++++++++++++++++ 6 files changed, 225 insertions(+), 11 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java index 1f0a09f..7be9a5f 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java @@ -29,6 +29,7 @@ import org.apache.flink.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Serializable; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.Statement; @@ -45,7 +46,8 @@ import static org.apache.flink.util.Preconditions.checkArgument; * Doris System Operate */ @Public -public class DorisSystem { +public class DorisSystem implements Serializable { + private static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(DorisSystem.class); private final JdbcConnectionProvider jdbcConnectionProvider; private static final List<String> builtinDatabases = Collections.singletonList("information_schema"); @@ -81,6 +83,22 @@ public class DorisSystem { && listTables(database).contains(table); } + public boolean columnExists(String database, String table, String columnName){ + if(tableExists(database, table)){ + List<String> columns = extractColumnValuesBySQL( + "SELECT COLUMN_NAME FROM information_schema.`COLUMNS` WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? AND COLUMN_NAME = ?", + 1, + null, + database, + table, + columnName); + if(columns != null && !columns.isEmpty()){ + return true; + } + } + return false; + } + public List<String> listTables(String databaseName) { if (!databaseExists(databaseName)) { throw new DorisRuntimeException("database" + databaseName + " is not exists"); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/DorisSchemaChangeException.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/DorisSchemaChangeException.java new file mode 100644 index 0000000..52c67b1 --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/DorisSchemaChangeException.java @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.exception; + +/** + * Doris Schema Change run exception. + */ +public class DorisSchemaChangeException extends RuntimeException { + public DorisSchemaChangeException() { + super(); + } + + public DorisSchemaChangeException(String message) { + super(message); + } + + public DorisSchemaChangeException(String message, Throwable cause) { + super(message, cause); + } + + public DorisSchemaChangeException(Throwable cause) { + super(cause); + } + + protected DorisSchemaChangeException(String message, Throwable cause, + boolean enableSuppression, + boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java index c7a3384..6f1f2a9 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java @@ -35,6 +35,7 @@ public class SchemaChangeHelper { private static final String ADD_DDL = "ALTER TABLE %s ADD COLUMN %s %s"; private static final String DROP_DDL = "ALTER TABLE %s DROP COLUMN %s"; private static final String RENAME_DDL = "ALTER TABLE %s RENAME COLUMN %s %s"; + private static final String CHECK_COLUMN_EXISTS = "SELECT COLUMN_NAME FROM information_schema.`COLUMNS` WHERE TABLE_SCHEMA = '%s' AND TABLE_NAME = '%s' AND COLUMN_NAME = '%s'"; public static void compareSchema(Map<String, FieldSchema> updateFiledSchemaMap, Map<String, FieldSchema> originFieldSchemaMap) { @@ -115,6 +116,10 @@ public class SchemaChangeHelper { return String.format(RENAME_DDL, tableIdentifier, oldColumnName, newColumnName); } + public static String buildColumnExistsQuery(String database, String table, String column){ + return String.format(CHECK_COLUMN_EXISTS, database, table, column); + } + public static List<DDLSchema> getDdlSchemas() { return ddlSchemas; } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java index 296fb2f..569d440 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java @@ -17,12 +17,14 @@ package org.apache.doris.flink.sink.schema; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.codec.binary.Base64; import org.apache.doris.flink.catalog.doris.DorisSystem; import org.apache.doris.flink.catalog.doris.FieldSchema; import org.apache.doris.flink.catalog.doris.TableSchema; import org.apache.doris.flink.cfg.DorisOptions; +import org.apache.doris.flink.exception.DorisSchemaChangeException; import org.apache.doris.flink.exception.IllegalArgumentException; import org.apache.doris.flink.rest.RestService; import org.apache.doris.flink.sink.HttpGetWithEntity; @@ -63,12 +65,20 @@ public class SchemaChangeManager implements Serializable { } public boolean addColumn(String database, String table, FieldSchema field) throws IOException, IllegalArgumentException { + if(checkColumnExists(database, table, field.getName())){ + LOG.warn("The column {} already exists in table {}, no need to add it again", field.getName(), table); + return true; + } String tableIdentifier = getTableIdentifier(database, table); String addColumnDDL = SchemaChangeHelper.buildAddColumnDDL(tableIdentifier, field); return schemaChange(database, table, buildRequestParam(false, field.getName()), addColumnDDL); } public boolean dropColumn(String database, String table, String columnName) throws IOException, IllegalArgumentException { + if(!checkColumnExists(database, table, columnName)){ + LOG.warn("The column {} not exists in table {}, no need to drop", columnName, table); + return true; + } String tableIdentifier = getTableIdentifier(database, table); String dropColumnDDL = SchemaChangeHelper.buildDropColumnDDL(tableIdentifier, columnName); return schemaChange(database, table, buildRequestParam(true, columnName), dropColumnDDL); @@ -106,11 +116,7 @@ public class SchemaChangeManager implements Serializable { HttpGetWithEntity httpGet = new HttpGetWithEntity(requestUrl); httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader()); httpGet.setEntity(new StringEntity(objectMapper.writeValueAsString(params))); - boolean success = handleResponse(httpGet); - if (!success) { - LOG.warn("schema change can not do table {}.{}", database, table); - } - return success; + return handleResponse(httpGet); } /** @@ -121,6 +127,11 @@ public class SchemaChangeManager implements Serializable { return false; } LOG.info("Execute SQL: {}", ddl); + HttpPost httpPost = buildHttpPost(ddl, database); + return handleResponse(httpPost); + } + + public HttpPost buildHttpPost(String ddl, String database) throws IllegalArgumentException, IOException { Map<String, String> param = new HashMap<>(); param.put("stmt", ddl); String requestUrl = String.format(SCHEMA_CHANGE_API, @@ -129,14 +140,14 @@ public class SchemaChangeManager implements Serializable { httpPost.setHeader(HttpHeaders.AUTHORIZATION, authHeader()); httpPost.setHeader(HttpHeaders.CONTENT_TYPE, "application/json"); httpPost.setEntity(new StringEntity(objectMapper.writeValueAsString(param))); - boolean success = handleResponse(httpPost); - return success; + return httpPost; } private boolean handleResponse(HttpUriRequest request) { try (CloseableHttpClient httpclient = HttpClients.createDefault()) { CloseableHttpResponse response = httpclient.execute(request); final int statusCode = response.getStatusLine().getStatusCode(); + final String reasonPhrase = response.getStatusLine().getReasonPhrase(); if (statusCode == 200 && response.getEntity() != null) { String loadResult = EntityUtils.toString(response.getEntity()); Map<String, Object> responseMap = objectMapper.readValue(loadResult, Map.class); @@ -144,11 +155,39 @@ public class SchemaChangeManager implements Serializable { if (code.equals("0")) { return true; } else { - LOG.error("schema change response:{}", loadResult); + throw new DorisSchemaChangeException("Failed to schemaChange, response: " + loadResult); + } + } else{ + throw new DorisSchemaChangeException("Failed to schemaChange, status: " + statusCode + ", reason: " + reasonPhrase); + } + } catch (Exception e) { + LOG.error("SchemaChange request error,", e); + throw new DorisSchemaChangeException("SchemaChange request error with " + e.getMessage()); + } + } + + /** + * When processing a column, determine whether it exists and be idempotent. + */ + public boolean checkColumnExists(String database, String table, String columnName) throws IllegalArgumentException, IOException { + String existsQuery = SchemaChangeHelper.buildColumnExistsQuery(database, table, columnName); + HttpPost httpPost = buildHttpPost(existsQuery, database); + try (CloseableHttpClient httpclient = HttpClients.createDefault()) { + CloseableHttpResponse response = httpclient.execute(httpPost); + final int statusCode = response.getStatusLine().getStatusCode(); + if (statusCode == 200 && response.getEntity() != null) { + String loadResult = EntityUtils.toString(response.getEntity()); + JsonNode responseNode = objectMapper.readTree(loadResult); + String code = responseNode.get("code").asText("-1"); + if (code.equals("0")) { + JsonNode data = responseNode.get("data").get("data"); + if(!data.isEmpty()){ + return true; + } } } } catch (Exception e) { - LOG.error("http request error,", e); + LOG.error("check column exist request error {}, default return false", e.getMessage()); } return false; } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java index d87da4c..0c4309b 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java @@ -308,7 +308,7 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin return false; } - boolean doSchemaChange = checkSchemaChange(ddl, tuple.f0, tuple.f1); + boolean doSchemaChange = checkSchemaChange(tuple.f0, tuple.f1, ddl); status = doSchemaChange && schemaChangeManager.execute(ddl, tuple.f0); LOG.info("schema change status:{}", status); } catch (Exception ex) { diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java new file mode 100644 index 0000000..ec80e92 --- /dev/null +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java @@ -0,0 +1,107 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.sink.schema; + +import org.apache.doris.flink.cfg.DorisOptions; +import org.apache.doris.flink.exception.IllegalArgumentException; +import org.apache.doris.flink.sink.HttpEntityMock; +import org.apache.doris.flink.sink.OptionUtils; +import org.apache.http.ProtocolVersion; +import org.apache.http.StatusLine; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.message.BasicStatusLine; +import org.junit.Before; +import org.junit.Test; +import org.mockito.MockedStatic; + +import java.io.IOException; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.when; + +public class SchemaManagerTest { + + static String QUERY_RESPONSE = "{\n" + + " \"data\": {\n" + + " \"type\": \"result_set\",\n" + + " \"meta\": [{\"name\":\"COLUMN_NAME\",\"type\":\"CHAR\"}],\n" + + " \"data\": [\n" + + " [\"age\"]\n" + + " ],\n" + + " \"time\": 15\n" + + " },\n" + + " \"msg\": \"success\",\n" + + " \"code\": 0\n" + + "}"; + + static String QUERY_NO_EXISTS_RESPONSE = "{\n" + + " \"data\": {\n" + + " \"type\": \"result_set\",\n" + + " \"meta\": [{\"name\":\"COLUMN_NAME\",\"type\":\"CHAR\"}],\n" + + " \"data\": [],\n" + + " \"time\": 0\n" + + " },\n" + + " \"msg\": \"success\",\n" + + " \"code\": 0\n" + + "}"; + + HttpEntityMock entityMock; + SchemaChangeManager schemaChangeManager; + static MockedStatic<HttpClients> httpClientMockedStatic = mockStatic(HttpClients.class); + + + @Before + public void setUp() throws IOException { + DorisOptions dorisOptions = OptionUtils.buildDorisOptions(); + schemaChangeManager = new SchemaChangeManager(dorisOptions); + CloseableHttpClient httpClient = mock(CloseableHttpClient.class); + entityMock = new HttpEntityMock(); + + CloseableHttpResponse httpResponse = mock(CloseableHttpResponse.class); + StatusLine normalLine = new BasicStatusLine(new ProtocolVersion("http", 1, 0), 200, ""); + + when(httpClient.execute(any())).thenReturn(httpResponse); + when(httpResponse.getStatusLine()).thenReturn(normalLine); + when(httpResponse.getEntity()).thenReturn(entityMock); + when(httpClient.execute(any())).thenReturn(httpResponse); + when(httpResponse.getStatusLine()).thenReturn(normalLine); + when(httpResponse.getEntity()).thenReturn(entityMock); + + httpClientMockedStatic.when(()-> HttpClients.createDefault()) + .thenReturn(httpClient); + } + + @Test + public void testColumnExists() throws IOException, IllegalArgumentException { + entityMock.setValue(QUERY_RESPONSE); + boolean columnExists = schemaChangeManager.checkColumnExists("test", "test_flink", "age"); + System.out.println(columnExists); + } + + @Test + public void testColumnNotExists() throws IOException, IllegalArgumentException { + entityMock.setValue(QUERY_NO_EXISTS_RESPONSE); + boolean columnExists = schemaChangeManager.checkColumnExists("test", "test_flink", "age1"); + System.out.println(columnExists); + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org