This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new d556b6e2ee4 branch-4.0: [Fix](StreamingJob) fix create table issues when create streaming job #59828 (#59853)
d556b6e2ee4 is described below

commit d556b6e2ee4498668842df13d18ca07941202753
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Jan 14 17:53:09 2026 +0800

    branch-4.0: [Fix](StreamingJob) fix create table issues when create streaming job #59828 (#59853)
    
    Cherry-picked from #59828
    
    Co-authored-by: wudi <[email protected]>
---
 .../doris/httpv2/rest/StreamingJobAction.java      |   3 +
 .../apache/doris/job/util/StreamingJobUtils.java   |  27 ++-
 .../doris/job/util/StreamingJobUtilsTest.java      | 240 +++++++++++++++++++++
 .../cdc/test_streaming_mysql_job_all_type.out      |   2 +-
 .../cdc/test_streaming_mysql_job.groovy            |   3 +-
 5 files changed, 272 insertions(+), 3 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/StreamingJobAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/StreamingJobAction.java
index 53610142f12..573e0a17f16 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/StreamingJobAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/StreamingJobAction.java
@@ -28,6 +28,7 @@ import jakarta.servlet.http.HttpServletRequest;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
 import lombok.Setter;
+import lombok.ToString;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.springframework.web.bind.annotation.RequestBody;
@@ -67,6 +68,7 @@ public class StreamingJobAction extends RestBaseController {
 
         StreamingInsertJob streamingJob = (StreamingInsertJob) job;
         try {
+            LOG.info("Committing offset with {}", offsetRequest.toString());
             streamingJob.commitOffset(offsetRequest);
             return ResponseEntityBuilder.ok("Offset committed successfully");
         } catch (Exception e) {
@@ -79,6 +81,7 @@ public class StreamingJobAction extends RestBaseController {
     @Getter
     @Setter
     @NoArgsConstructor
+    @ToString
     public static class CommitOffsetRequest {
         public long jobId;
         public long taskId;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
index 4625417b67d..bac12ae3eba 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
@@ -59,6 +59,7 @@ import org.apache.commons.text.StringSubstitutor;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -333,18 +334,42 @@ public class StreamingJobUtils {
         return createtblCmds;
     }
 
-    private static List<Column> getColumns(JdbcClient jdbcClient,
+    public static List<Column> getColumns(JdbcClient jdbcClient,
             String database,
             String table,
             List<String> primaryKeys) {
         List<Column> columns = jdbcClient.getColumnsFromJdbc(database, table);
         columns.forEach(col -> {
+            Preconditions.checkArgument(!col.getType().isUnsupported(),
+                    "Unsupported column type, table:[%s], column:[%s]", table, 
col.getName());
+            if (col.getType().isVarchar()) {
+                // The length of varchar needs to be multiplied by 3.
+                int len = col.getType().getLength() * 3;
+                if (len > ScalarType.MAX_VARCHAR_LENGTH) {
+                    col.setType(ScalarType.createStringType());
+                } else {
+                    col.setType(ScalarType.createVarcharType(len));
+                }
+            }
+
             // string can not to be key
             if (primaryKeys.contains(col.getName())
                     && col.getDataType() == PrimitiveType.STRING) {
                 
col.setType(ScalarType.createVarcharType(ScalarType.MAX_VARCHAR_LENGTH));
             }
         });
+
+        // sort columns for primary keys
+        columns.sort(
+                Comparator
+                        .comparing((Column col) -> 
!primaryKeys.contains(col.getName()))
+                        .thenComparing(
+                                col -> primaryKeys.contains(col.getName())
+                                        ? primaryKeys.indexOf(col.getName())
+                                        : Integer.MAX_VALUE
+                        )
+        );
+
         return columns;
     }
 
diff --git a/fe/fe-core/src/test/java/org/apache/doris/job/util/StreamingJobUtilsTest.java b/fe/fe-core/src/test/java/org/apache/doris/job/util/StreamingJobUtilsTest.java
new file mode 100644
index 00000000000..3e090d4d3fe
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/job/util/StreamingJobUtilsTest.java
@@ -0,0 +1,240 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.util;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.datasource.jdbc.client.JdbcClient;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class StreamingJobUtilsTest {
+
+    @Mock
+    private JdbcClient jdbcClient;
+
+    @Before
+    public void setUp() {
+        MockitoAnnotations.initMocks(this);
+    }
+
+    @Test
+    public void testGetColumnsWithPrimaryKeySorting() throws Exception {
+        // Prepare test data
+        String database = "test_db";
+        String table = "test_table";
+        List<String> primaryKeys = Arrays.asList("id", "name");
+
+        // Create mock columns in random order
+        List<Column> mockColumns = new ArrayList<>();
+        mockColumns.add(new Column("age", 
ScalarType.createType(PrimitiveType.INT)));
+        mockColumns.add(new Column("id", 
ScalarType.createType(PrimitiveType.BIGINT)));
+        mockColumns.add(new Column("email", 
ScalarType.createVarcharType(100)));
+        mockColumns.add(new Column("name", ScalarType.createVarcharType(50)));
+        mockColumns.add(new Column("address", 
ScalarType.createVarcharType(200)));
+
+        
Mockito.when(jdbcClient.getColumnsFromJdbc(ArgumentMatchers.anyString(), 
ArgumentMatchers.anyString())).thenReturn(mockColumns);
+        List<Column> result = StreamingJobUtils.getColumns(jdbcClient, 
database, table, primaryKeys);
+
+        // Verify primary keys are at the front in correct order
+        Assert.assertEquals(5, result.size());
+        Assert.assertEquals("id", result.get(0).getName());
+        Assert.assertEquals("name", result.get(1).getName());
+        // Verify varchar primary key columns have their length multiplied by 3
+        Column nameColumn = result.get(1);
+        Assert.assertEquals(150, nameColumn.getType().getLength()); // 50 * 3
+        // Verify non-primary key columns follow
+        Assert.assertEquals("age", result.get(2).getName());
+        Assert.assertEquals("email", result.get(3).getName());
+        Assert.assertEquals("address", result.get(4).getName());
+        // Verify non-primary key varchar columns also have their length 
multiplied by 3
+        Column emailColumn = result.get(3);
+        Assert.assertEquals(300, emailColumn.getType().getLength()); // 100 * 3
+        Column addressColumn = result.get(4);
+        Assert.assertEquals(600, addressColumn.getType().getLength()); // 200 
* 3
+    }
+
+    @Test
+    public void testGetColumnsWithVarcharTypeConversion() throws Exception {
+        String database = "test_db";
+        String table = "test_table";
+        List<String> primaryKeys = Arrays.asList("id");
+
+        List<Column> mockColumns = new ArrayList<>();
+        mockColumns.add(new Column("id", 
ScalarType.createType(PrimitiveType.INT)));
+        mockColumns.add(new Column("short_name", 
ScalarType.createVarcharType(50)));
+        mockColumns.add(new Column("long_name", 
ScalarType.createVarcharType(20000)));
+
+        
Mockito.when(jdbcClient.getColumnsFromJdbc(ArgumentMatchers.anyString(), 
ArgumentMatchers.anyString())).thenReturn(mockColumns);
+        List<Column> result = StreamingJobUtils.getColumns(jdbcClient, 
database, table, primaryKeys);
+
+        // Verify varchar length multiplication by 3
+        Column shortName = result.stream()
+                .filter(col -> col.getName().equals("short_name"))
+                .findFirst()
+                .orElse(null);
+        Assert.assertNotNull(shortName);
+        Assert.assertEquals(150, shortName.getType().getLength()); // 50 * 3
+
+        // Verify long varchar becomes STRING type
+        Column longName = result.stream()
+                .filter(col -> col.getName().equals("long_name"))
+                .findFirst()
+                .orElse(null);
+        Assert.assertNotNull(longName);
+        Assert.assertTrue(longName.getType().isStringType());
+    }
+
+    @Test
+    public void testGetColumnsWithStringTypeAsPrimaryKey() throws Exception {
+        String database = "test_db";
+        String table = "test_table";
+        List<String> primaryKeys = Arrays.asList("id");
+
+        List<Column> mockColumns = new ArrayList<>();
+        mockColumns.add(new Column("id", ScalarType.createStringType()));
+        mockColumns.add(new Column("name", ScalarType.createVarcharType(50)));
+
+        
Mockito.when(jdbcClient.getColumnsFromJdbc(ArgumentMatchers.anyString(), 
ArgumentMatchers.anyString())).thenReturn(mockColumns);
+        List<Column> result = StreamingJobUtils.getColumns(jdbcClient, 
database, table, primaryKeys);
+
+        // Verify string type primary key is converted to varchar
+        Column idColumn = result.stream()
+                .filter(col -> col.getName().equals("id"))
+                .findFirst()
+                .orElse(null);
+        Assert.assertNotNull(idColumn);
+        Assert.assertTrue(idColumn.getType().isVarchar());
+        Assert.assertEquals(ScalarType.MAX_VARCHAR_LENGTH, 
idColumn.getType().getLength());
+    }
+
+    @Test
+    public void testGetColumnsWithEmptyPrimaryKeys() throws Exception {
+        String database = "test_db";
+        String table = "test_table";
+        List<String> primaryKeys = new ArrayList<>();
+
+        List<Column> mockColumns = new ArrayList<>();
+        mockColumns.add(new Column("col1", 
ScalarType.createType(PrimitiveType.INT)));
+        mockColumns.add(new Column("col2", ScalarType.createVarcharType(100)));
+        mockColumns.add(new Column("col3", 
ScalarType.createType(PrimitiveType.BIGINT)));
+
+        
Mockito.when(jdbcClient.getColumnsFromJdbc(ArgumentMatchers.anyString(), 
ArgumentMatchers.anyString())).thenReturn(mockColumns);
+        List<Column> result = StreamingJobUtils.getColumns(jdbcClient, 
database, table, primaryKeys);
+
+        // Verify columns maintain original order when no primary keys
+        Assert.assertEquals(3, result.size());
+        Assert.assertEquals("col1", result.get(0).getName());
+        Assert.assertEquals("col2", result.get(1).getName());
+        Assert.assertEquals("col3", result.get(2).getName());
+    }
+
+    @Test
+    public void testGetColumnsWithMultiplePrimaryKeys() throws Exception {
+        String database = "test_db";
+        String table = "test_table";
+        List<String> primaryKeys = Arrays.asList("pk3", "pk1", "pk2");
+
+        List<Column> mockColumns = new ArrayList<>();
+        mockColumns.add(new Column("data1", 
ScalarType.createType(PrimitiveType.INT)));
+        mockColumns.add(new Column("pk1", 
ScalarType.createType(PrimitiveType.INT)));
+        mockColumns.add(new Column("data2", 
ScalarType.createVarcharType(100)));
+        mockColumns.add(new Column("pk2", 
ScalarType.createType(PrimitiveType.BIGINT)));
+        mockColumns.add(new Column("pk3", 
ScalarType.createType(PrimitiveType.INT)));
+        mockColumns.add(new Column("data3", ScalarType.createVarcharType(50)));
+
+        
Mockito.when(jdbcClient.getColumnsFromJdbc(ArgumentMatchers.anyString(), 
ArgumentMatchers.anyString())).thenReturn(mockColumns);
+        List<Column> result = StreamingJobUtils.getColumns(jdbcClient, 
database, table, primaryKeys);
+
+        // Verify primary keys are sorted in the order defined in primaryKeys 
list
+        Assert.assertEquals(6, result.size());
+        Assert.assertEquals("pk3", result.get(0).getName());
+        Assert.assertEquals("pk1", result.get(1).getName());
+        Assert.assertEquals("pk2", result.get(2).getName());
+        // Verify non-primary keys follow
+        Assert.assertEquals("data1", result.get(3).getName());
+        Assert.assertEquals("data2", result.get(4).getName());
+        Assert.assertEquals("data3", result.get(5).getName());
+    }
+
+    @Test
+    public void testGetColumnsWithUnsupportedColumnType() throws Exception {
+        String database = "test_db";
+        String table = "test_table";
+        List<String> primaryKeys = Arrays.asList("id");
+
+        List<Column> mockColumns = new ArrayList<>();
+        mockColumns.add(new Column("id", 
ScalarType.createType(PrimitiveType.INT)));
+        mockColumns.add(new Column("unsupported_col", new 
ScalarType(PrimitiveType.UNSUPPORTED)));
+
+        
Mockito.when(jdbcClient.getColumnsFromJdbc(ArgumentMatchers.anyString(), 
ArgumentMatchers.anyString())).thenReturn(mockColumns);
+        // This should throw IllegalArgumentException due to unsupported 
column type
+        try {
+            StreamingJobUtils.getColumns(jdbcClient, database, table, 
primaryKeys);
+            Assert.fail("Expected IllegalArgumentException to be thrown");
+        } catch (IllegalArgumentException e) {
+            // Verify the exception message contains expected information
+            String message = e.getMessage();
+            Assert.assertTrue(message.contains("Unsupported column type"));
+            Assert.assertTrue(message.contains("test_table"));
+            Assert.assertTrue(message.contains("unsupported_col"));
+        }
+    }
+
+    @Test
+    public void testGetColumnsWithVarcharPrimaryKeyLengthMultiplication() 
throws Exception {
+        String database = "test_db";
+        String table = "test_table";
+        List<String> primaryKeys = Arrays.asList("pk_varchar", "pk_int");
+
+        List<Column> mockColumns = new ArrayList<>();
+        mockColumns.add(new Column("pk_int", 
ScalarType.createType(PrimitiveType.INT)));
+        mockColumns.add(new Column("pk_varchar", 
ScalarType.createVarcharType(100)));
+        mockColumns.add(new Column("normal_varchar", 
ScalarType.createVarcharType(50)));
+
+        
Mockito.when(jdbcClient.getColumnsFromJdbc(ArgumentMatchers.anyString(), 
ArgumentMatchers.anyString())).thenReturn(mockColumns);
+        List<Column> result = StreamingJobUtils.getColumns(jdbcClient, 
database, table, primaryKeys);
+
+        // Verify varchar primary key column has length multiplied by 3
+        Column pkVarcharColumn = result.stream()
+                .filter(col -> col.getName().equals("pk_varchar"))
+                .findFirst()
+                .orElse(null);
+        Assert.assertNotNull(pkVarcharColumn);
+        Assert.assertEquals(300, pkVarcharColumn.getType().getLength()); // 
100 * 3
+
+        // Verify normal varchar column also has length multiplied by 3
+        Column normalVarcharColumn = result.stream()
+                .filter(col -> col.getName().equals("normal_varchar"))
+                .findFirst()
+                .orElse(null);
+        Assert.assertNotNull(normalVarcharColumn);
+        Assert.assertEquals(150, normalVarcharColumn.getType().getLength()); 
// 50 * 3
+    }
+}
diff --git a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_all_type.out b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_all_type.out
index 691380f5b33..ca7379dbcf5 100644
--- a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_all_type.out
+++ b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_all_type.out
@@ -35,7 +35,7 @@ timestamp1    datetime        Yes     false   \N      NONE
 timestamp2     datetime(3)     Yes     false   \N      NONE
 timestamp3     datetime(6)     Yes     false   \N      NONE
 char   char(5) Yes     false   \N      NONE
-varchar        varchar(10)     Yes     false   \N      NONE
+varchar        varchar(30)     Yes     false   \N      NONE
 text   text    Yes     false   \N      NONE
 blob   text    Yes     false   \N      NONE
 json   text    Yes     false   \N      NONE
diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job.groovy
index a6bc1d17431..2da9437ab4b 100644
--- a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job.groovy
+++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job.groovy
@@ -58,6 +58,7 @@ suite("test_streaming_mysql_job", "p0,external,mysql,external_docker,external_do
             sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
             sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}"""
             sql """DROP TABLE IF EXISTS ${mysqlDb}.${table2}"""
+            sql """DROP TABLE IF EXISTS ${mysqlDb}.${table3}"""
             sql """CREATE TABLE ${mysqlDb}.${table1} (
                   `name` varchar(200) NOT NULL,
                   `age` int DEFAULT NULL,
@@ -109,7 +110,7 @@ suite("test_streaming_mysql_job", "p0,external,mysql,external_docker,external_do
         // check table schema correct
         def showTbl1 = sql """show create table ${currentDb}.${table1}"""
         def createTalInfo = showTbl1[0][1];
-        assert createTalInfo.contains("`name` varchar(200)");
+        assert createTalInfo.contains("`name` varchar(600)");
         assert createTalInfo.contains("`age` int");
         assert createTalInfo.contains("UNIQUE KEY(`name`)");
         assert createTalInfo.contains("DISTRIBUTED BY HASH(`name`) BUCKETS 
AUTO");


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org

Reply via email to