This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new 0acfbf2d [fix] Fix garbled of table or column comments contain Chinese characters(#401) (#403) 0acfbf2d is described below commit 0acfbf2d1fb76208178bef391f65ede02a1380a5 Author: North Lin <37775475+qg-...@users.noreply.github.com> AuthorDate: Tue Jun 18 10:28:00 2024 +0800 [fix] Fix garbled of table or column comments contain Chinese characters(#401) (#403) --- .../flink/sink/schema/SchemaChangeManager.java | 16 ++++-- .../flink/sink/schema/SchemaManagerITCase.java | 57 ++++++++++++++++++++++ .../doris/flink/sink/schema/SchemaManagerTest.java | 6 +++ 3 files changed, 76 insertions(+), 3 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java index 2aca3c7b..d2bacf26 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java @@ -56,11 +56,17 @@ public class SchemaChangeManager implements Serializable { private static final String SCHEMA_CHANGE_API = "http://%s/api/query/default_cluster/%s"; private ObjectMapper objectMapper = new ObjectMapper(); private DorisOptions dorisOptions; + private String charsetEncoding = "UTF-8"; public SchemaChangeManager(DorisOptions dorisOptions) { this.dorisOptions = dorisOptions; } + public SchemaChangeManager(DorisOptions dorisOptions, String charsetEncoding) { + this.dorisOptions = dorisOptions; + this.charsetEncoding = charsetEncoding; + } + public boolean createTable(TableSchema table) throws IOException, IllegalArgumentException { String createTableDDL = DorisSystem.buildCreateTableDDL(table); return execute(createTableDDL, table.getDatabase()); @@ -133,7 +139,8 @@ public class SchemaChangeManager implements Serializable { table); HttpGetWithEntity httpGet = new HttpGetWithEntity(requestUrl); httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader()); - httpGet.setEntity(new StringEntity(objectMapper.writeValueAsString(params))); + httpGet.setEntity( + new StringEntity(objectMapper.writeValueAsString(params), charsetEncoding)); String responseEntity = ""; Map<String, Object> responseMap = handleResponse(httpGet, responseEntity); return handleSchemaChange(responseMap, responseEntity); @@ -173,8 +180,11 @@ public class SchemaChangeManager implements Serializable { database); HttpPost httpPost = new HttpPost(requestUrl); httpPost.setHeader(HttpHeaders.AUTHORIZATION, authHeader()); - httpPost.setHeader(HttpHeaders.CONTENT_TYPE, "application/json"); - httpPost.setEntity(new StringEntity(objectMapper.writeValueAsString(param))); + httpPost.setHeader( + HttpHeaders.CONTENT_TYPE, + String.format("application/json;charset=%s", charsetEncoding)); + httpPost.setEntity( + new StringEntity(objectMapper.writeValueAsString(param), charsetEncoding)); return httpPost; } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerITCase.java index 053cf65c..8d2a9b0d 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerITCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerITCase.java @@ -28,8 +28,11 @@ import org.junit.Test; import java.io.IOException; import java.sql.Connection; import java.sql.DriverManager; +import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.util.HashMap; +import java.util.Map; public class SchemaManagerITCase extends DorisTestBase { @@ -82,6 +85,60 @@ public class SchemaManagerITCase extends DorisTestBase { Assert.assertTrue(exists); } + @Test + public void testAddColumnWithChineseComment() + throws SQLException, IOException, IllegalArgumentException { + String addColumnTbls = "add_column"; + initDorisSchemaChangeTable(addColumnTbls); + + // add a column by UTF-8 encoding + String addColumnName = "col_with_comment1"; + String chineseComment = "中文注释1"; + addColumnWithChineseCommentAndAssert(addColumnTbls, addColumnName, chineseComment, true); + + // change charset encoding to US-ASCII would cause garbled of Chinese. + schemaChangeManager = new SchemaChangeManager(options, "US-ASCII"); + addColumnName = "col_with_comment2"; + chineseComment = "中文注释2"; + addColumnWithChineseCommentAndAssert(addColumnTbls, addColumnName, chineseComment, false); + } + + private void addColumnWithChineseCommentAndAssert( + String tableName, String addColumnName, String chineseComment, boolean assertFlag) + throws SQLException, IOException, IllegalArgumentException { + FieldSchema field = new FieldSchema(addColumnName, "string", chineseComment); + schemaChangeManager.addColumn(DATABASE, tableName, field); + boolean exists = schemaChangeManager.addColumn(DATABASE, tableName, field); + Assert.assertTrue(exists); + + exists = schemaChangeManager.checkColumnExists(DATABASE, tableName, addColumnName); + Assert.assertTrue(exists); + + // check Chinese comment + Map<String, String> columnComments = getColumnComments(tableName); + if (assertFlag) { + Assert.assertEquals(columnComments.get(addColumnName), chineseComment); + } else { + Assert.assertNotEquals(columnComments.get(addColumnName), chineseComment); + } + } + + private Map<String, String> getColumnComments(String table) throws SQLException { + Map<String, String> columnCommentsMap = new HashMap<>(); + try (Connection connection = + DriverManager.getConnection( + String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD)) { + ResultSet columns = connection.getMetaData().getColumns(null, DATABASE, table, null); + + while (columns.next()) { + String columnName = columns.getString("COLUMN_NAME"); + String comment = columns.getString("REMARKS"); + columnCommentsMap.put(columnName, comment); + } + } + return columnCommentsMap; + } + @Test public void testDropColumn() throws SQLException, IOException, IllegalArgumentException { String dropColumnTbls = "drop_column"; diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java index 529cc860..16c901e5 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java @@ -132,6 +132,12 @@ public class SchemaManagerTest { Assert.assertEquals( "ALTER TABLE `test`.`test_flink` ADD COLUMN `col` int DEFAULT current_timestamp COMMENT 'comment \"\\'sdf\\''", addColumnDDL); + + field = new FieldSchema("col", "int", "current_timestamp", "中文注释"); + addColumnDDL = SchemaChangeHelper.buildAddColumnDDL("test.test_flink", field); + Assert.assertEquals( + "ALTER TABLE `test`.`test_flink` ADD COLUMN `col` int DEFAULT current_timestamp COMMENT '中文注释'", + addColumnDDL); } @Test --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org