Copilot commented on code in PR #59828:
URL: https://github.com/apache/doris/pull/59828#discussion_r2686008034
##########
fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java:
##########
@@ -333,18 +334,42 @@ public static List<CreateTableCommand>
generateCreateTableCmds(String targetDb,
return createtblCmds;
}
- private static List<Column> getColumns(JdbcClient jdbcClient,
+ public static List<Column> getColumns(JdbcClient jdbcClient,
String database,
String table,
List<String> primaryKeys) {
List<Column> columns = jdbcClient.getColumnsFromJdbc(database, table);
columns.forEach(col -> {
+ Preconditions.checkArgument(!col.getType().isUnsupported(),
+ "Unsupported column type, table:[%s], column:[%s]", table,
col.getName());
+ if (col.getType().isVarchar()) {
+ // The length of varchar needs to be multiplied by 3.
+ int len = col.getType().getLength() * 3;
+ if (col.getType().getLength() * 3 >
ScalarType.MAX_VARCHAR_LENGTH) {
Review Comment:
The varchar length calculation is duplicated. Line 347 stores
`col.getType().getLength() * 3` in `len`, but the condition on line 348
recomputes the same expression instead of reusing `len`. Use
`len > ScalarType.MAX_VARCHAR_LENGTH` so the check and the assigned length
cannot drift apart.
```suggestion
if (len > ScalarType.MAX_VARCHAR_LENGTH) {
```
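A minimal standalone sketch of the same "compute once, reuse" shape follows.
The class, method, and constant names are hypothetical, and the limit value is
an assumption standing in for `ScalarType.MAX_VARCHAR_LENGTH`; the real
constant is defined by Doris.

```java
final class VarcharWidening {
    // Assumed stand-in for ScalarType.MAX_VARCHAR_LENGTH (not taken from the PR).
    static final int ASSUMED_MAX_VARCHAR_LENGTH = 65533;

    /**
     * Returns the widened varchar length, or -1 when the widened length
     * exceeds maxLength and the column should fall back to STRING.
     */
    static int widenedLength(int sourceLength, int maxLength) {
        int len = sourceLength * 3;          // computed once
        return len > maxLength ? -1 : len;   // the same value is checked and returned
    }

    public static void main(String[] args) {
        System.out.println(widenedLength(50, ASSUMED_MAX_VARCHAR_LENGTH));
        System.out.println(widenedLength(30000, ASSUMED_MAX_VARCHAR_LENGTH));
    }
}
```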
##########
fe/fe-core/src/test/java/org/apache/doris/job/util/StreamingJobUtilsTest.java:
##########
@@ -0,0 +1,177 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.util;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.datasource.jdbc.client.JdbcClient;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class StreamingJobUtilsTest {
+
+ @Mock
+ private JdbcClient jdbcClient;
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
Review Comment:
The test uses the deprecated MockitoAnnotations.initMocks(this) method. It
was deprecated in Mockito 3.4.0 in favor of
MockitoAnnotations.openMocks(this) and may be removed in future versions of
Mockito. Note that openMocks returns an AutoCloseable, which should be closed
after each test (for example in an @After method).
```suggestion
MockitoAnnotations.openMocks(this);
```
##########
fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/StreamingJobAction.java:
##########
@@ -67,6 +68,8 @@ private Object updateOffset(CommitOffsetRequest
offsetRequest) {
StreamingInsertJob streamingJob = (StreamingInsertJob) job;
try {
+ LOG.info("Committing offset for job {}, task {}, offset {}",
+ offsetRequest.getJobId(), offsetRequest.getTaskId(),
offsetRequest.toString());
Review Comment:
The log statement is redundant. The third placeholder, "offset {}", receives
offsetRequest.toString(), which prints every field (jobId, taskId, offset,
scannedRows, scannedBytes) because of the @ToString annotation, yet jobId and
taskId are already logged by the first two placeholders. Consider logging only
the offset field (offsetRequest.getOffset()), optionally with the fields that
are not yet logged (scannedRows, scannedBytes), or adjust the placeholders to
match what is actually logged.
```suggestion
offsetRequest.getJobId(), offsetRequest.getTaskId(),
offsetRequest.getOffset());
```
##########
fe/fe-core/src/test/java/org/apache/doris/job/util/StreamingJobUtilsTest.java:
##########
@@ -0,0 +1,177 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.util;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.datasource.jdbc.client.JdbcClient;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class StreamingJobUtilsTest {
+
+ @Mock
+ private JdbcClient jdbcClient;
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+ }
+
+ @Test
+ public void testGetColumnsWithPrimaryKeySorting() throws Exception {
+ // Prepare test data
+ String database = "test_db";
+ String table = "test_table";
+ List<String> primaryKeys = Arrays.asList("id", "name");
+
+ // Create mock columns in random order
+ List<Column> mockColumns = new ArrayList<>();
+ mockColumns.add(new Column("age",
ScalarType.createType(PrimitiveType.INT)));
+ mockColumns.add(new Column("id",
ScalarType.createType(PrimitiveType.BIGINT)));
+ mockColumns.add(new Column("email",
ScalarType.createVarcharType(100)));
+ mockColumns.add(new Column("name", ScalarType.createVarcharType(50)));
+ mockColumns.add(new Column("address",
ScalarType.createVarcharType(200)));
+
+
Mockito.when(jdbcClient.getColumnsFromJdbc(ArgumentMatchers.anyString(),
ArgumentMatchers.anyString())).thenReturn(mockColumns);
+ List<Column> result = StreamingJobUtils.getColumns(jdbcClient,
database, table, primaryKeys);
+
+ // Verify primary keys are at the front in correct order
+ Assert.assertEquals(5, result.size());
+ Assert.assertEquals("id", result.get(0).getName());
+ Assert.assertEquals("name", result.get(1).getName());
+ // Verify non-primary key columns follow
+ Assert.assertEquals("age", result.get(2).getName());
+ Assert.assertEquals("email", result.get(3).getName());
+ Assert.assertEquals("address", result.get(4).getName());
+ }
Review Comment:
The test 'testGetColumnsWithPrimaryKeySorting' checks only column order. It
uses varchar columns (email, name, address), and `name` is also a primary key,
but it never verifies that their lengths were multiplied by 3. Consider adding
assertions that a varchar primary key column is widened as well, for example
that `name` ends up with length 150 (50 * 3).
##########
fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java:
##########
@@ -333,18 +334,42 @@ public static List<CreateTableCommand>
generateCreateTableCmds(String targetDb,
return createtblCmds;
}
- private static List<Column> getColumns(JdbcClient jdbcClient,
+ public static List<Column> getColumns(JdbcClient jdbcClient,
String database,
String table,
List<String> primaryKeys) {
List<Column> columns = jdbcClient.getColumnsFromJdbc(database, table);
columns.forEach(col -> {
+ Preconditions.checkArgument(!col.getType().isUnsupported(),
+ "Unsupported column type, table:[%s], column:[%s]", table,
col.getName());
+ if (col.getType().isVarchar()) {
+ // The length of varchar needs to be multiplied by 3.
+ int len = col.getType().getLength() * 3;
+ if (col.getType().getLength() * 3 >
ScalarType.MAX_VARCHAR_LENGTH) {
+ col.setType(ScalarType.createStringType());
+ } else {
+ col.setType(ScalarType.createVarcharType(len));
+ }
+ }
+
// string can not to be key
if (primaryKeys.contains(col.getName())
&& col.getDataType() == PrimitiveType.STRING) {
col.setType(ScalarType.createVarcharType(ScalarType.MAX_VARCHAR_LENGTH));
}
});
+
+ // sort columns for primary keys
+ columns.sort(
+ Comparator
+ .comparing((Column col) ->
!primaryKeys.contains(col.getName()))
+ .thenComparing(
+ col -> primaryKeys.contains(col.getName())
+ ? primaryKeys.indexOf(col.getName())
+ : Integer.MAX_VALUE
Review Comment:
The sorting comparator calls `primaryKeys.contains()` and
`primaryKeys.indexOf()` on every comparison. Since `primaryKeys` is a List,
both operations are O(k) for k primary keys, so sorting n columns costs
O(k * n log n) instead of O(n log n). Consider building a Map from primary key
name to index once before sorting; it serves both the membership check and the
index lookup in O(1), which matters when there are many columns or primary
keys.
```suggestion
Map<String, Integer> primaryKeyOrder = new HashMap<>();
for (int i = 0; i < primaryKeys.size(); i++) {
primaryKeyOrder.put(primaryKeys.get(i), i);
}
columns.sort(
Comparator
.comparing((Column col) ->
!primaryKeyOrder.containsKey(col.getName()))
.thenComparing(
col ->
primaryKeyOrder.getOrDefault(col.getName(), Integer.MAX_VALUE)
```
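The ordering can also be exercised outside Doris. The sketch below is a
simplified variant, not the code from the PR: it sorts plain column names
rather than `Column` objects, and it uses a single comparator key, relying on
`List.sort` being stable so that non-key columns keep their original order.
`PrimaryKeyOrdering` and `sortByPrimaryKeys` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class PrimaryKeyOrdering {
    /** Returns a copy of columnNames with primary keys first, in primaryKeys order. */
    static List<String> sortByPrimaryKeys(List<String> columnNames, List<String> primaryKeys) {
        // Build the name -> position map once: O(k) up front, O(1) per lookup.
        Map<String, Integer> order = new HashMap<>();
        for (int i = 0; i < primaryKeys.size(); i++) {
            order.putIfAbsent(primaryKeys.get(i), i); // keep the first index, like indexOf
        }
        List<String> sorted = new ArrayList<>(columnNames);
        // Non-key columns all map to Integer.MAX_VALUE and therefore sort last;
        // the stable sort preserves their relative order.
        sorted.sort(Comparator.comparingInt(
                (String name) -> order.getOrDefault(name, Integer.MAX_VALUE)));
        return sorted;
    }

    public static void main(String[] args) {
        List<String> columns = Arrays.asList("age", "id", "email", "name", "address");
        List<String> keys = Arrays.asList("id", "name");
        System.out.println(sortByPrimaryKeys(columns, keys)); // [id, name, age, email, address]
    }
}
```

The inputs in `main` mirror the column names used in
`testGetColumnsWithPrimaryKeySorting`.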
##########
fe/fe-core/src/test/java/org/apache/doris/job/util/StreamingJobUtilsTest.java:
##########
@@ -0,0 +1,177 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.util;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.datasource.jdbc.client.JdbcClient;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class StreamingJobUtilsTest {
+
+ @Mock
+ private JdbcClient jdbcClient;
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+ }
+
+ @Test
+ public void testGetColumnsWithPrimaryKeySorting() throws Exception {
+ // Prepare test data
+ String database = "test_db";
+ String table = "test_table";
+ List<String> primaryKeys = Arrays.asList("id", "name");
+
+ // Create mock columns in random order
+ List<Column> mockColumns = new ArrayList<>();
+ mockColumns.add(new Column("age",
ScalarType.createType(PrimitiveType.INT)));
+ mockColumns.add(new Column("id",
ScalarType.createType(PrimitiveType.BIGINT)));
+ mockColumns.add(new Column("email",
ScalarType.createVarcharType(100)));
+ mockColumns.add(new Column("name", ScalarType.createVarcharType(50)));
+ mockColumns.add(new Column("address",
ScalarType.createVarcharType(200)));
+
+
Mockito.when(jdbcClient.getColumnsFromJdbc(ArgumentMatchers.anyString(),
ArgumentMatchers.anyString())).thenReturn(mockColumns);
+ List<Column> result = StreamingJobUtils.getColumns(jdbcClient,
database, table, primaryKeys);
+
+ // Verify primary keys are at the front in correct order
+ Assert.assertEquals(5, result.size());
+ Assert.assertEquals("id", result.get(0).getName());
+ Assert.assertEquals("name", result.get(1).getName());
+ // Verify non-primary key columns follow
+ Assert.assertEquals("age", result.get(2).getName());
+ Assert.assertEquals("email", result.get(3).getName());
+ Assert.assertEquals("address", result.get(4).getName());
+ }
+
+ @Test
+ public void testGetColumnsWithVarcharTypeConversion() throws Exception {
+ String database = "test_db";
+ String table = "test_table";
+ List<String> primaryKeys = Arrays.asList("id");
+
+ List<Column> mockColumns = new ArrayList<>();
+ mockColumns.add(new Column("id",
ScalarType.createType(PrimitiveType.INT)));
+ mockColumns.add(new Column("short_name",
ScalarType.createVarcharType(50)));
+ mockColumns.add(new Column("long_name",
ScalarType.createVarcharType(20000)));
+
+
Mockito.when(jdbcClient.getColumnsFromJdbc(ArgumentMatchers.anyString(),
ArgumentMatchers.anyString())).thenReturn(mockColumns);
+ List<Column> result = StreamingJobUtils.getColumns(jdbcClient,
database, table, primaryKeys);
+
+ // Verify varchar length multiplication by 3
+ Column shortName = result.stream()
+ .filter(col -> col.getName().equals("short_name"))
+ .findFirst()
+ .orElse(null);
+ Assert.assertNotNull(shortName);
+ Assert.assertEquals(150, shortName.getType().getLength()); // 50 * 3
+
+ // Verify long varchar becomes STRING type
+ Column longName = result.stream()
+ .filter(col -> col.getName().equals("long_name"))
+ .findFirst()
+ .orElse(null);
+ Assert.assertNotNull(longName);
+ Assert.assertTrue(longName.getType().isStringType());
+ }
+
+ @Test
+ public void testGetColumnsWithStringTypeAsPrimaryKey() throws Exception {
+ String database = "test_db";
+ String table = "test_table";
+ List<String> primaryKeys = Arrays.asList("id");
+
+ List<Column> mockColumns = new ArrayList<>();
+ mockColumns.add(new Column("id", ScalarType.createStringType()));
+ mockColumns.add(new Column("name", ScalarType.createVarcharType(50)));
+
+
Mockito.when(jdbcClient.getColumnsFromJdbc(ArgumentMatchers.anyString(),
ArgumentMatchers.anyString())).thenReturn(mockColumns);
+ List<Column> result = StreamingJobUtils.getColumns(jdbcClient,
database, table, primaryKeys);
+
+ // Verify string type primary key is converted to varchar
+ Column idColumn = result.stream()
+ .filter(col -> col.getName().equals("id"))
+ .findFirst()
+ .orElse(null);
+ Assert.assertNotNull(idColumn);
+ Assert.assertTrue(idColumn.getType().isVarchar());
+ Assert.assertEquals(ScalarType.MAX_VARCHAR_LENGTH,
idColumn.getType().getLength());
+ }
+
+ @Test
+ public void testGetColumnsWithEmptyPrimaryKeys() throws Exception {
+ String database = "test_db";
+ String table = "test_table";
+ List<String> primaryKeys = new ArrayList<>();
+
+ List<Column> mockColumns = new ArrayList<>();
+ mockColumns.add(new Column("col1",
ScalarType.createType(PrimitiveType.INT)));
+ mockColumns.add(new Column("col2", ScalarType.createVarcharType(100)));
+ mockColumns.add(new Column("col3",
ScalarType.createType(PrimitiveType.BIGINT)));
+
+
Mockito.when(jdbcClient.getColumnsFromJdbc(ArgumentMatchers.anyString(),
ArgumentMatchers.anyString())).thenReturn(mockColumns);
+ List<Column> result = StreamingJobUtils.getColumns(jdbcClient,
database, table, primaryKeys);
+
+ // Verify columns maintain original order when no primary keys
+ Assert.assertEquals(3, result.size());
+ Assert.assertEquals("col1", result.get(0).getName());
+ Assert.assertEquals("col2", result.get(1).getName());
+ Assert.assertEquals("col3", result.get(2).getName());
+ }
+
+ @Test
+ public void testGetColumnsWithMultiplePrimaryKeys() throws Exception {
+ String database = "test_db";
+ String table = "test_table";
+ List<String> primaryKeys = Arrays.asList("pk3", "pk1", "pk2");
+
+ List<Column> mockColumns = new ArrayList<>();
+ mockColumns.add(new Column("data1",
ScalarType.createType(PrimitiveType.INT)));
+ mockColumns.add(new Column("pk1",
ScalarType.createType(PrimitiveType.INT)));
+ mockColumns.add(new Column("data2",
ScalarType.createVarcharType(100)));
+ mockColumns.add(new Column("pk2",
ScalarType.createType(PrimitiveType.BIGINT)));
+ mockColumns.add(new Column("pk3",
ScalarType.createType(PrimitiveType.INT)));
+ mockColumns.add(new Column("data3", ScalarType.createVarcharType(50)));
+
+
Mockito.when(jdbcClient.getColumnsFromJdbc(ArgumentMatchers.anyString(),
ArgumentMatchers.anyString())).thenReturn(mockColumns);
+ List<Column> result = StreamingJobUtils.getColumns(jdbcClient,
database, table, primaryKeys);
+
+ // Verify primary keys are sorted in the order defined in primaryKeys
list
+ Assert.assertEquals(6, result.size());
+ Assert.assertEquals("pk3", result.get(0).getName());
+ Assert.assertEquals("pk1", result.get(1).getName());
+ Assert.assertEquals("pk2", result.get(2).getName());
+ // Verify non-primary keys follow
+ Assert.assertEquals("data1", result.get(3).getName());
+ Assert.assertEquals("data2", result.get(4).getName());
+ Assert.assertEquals("data3", result.get(5).getName());
+ }
+}
Review Comment:
Missing test coverage for the unsupported column type validation. The code
at lines 343-344 in StreamingJobUtils.java checks for unsupported column types
and throws an exception, but there is no corresponding test case that verifies
this behavior. Consider adding a test that validates the exception is thrown
when an unsupported column type is encountered.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]