liugddx opened a new issue, #38174:
URL: https://github.com/apache/doris/issues/38174

   ### Search before asking
   
   - [X] I had searched in the 
[issues](https://github.com/apache/doris/issues?q=is%3Aissue) and found no 
similar issues.
   
   
   ### Version
   
   master
   
   ### What's Wrong?
   
   Timestamp (`datetime`) values read through the Arrow-based Thrift scan API (`openScanner` / `getNext`) are incorrect.
   <img width="980" alt="image" src="https://github.com/user-attachments/assets/557ceca9-b6ad-44b5-a56c-253e33f52d2c">
   <img width="1123" alt="image" src="https://github.com/user-attachments/assets/ae00da53-fc5b-4499-a617-93330de3c90f">
   
   
   ### What You Expected?
   
   The stored values are in 2024 (e.g. `2024-07-19 12:00:00` from the INSERT below), but the values read back through Arrow are in 1970.
   
   ### How to Reproduce?
   
   - ddl
   ```
   -- e2e_sink.doris_e2e_unique_table definition
   -- NOTE(review): the Java reader in this issue queries e2e_source.doris_e2e_unique_table,
   -- not e2e_sink; confirm which database this table was actually created in.
   -- Keep the column types spelled exactly as written: the top-level columns use
   -- datetime / datetime(6) / date while the MAP columns use datetimev2 / datev2, and
   -- these spellings may resolve to different storage types depending on server
   -- configuration (TODO confirm), so rewriting them could hide or alter the bug.
   
   CREATE TABLE `doris_e2e_unique_table` (
     `F_ID` bigint(20) NULL,
     `F_INT` int(11) NULL,
     `F_BIGINT` bigint(20) NULL,
     `F_TINYINT` tinyint(4) NULL,
     `F_SMALLINT` smallint(6) NULL,
     `F_DECIMAL` DECIMAL(18, 6) NULL,
     `F_LARGEINT` largeint(40) NULL,
     `F_BOOLEAN` boolean NULL,
     `F_DOUBLE` double NULL,
     `F_FLOAT` float NULL,
     `F_CHAR` char(1) NULL,
     `F_VARCHAR_11` varchar(11) NULL,
     `F_STRING` text NULL,
     -- Timestamp/date columns. The report says timestamp values read back through
     -- Arrow show year 1970 instead of the inserted 2024 values.
     `F_DATETIME_P` datetime(6) NULL,
     `F_DATETIME` datetime NULL,
     `F_DATE` date NULL,
     -- MAP columns: note the reader must handle Arrow MAP vectors to return these.
     `MAP_VARCHAR_BOOLEAN` MAP<text,boolean> NULL,
     `MAP_CHAR_TINYINT` MAP<text,tinyint(4)> NULL,
     `MAP_STRING_SMALLINT` MAP<text,smallint(6)> NULL,
     `MAP_INT_INT` MAP<int(11),int(11)> NULL,
     `MAP_TINYINT_BIGINT` MAP<tinyint(4),bigint(20)> NULL,
     `MAP_SMALLINT_LARGEINT` MAP<smallint(6),decimalv3(20, 0)> NULL,
     `MAP_BIGINT_FLOAT` MAP<bigint(20),float> NULL,
     `MAP_LARGEINT_DOUBLE` MAP<decimalv3(20, 0),double> NULL,
     `MAP_STRING_DECIMAL` MAP<text,decimalv3(10, 2)> NULL,
     `MAP_DECIMAL_DATE` MAP<decimalv3(10, 2),datev2> NULL,
     `MAP_DATE_DATETIME` MAP<datev2,datetimev2(6)> NULL,
     `MAP_DATETIME_CHAR` MAP<datetimev2(6),text> NULL,
     `MAP_CHAR_VARCHAR` MAP<text,text> NULL,
     `MAP_VARCHAR_STRING` MAP<text,text> NULL
   ) ENGINE=OLAP
   UNIQUE KEY(`F_ID`)
   COMMENT 'OLAP'
   -- Presumably 10 buckets -> 10 tablets, each scanned separately by the reader
   -- (TODO confirm against the "partitions" map in the _query_plan response).
   DISTRIBUTED BY HASH(`F_ID`) BUCKETS 10
   PROPERTIES (
   "replication_allocation" = "tag.location.default: 1",
   "is_being_synced" = "false",
   "storage_format" = "V2",
   "light_schema_change" = "true",
   "disable_auto_compaction" = "false",
   "enable_single_replica_compaction" = "false"
   );
   ```
   
   - data
   ```
   -- One fully populated row. F_DATETIME_P / F_DATETIME / F_DATE carry 2024-07-19
   -- values, which is what the Arrow reader is expected to return for them.
   INSERT INTO `doris_e2e_unique_table` (
       F_ID, F_INT, F_BIGINT, F_TINYINT, F_SMALLINT, F_DECIMAL, F_LARGEINT,
       F_BOOLEAN, F_DOUBLE, F_FLOAT, F_CHAR, F_VARCHAR_11, F_STRING,
       F_DATETIME_P, F_DATETIME, F_DATE,
       MAP_VARCHAR_BOOLEAN, MAP_CHAR_TINYINT, MAP_STRING_SMALLINT, MAP_INT_INT,
       MAP_TINYINT_BIGINT, MAP_SMALLINT_LARGEINT, MAP_BIGINT_FLOAT, MAP_LARGEINT_DOUBLE,
       MAP_STRING_DECIMAL, MAP_DECIMAL_DATE, MAP_DATE_DATETIME, MAP_DATETIME_CHAR,
       MAP_CHAR_VARCHAR, MAP_VARCHAR_STRING
   )
   VALUES (
       1, 10, 10000000000, 1, 100, 123.456789, 9223372036854775807, TRUE, 3.14159,
       2.71828, 'A', 'Hello World', 'Sample text for the string field.',
       -- Keep each datetime literal on a single line: a line break inside the quotes
       -- would become part of the string value.
       '2024-07-19 12:00:00.123456', '2024-07-19 12:00:00', '2024-07-19',
       {'key1': TRUE, 'key2': FALSE},
       {'A': 1, 'B': 2},
       {'text1': 1, 'text2': 2},
       {1: 1, 2: 4},
       {1: 10000000000},
       {100: 9223372036854775807},
       {10000000000: 3.14},
       {9223372036854775807: 3.14},
       {'value1': 123.45, 'value2': 678.90},
       {123.45: '2024-07-19'},
       {'2024-07-19': '2024-07-19 12:00:00'},
       {'2024-07-19 12:00:00': 'Datetime example'},
       {'key1': 'value1', 'key2': 'value2'},
       {'keyA': 'Sample text', 'keyB': 'Another text'}
   );
   ```
   
   - code
   
   ```
   // Licensed to the Apache Software Foundation (ASF) under one
   // or more contributor license agreements.  See the NOTICE file
   // distributed with this work for additional information
   // regarding copyright ownership.  The ASF licenses this file
   // to you under the Apache License, Version 2.0 (the
   // "License"); you may not use this file except in compliance
   // with the License.  You may obtain a copy of the License at
   //
   //   http://www.apache.org/licenses/LICENSE-2.0
   //
   // Unless required by applicable law or agreed to in writing,
   // software distributed under the License is distributed on an
   // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
   // KIND, either express or implied.  See the License for the
   // specific language governing permissions and limitations
   // under the License.
   package org.liugddx;
   
   import com.alibaba.fastjson.JSON;
   import com.alibaba.fastjson.JSONObject;
   import org.apache.arrow.memory.RootAllocator;
   import org.apache.arrow.vector.BigIntVector;
   import org.apache.arrow.vector.BitVector;
   import org.apache.arrow.vector.DateDayVector;
   import org.apache.arrow.vector.DecimalVector;
   import org.apache.arrow.vector.FieldVector;
   import org.apache.arrow.vector.Float4Vector;
   import org.apache.arrow.vector.Float8Vector;
   import org.apache.arrow.vector.IntVector;
   import org.apache.arrow.vector.SmallIntVector;
   import org.apache.arrow.vector.TimeSecVector;
   import org.apache.arrow.vector.TimeStampMicroVector;
   import org.apache.arrow.vector.TinyIntVector;
   import org.apache.arrow.vector.VarBinaryVector;
   import org.apache.arrow.vector.VarCharVector;
   import org.apache.arrow.vector.VectorSchemaRoot;
   import org.apache.arrow.vector.complex.ListVector;
   import org.apache.arrow.vector.ipc.ArrowStreamReader;
   import org.apache.arrow.vector.types.Types;
   import org.apache.commons.codec.binary.Base64;
   import org.apache.doris.sdk.thrift.TDorisExternalService;
   import org.apache.doris.sdk.thrift.TScanBatchResult;
   import org.apache.doris.sdk.thrift.TScanCloseParams;
   import org.apache.doris.sdk.thrift.TScanColumnDesc;
   import org.apache.doris.sdk.thrift.TScanNextBatchParams;
   import org.apache.doris.sdk.thrift.TScanOpenParams;
   import org.apache.doris.sdk.thrift.TScanOpenResult;
   import org.apache.doris.sdk.thrift.TStatusCode;
   import org.apache.http.HttpHeaders;
   import org.apache.http.client.methods.CloseableHttpResponse;
   import org.apache.http.client.methods.HttpPost;
   import org.apache.http.entity.StringEntity;
   import org.apache.http.impl.client.CloseableHttpClient;
   import org.apache.http.impl.client.HttpClients;
   import org.apache.http.util.EntityUtils;
   import org.apache.thrift.TConfiguration;
   import org.apache.thrift.protocol.TBinaryProtocol;
   import org.apache.thrift.protocol.TProtocol;
   import org.apache.thrift.transport.TSocket;
   import org.apache.thrift.transport.TTransport;
   
   import java.io.ByteArrayInputStream;
   import java.math.BigDecimal;
   import java.nio.charset.StandardCharsets;
   import java.time.LocalDate;
   import java.time.format.DateTimeFormatter;
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   
   public class Main {
       static String dorisUrl = "172.30.34.130:8030";
       static String user = "root";
       static String password = "";
       static String database = "e2e_source";
       static String table = "doris_e2e_unique_table";
       static String sql = "select * from e2e_source.doris_e2e_unique_table";
       static int readRowCount = 0;
       static List<List<Object>> result = new ArrayList<>();
   
       private static final String DATETIME_PATTERN = "yyyy-MM-dd HH:mm:ss";
       private static final String DATETIMEV2_PATTERN = "yyyy-MM-dd 
HH:mm:ss.SSSSSS";
       private final DateTimeFormatter dateTimeV2Formatter =
               DateTimeFormatter.ofPattern(DATETIMEV2_PATTERN);
       private final DateTimeFormatter dateFormatter = 
DateTimeFormatter.ofPattern("yyyy-MM-dd");
   
       public static void main(String[] args) throws Exception {
           JSONObject queryPlan = getQueryPlan();
           System.out.println(queryPlan);
           readData(queryPlan);
           System.out.println(result);
           System.out.println(result.size());
       }
   
       private static JSONObject getQueryPlan() throws Exception {
           try (CloseableHttpClient client = HttpClients.custom().build()) {
               HttpPost post = new 
HttpPost(String.format("http://%s/api/%s/%s/_query_plan";, dorisUrl, database, 
table));
               post.setHeader(HttpHeaders.EXPECT, "100-continue");
               post.setHeader(HttpHeaders.AUTHORIZATION,  "Basic " + new 
String(Base64.encodeBase64((user + ":" + 
password).getBytes(StandardCharsets.UTF_8))));
   
               //The param is specific SQL, and the query plan is returned
               Map<String,String> params = new HashMap<>();
               params.put("sql",sql);
               StringEntity entity = new 
StringEntity(JSON.toJSONString(params));
               post.setEntity(entity);
   
               try (CloseableHttpResponse response = client.execute(post)) {
                   if (response.getEntity() != null ) {
                       String loadResult = 
EntityUtils.toString(response.getEntity());
                       JSONObject queryPlan = 
JSONObject.parseObject(loadResult);
                       return queryPlan;
                   }
               }
           }
           return null;
       }
   
       private static void readData(JSONObject queryRes) throws Exception {
           JSONObject data = queryRes.getJSONObject("data");
           String queryPlan = data.getString("opaqued_query_plan");
           JSONObject partitions = data.getJSONObject("partitions");
           for(Map.Entry<String, Object> tablet : partitions.entrySet()){
               Long tabletId = Long.parseLong(tablet.getKey());
               JSONObject value = 
JSONObject.parseObject(JSON.toJSONString(tablet.getValue()));
               //get first backend
               String routingsBackend = 
value.getJSONArray("routings").getString(0);
               String backendHost = routingsBackend.split(":")[0];
               String backendPort = routingsBackend.split(":")[1];
   
               //connect backend
               TBinaryProtocol.Factory factory = new TBinaryProtocol.Factory();
               TTransport transport = new TSocket(new TConfiguration(), 
backendHost, Integer.parseInt(backendPort));
               TProtocol protocol = factory.getProtocol(transport);
               TDorisExternalService.Client client = new 
TDorisExternalService.Client(protocol);
               if (!transport.isOpen()) {
                   transport.open();
               }
   
               //build params
               TScanOpenParams params = new TScanOpenParams();
               params.cluster = "default_cluster";
               params.database = database;
               params.table = table;
               params.tablet_ids = Arrays.asList(tabletId);
               params.opaqued_query_plan = queryPlan;
               // max row number of one read batch
               params.setBatchSize(1024);
               params.setQueryTimeout(3600);
               params.setMemLimit(2147483648L);
               params.setUser(user);
               params.setPasswd(password);
   
               //            int offset =0;
               //open scanner
               TScanOpenResult tScanOpenResult = client.openScanner(params);
               if 
(!TStatusCode.OK.equals(tScanOpenResult.getStatus().getStatusCode())) {
                   throw new RuntimeException(String.format("The status of open 
scanner result from %s is '%s', error message is: %s.",
                           routingsBackend, 
tScanOpenResult.getStatus().getStatusCode(), 
tScanOpenResult.getStatus().getErrorMsgs()));
               }
               List<TScanColumnDesc> selectedColumns = 
tScanOpenResult.getSelectedColumns();
   
               TScanNextBatchParams nextBatchParams = new 
TScanNextBatchParams();
               nextBatchParams.setContextId(tScanOpenResult.getContextId());
               boolean eos = false;
               //read data
               int offset = 0;
               while(!eos){
                   nextBatchParams.setOffset(offset);
                   TScanBatchResult next = client.getNext(nextBatchParams);
                   if 
(!TStatusCode.OK.equals(next.getStatus().getStatusCode())) {
                       throw new RuntimeException(String.format("The status of 
get next result from %s is '%s', error message is: %s.",
                               routingsBackend, 
next.getStatus().getStatusCode(), next.getStatus().getErrorMsgs()));
                   }
                   eos = next.isEos();
                   if(!eos){
                       int i = convertArrow(next, selectedColumns);
                       offset += i;
                       readRowCount = offset;
                   }
               }
               //close
               TScanCloseParams closeParams = new TScanCloseParams();
               closeParams.setContextId(tScanOpenResult.getContextId());
               client.closeScanner(closeParams);
               if ((transport != null) && transport.isOpen()) {
                   transport.close();
               }
               //System.out.println(String.format("read tablet %s from backend 
%s finished", tabletId, routingsBackend));
           }
       }
   
       private static int convertArrow(TScanBatchResult nextResult, 
List<TScanColumnDesc> selectedColumns) throws Exception {
           int offset = 0;
           RootAllocator rootAllocator = new RootAllocator(Integer.MAX_VALUE);
           ArrowStreamReader arrowStreamReader = new ArrowStreamReader(new 
ByteArrayInputStream(nextResult.getRows()), rootAllocator);
   
           VectorSchemaRoot root = arrowStreamReader.getVectorSchemaRoot();
           while (arrowStreamReader.loadNextBatch()) {
               List<FieldVector>  fieldVectors = root.getFieldVectors();
               //total data rows
               int rowCountInOneBatch = root.getRowCount();
               // init the result
               for (int i = 0; i < rowCountInOneBatch; ++i) {
                   result.add(new ArrayList<>(fieldVectors.size()));
               }
               //Arrow returns in column format and needs to be converted to 
row format
               for (int col = 0; col < fieldVectors.size(); col++) {
                   FieldVector fieldVector = fieldVectors.get(col);
                   Types.MinorType minorType = fieldVector.getMinorType();
                   for(int row = 0 ; row < rowCountInOneBatch ;row++){
                       convertValue(row , minorType, fieldVector);
                   }
               }
               offset += root.getRowCount();
           }
           return offset;
       }
   
   
       private static void convertValue(int rowIndex,
               Types.MinorType minorType,
               FieldVector fieldVector) throws Exception {
   
           switch (minorType) {
               case BIT:
                   BitVector bitVector = (BitVector) fieldVector;
                   Object fieldValue = bitVector.isNull(rowIndex) ? null : 
bitVector.get(rowIndex) != 0;
                   result.get(readRowCount + rowIndex).add(fieldValue);
                   break;
               case TINYINT:
                   TinyIntVector tinyIntVector = (TinyIntVector) fieldVector;
                   fieldValue = tinyIntVector.isNull(rowIndex) ? null : 
tinyIntVector.get(rowIndex);
                   result.get(readRowCount + rowIndex).add(fieldValue);
                   break;
               case SMALLINT:
                   SmallIntVector smallIntVector = (SmallIntVector) fieldVector;
                   fieldValue = smallIntVector.isNull(rowIndex) ? null : 
smallIntVector.get(rowIndex);
                   result.get(readRowCount + rowIndex).add(fieldValue);
                   break;
               case INT:
                   IntVector intVector = (IntVector) fieldVector;
                   fieldValue = intVector.isNull(rowIndex) ? null : 
intVector.get(rowIndex);
                   result.get(readRowCount + rowIndex).add(fieldValue);
                   break;
               case BIGINT:
                   BigIntVector bigIntVector = (BigIntVector) fieldVector;
                   fieldValue = bigIntVector.isNull(rowIndex) ? null : 
bigIntVector.get(rowIndex);
                   result.get(readRowCount + rowIndex).add(fieldValue);
                   break;
               case FLOAT4:
                   Float4Vector float4Vector = (Float4Vector) fieldVector;
                   fieldValue = float4Vector.isNull(rowIndex) ? null : 
float4Vector.get(rowIndex);
                   result.get(readRowCount + rowIndex).add(fieldValue);
                   break;
               case FLOAT8:
                   Float8Vector float8Vector = (Float8Vector) fieldVector;
                   fieldValue = float8Vector.isNull(rowIndex) ? null : 
float8Vector.get(rowIndex);
                   result.get(readRowCount + rowIndex).add(fieldValue);
                   break;
               case VARBINARY:
                   VarBinaryVector varBinaryVector = (VarBinaryVector) 
fieldVector;
                   fieldValue = varBinaryVector.isNull(rowIndex) ? null : 
varBinaryVector.get(rowIndex);
                   result.get(readRowCount + rowIndex).add(fieldValue);
                   break;
               case DECIMAL:
                   DecimalVector decimalVector = (DecimalVector) fieldVector;
                   BigDecimal value = 
decimalVector.getObject(rowIndex).stripTrailingZeros();
                   result.get(readRowCount + rowIndex).add(value);
                   break;
               case VARCHAR:
                   VarCharVector date = (VarCharVector) fieldVector;
                   String stringValue = new String(date.get(rowIndex));
                   result.get(readRowCount + rowIndex).add(stringValue);
                   break;
               case LIST:
                   ListVector listVector = (ListVector) fieldVector;
                   List listValue = listVector.isNull(rowIndex) ? null : 
listVector.getObject(rowIndex);
                   result.get(readRowCount + rowIndex).add(listValue);
                   break;
               case TIMESTAMPMICRO:
                   TimeStampMicroVector timestampVector = 
(TimeStampMicroVector) fieldVector;
                   String timestampVectorStringValue = 
timestampVector.getObject(rowIndex).toString();
                   timestampVectorStringValue = 
completeMilliseconds(timestampVectorStringValue);
                   result.get(readRowCount + 
rowIndex).add(timestampVectorStringValue);
                   break;
               case DATEDAY:
                   DateDayVector dateDayVector = (DateDayVector) fieldVector;
                   fieldValue = dateDayVector.isNull(rowIndex) ? null : 
LocalDate.ofEpochDay(dateDayVector.get(rowIndex));
                   result.get(readRowCount + rowIndex).add(fieldValue);
                   break;
               default:
                   String errMsg = "Unsupported type " + minorType;
                   throw new RuntimeException(errMsg);
           }
       }
   
       private static String completeMilliseconds(String stringValue) {
           if (stringValue.length() == DATETIMEV2_PATTERN.length()) {
               return stringValue;
           }
           StringBuilder sb = new StringBuilder(stringValue);
           if (stringValue.length() == DATETIME_PATTERN.length()) {
               sb.append(".");
           }
           while (sb.toString().length() < DATETIMEV2_PATTERN.length()) {
               sb.append(0);
           }
           return sb.toString();
       }
   }
   
   ```
   
   ### Anything Else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to