This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new d556b6e2ee4 branch-4.0: [Fix](StreamingJob) fix create table issues
when creating a streaming job #59828 (#59853)
d556b6e2ee4 is described below
commit d556b6e2ee4498668842df13d18ca07941202753
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Jan 14 17:53:09 2026 +0800
branch-4.0: [Fix](StreamingJob) fix create table issues when creating a
streaming job #59828 (#59853)
Cherry-picked from #59828
Co-authored-by: wudi <[email protected]>
---
.../doris/httpv2/rest/StreamingJobAction.java | 3 +
.../apache/doris/job/util/StreamingJobUtils.java | 27 ++-
.../doris/job/util/StreamingJobUtilsTest.java | 240 +++++++++++++++++++++
.../cdc/test_streaming_mysql_job_all_type.out | 2 +-
.../cdc/test_streaming_mysql_job.groovy | 3 +-
5 files changed, 272 insertions(+), 3 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/StreamingJobAction.java
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/StreamingJobAction.java
index 53610142f12..573e0a17f16 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/StreamingJobAction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/StreamingJobAction.java
@@ -28,6 +28,7 @@ import jakarta.servlet.http.HttpServletRequest;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
+import lombok.ToString;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.web.bind.annotation.RequestBody;
@@ -67,6 +68,7 @@ public class StreamingJobAction extends RestBaseController {
StreamingInsertJob streamingJob = (StreamingInsertJob) job;
try {
+ LOG.info("Committing offset with {}", offsetRequest.toString());
streamingJob.commitOffset(offsetRequest);
return ResponseEntityBuilder.ok("Offset committed successfully");
} catch (Exception e) {
@@ -79,6 +81,7 @@ public class StreamingJobAction extends RestBaseController {
@Getter
@Setter
@NoArgsConstructor
+ @ToString
public static class CommitOffsetRequest {
public long jobId;
public long taskId;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
index 4625417b67d..bac12ae3eba 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
@@ -59,6 +59,7 @@ import org.apache.commons.text.StringSubstitutor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
@@ -333,18 +334,42 @@ public class StreamingJobUtils {
return createtblCmds;
}
- private static List<Column> getColumns(JdbcClient jdbcClient,
+ public static List<Column> getColumns(JdbcClient jdbcClient,
String database,
String table,
List<String> primaryKeys) {
List<Column> columns = jdbcClient.getColumnsFromJdbc(database, table);
columns.forEach(col -> {
+ Preconditions.checkArgument(!col.getType().isUnsupported(),
+ "Unsupported column type, table:[%s], column:[%s]", table,
col.getName());
+ if (col.getType().isVarchar()) {
+ // The length of varchar needs to be multiplied by 3.
+ int len = col.getType().getLength() * 3;
+ if (len > ScalarType.MAX_VARCHAR_LENGTH) {
+ col.setType(ScalarType.createStringType());
+ } else {
+ col.setType(ScalarType.createVarcharType(len));
+ }
+ }
+
// string cannot be a key
if (primaryKeys.contains(col.getName())
&& col.getDataType() == PrimitiveType.STRING) {
col.setType(ScalarType.createVarcharType(ScalarType.MAX_VARCHAR_LENGTH));
}
});
+
+ // sort columns for primary keys
+ columns.sort(
+ Comparator
+ .comparing((Column col) ->
!primaryKeys.contains(col.getName()))
+ .thenComparing(
+ col -> primaryKeys.contains(col.getName())
+ ? primaryKeys.indexOf(col.getName())
+ : Integer.MAX_VALUE
+ )
+ );
+
return columns;
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/job/util/StreamingJobUtilsTest.java
b/fe/fe-core/src/test/java/org/apache/doris/job/util/StreamingJobUtilsTest.java
new file mode 100644
index 00000000000..3e090d4d3fe
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/job/util/StreamingJobUtilsTest.java
@@ -0,0 +1,240 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.util;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.datasource.jdbc.client.JdbcClient;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class StreamingJobUtilsTest {
+
+ @Mock
+ private JdbcClient jdbcClient;
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+ }
+
+ @Test
+ public void testGetColumnsWithPrimaryKeySorting() throws Exception {
+ // Prepare test data
+ String database = "test_db";
+ String table = "test_table";
+ List<String> primaryKeys = Arrays.asList("id", "name");
+
+ // Create mock columns in random order
+ List<Column> mockColumns = new ArrayList<>();
+ mockColumns.add(new Column("age",
ScalarType.createType(PrimitiveType.INT)));
+ mockColumns.add(new Column("id",
ScalarType.createType(PrimitiveType.BIGINT)));
+ mockColumns.add(new Column("email",
ScalarType.createVarcharType(100)));
+ mockColumns.add(new Column("name", ScalarType.createVarcharType(50)));
+ mockColumns.add(new Column("address",
ScalarType.createVarcharType(200)));
+
+
Mockito.when(jdbcClient.getColumnsFromJdbc(ArgumentMatchers.anyString(),
ArgumentMatchers.anyString())).thenReturn(mockColumns);
+ List<Column> result = StreamingJobUtils.getColumns(jdbcClient,
database, table, primaryKeys);
+
+ // Verify primary keys are at the front in correct order
+ Assert.assertEquals(5, result.size());
+ Assert.assertEquals("id", result.get(0).getName());
+ Assert.assertEquals("name", result.get(1).getName());
+ // Verify varchar primary key columns have their length multiplied by 3
+ Column nameColumn = result.get(1);
+ Assert.assertEquals(150, nameColumn.getType().getLength()); // 50 * 3
+ // Verify non-primary key columns follow
+ Assert.assertEquals("age", result.get(2).getName());
+ Assert.assertEquals("email", result.get(3).getName());
+ Assert.assertEquals("address", result.get(4).getName());
+ // Verify non-primary key varchar columns also have their length
multiplied by 3
+ Column emailColumn = result.get(3);
+ Assert.assertEquals(300, emailColumn.getType().getLength()); // 100 * 3
+ Column addressColumn = result.get(4);
+ Assert.assertEquals(600, addressColumn.getType().getLength()); // 200
* 3
+ }
+
+ @Test
+ public void testGetColumnsWithVarcharTypeConversion() throws Exception {
+ String database = "test_db";
+ String table = "test_table";
+ List<String> primaryKeys = Arrays.asList("id");
+
+ List<Column> mockColumns = new ArrayList<>();
+ mockColumns.add(new Column("id",
ScalarType.createType(PrimitiveType.INT)));
+ mockColumns.add(new Column("short_name",
ScalarType.createVarcharType(50)));
+ mockColumns.add(new Column("long_name",
ScalarType.createVarcharType(20000)));
+
+
Mockito.when(jdbcClient.getColumnsFromJdbc(ArgumentMatchers.anyString(),
ArgumentMatchers.anyString())).thenReturn(mockColumns);
+ List<Column> result = StreamingJobUtils.getColumns(jdbcClient,
database, table, primaryKeys);
+
+ // Verify varchar length multiplication by 3
+ Column shortName = result.stream()
+ .filter(col -> col.getName().equals("short_name"))
+ .findFirst()
+ .orElse(null);
+ Assert.assertNotNull(shortName);
+ Assert.assertEquals(150, shortName.getType().getLength()); // 50 * 3
+
+ // Verify long varchar becomes STRING type
+ Column longName = result.stream()
+ .filter(col -> col.getName().equals("long_name"))
+ .findFirst()
+ .orElse(null);
+ Assert.assertNotNull(longName);
+ Assert.assertTrue(longName.getType().isStringType());
+ }
+
+ @Test
+ public void testGetColumnsWithStringTypeAsPrimaryKey() throws Exception {
+ String database = "test_db";
+ String table = "test_table";
+ List<String> primaryKeys = Arrays.asList("id");
+
+ List<Column> mockColumns = new ArrayList<>();
+ mockColumns.add(new Column("id", ScalarType.createStringType()));
+ mockColumns.add(new Column("name", ScalarType.createVarcharType(50)));
+
+
Mockito.when(jdbcClient.getColumnsFromJdbc(ArgumentMatchers.anyString(),
ArgumentMatchers.anyString())).thenReturn(mockColumns);
+ List<Column> result = StreamingJobUtils.getColumns(jdbcClient,
database, table, primaryKeys);
+
+ // Verify string type primary key is converted to varchar
+ Column idColumn = result.stream()
+ .filter(col -> col.getName().equals("id"))
+ .findFirst()
+ .orElse(null);
+ Assert.assertNotNull(idColumn);
+ Assert.assertTrue(idColumn.getType().isVarchar());
+ Assert.assertEquals(ScalarType.MAX_VARCHAR_LENGTH,
idColumn.getType().getLength());
+ }
+
+ @Test
+ public void testGetColumnsWithEmptyPrimaryKeys() throws Exception {
+ String database = "test_db";
+ String table = "test_table";
+ List<String> primaryKeys = new ArrayList<>();
+
+ List<Column> mockColumns = new ArrayList<>();
+ mockColumns.add(new Column("col1",
ScalarType.createType(PrimitiveType.INT)));
+ mockColumns.add(new Column("col2", ScalarType.createVarcharType(100)));
+ mockColumns.add(new Column("col3",
ScalarType.createType(PrimitiveType.BIGINT)));
+
+
Mockito.when(jdbcClient.getColumnsFromJdbc(ArgumentMatchers.anyString(),
ArgumentMatchers.anyString())).thenReturn(mockColumns);
+ List<Column> result = StreamingJobUtils.getColumns(jdbcClient,
database, table, primaryKeys);
+
+ // Verify columns maintain original order when no primary keys
+ Assert.assertEquals(3, result.size());
+ Assert.assertEquals("col1", result.get(0).getName());
+ Assert.assertEquals("col2", result.get(1).getName());
+ Assert.assertEquals("col3", result.get(2).getName());
+ }
+
+ @Test
+ public void testGetColumnsWithMultiplePrimaryKeys() throws Exception {
+ String database = "test_db";
+ String table = "test_table";
+ List<String> primaryKeys = Arrays.asList("pk3", "pk1", "pk2");
+
+ List<Column> mockColumns = new ArrayList<>();
+ mockColumns.add(new Column("data1",
ScalarType.createType(PrimitiveType.INT)));
+ mockColumns.add(new Column("pk1",
ScalarType.createType(PrimitiveType.INT)));
+ mockColumns.add(new Column("data2",
ScalarType.createVarcharType(100)));
+ mockColumns.add(new Column("pk2",
ScalarType.createType(PrimitiveType.BIGINT)));
+ mockColumns.add(new Column("pk3",
ScalarType.createType(PrimitiveType.INT)));
+ mockColumns.add(new Column("data3", ScalarType.createVarcharType(50)));
+
+
Mockito.when(jdbcClient.getColumnsFromJdbc(ArgumentMatchers.anyString(),
ArgumentMatchers.anyString())).thenReturn(mockColumns);
+ List<Column> result = StreamingJobUtils.getColumns(jdbcClient,
database, table, primaryKeys);
+
+ // Verify primary keys are sorted in the order defined in primaryKeys
list
+ Assert.assertEquals(6, result.size());
+ Assert.assertEquals("pk3", result.get(0).getName());
+ Assert.assertEquals("pk1", result.get(1).getName());
+ Assert.assertEquals("pk2", result.get(2).getName());
+ // Verify non-primary keys follow
+ Assert.assertEquals("data1", result.get(3).getName());
+ Assert.assertEquals("data2", result.get(4).getName());
+ Assert.assertEquals("data3", result.get(5).getName());
+ }
+
+ @Test
+ public void testGetColumnsWithUnsupportedColumnType() throws Exception {
+ String database = "test_db";
+ String table = "test_table";
+ List<String> primaryKeys = Arrays.asList("id");
+
+ List<Column> mockColumns = new ArrayList<>();
+ mockColumns.add(new Column("id",
ScalarType.createType(PrimitiveType.INT)));
+ mockColumns.add(new Column("unsupported_col", new
ScalarType(PrimitiveType.UNSUPPORTED)));
+
+
Mockito.when(jdbcClient.getColumnsFromJdbc(ArgumentMatchers.anyString(),
ArgumentMatchers.anyString())).thenReturn(mockColumns);
+ // This should throw IllegalArgumentException due to unsupported
column type
+ try {
+ StreamingJobUtils.getColumns(jdbcClient, database, table,
primaryKeys);
+ Assert.fail("Expected IllegalArgumentException to be thrown");
+ } catch (IllegalArgumentException e) {
+ // Verify the exception message contains expected information
+ String message = e.getMessage();
+ Assert.assertTrue(message.contains("Unsupported column type"));
+ Assert.assertTrue(message.contains("test_table"));
+ Assert.assertTrue(message.contains("unsupported_col"));
+ }
+ }
+
+ @Test
+ public void testGetColumnsWithVarcharPrimaryKeyLengthMultiplication()
throws Exception {
+ String database = "test_db";
+ String table = "test_table";
+ List<String> primaryKeys = Arrays.asList("pk_varchar", "pk_int");
+
+ List<Column> mockColumns = new ArrayList<>();
+ mockColumns.add(new Column("pk_int",
ScalarType.createType(PrimitiveType.INT)));
+ mockColumns.add(new Column("pk_varchar",
ScalarType.createVarcharType(100)));
+ mockColumns.add(new Column("normal_varchar",
ScalarType.createVarcharType(50)));
+
+
Mockito.when(jdbcClient.getColumnsFromJdbc(ArgumentMatchers.anyString(),
ArgumentMatchers.anyString())).thenReturn(mockColumns);
+ List<Column> result = StreamingJobUtils.getColumns(jdbcClient,
database, table, primaryKeys);
+
+ // Verify varchar primary key column has length multiplied by 3
+ Column pkVarcharColumn = result.stream()
+ .filter(col -> col.getName().equals("pk_varchar"))
+ .findFirst()
+ .orElse(null);
+ Assert.assertNotNull(pkVarcharColumn);
+ Assert.assertEquals(300, pkVarcharColumn.getType().getLength()); //
100 * 3
+
+ // Verify normal varchar column also has length multiplied by 3
+ Column normalVarcharColumn = result.stream()
+ .filter(col -> col.getName().equals("normal_varchar"))
+ .findFirst()
+ .orElse(null);
+ Assert.assertNotNull(normalVarcharColumn);
+ Assert.assertEquals(150, normalVarcharColumn.getType().getLength());
// 50 * 3
+ }
+}
diff --git
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_all_type.out
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_all_type.out
index 691380f5b33..ca7379dbcf5 100644
---
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_all_type.out
+++
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_all_type.out
@@ -35,7 +35,7 @@ timestamp1 datetime Yes false \N NONE
timestamp2 datetime(3) Yes false \N NONE
timestamp3 datetime(6) Yes false \N NONE
char char(5) Yes false \N NONE
-varchar varchar(10) Yes false \N NONE
+varchar varchar(30) Yes false \N NONE
text text Yes false \N NONE
blob text Yes false \N NONE
json text Yes false \N NONE
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job.groovy
index a6bc1d17431..2da9437ab4b 100644
---
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job.groovy
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job.groovy
@@ -58,6 +58,7 @@ suite("test_streaming_mysql_job",
"p0,external,mysql,external_docker,external_do
sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}"""
sql """DROP TABLE IF EXISTS ${mysqlDb}.${table2}"""
+ sql """DROP TABLE IF EXISTS ${mysqlDb}.${table3}"""
sql """CREATE TABLE ${mysqlDb}.${table1} (
`name` varchar(200) NOT NULL,
`age` int DEFAULT NULL,
@@ -109,7 +110,7 @@ suite("test_streaming_mysql_job",
"p0,external,mysql,external_docker,external_do
// check table schema correct
def showTbl1 = sql """show create table ${currentDb}.${table1}"""
def createTalInfo = showTbl1[0][1];
- assert createTalInfo.contains("`name` varchar(200)");
+ assert createTalInfo.contains("`name` varchar(600)");
assert createTalInfo.contains("`age` int");
assert createTalInfo.contains("UNIQUE KEY(`name`)");
assert createTalInfo.contains("DISTRIBUTED BY HASH(`name`) BUCKETS
AUTO");
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]