This is an automated email from the ASF dual-hosted git repository.

yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 61c9d11  support change column type from decimal to string (#6643)
61c9d11 is described below

commit 61c9d11fdb72dccf94876ae3706e7ed492622807
Author: Zhengguo Yang <yangz...@gmail.com>
AuthorDate: Tue Sep 14 15:56:44 2021 +0800

    support change column type from decimal to string (#6643)
---
 be/src/olap/rowset/segment_v2/encoding_info.cpp    |  1 +
 be/src/runtime/data_stream_recvr.cc                |  5 +-
 be/src/runtime/data_stream_sender.cpp              | 65 +++++++++++-----------
 be/src/runtime/row_batch.cpp                       | 12 ++--
 be/src/runtime/row_batch.h                         |  8 +--
 .../main/java/org/apache/doris/catalog/Column.java |  3 +-
 6 files changed, 47 insertions(+), 47 deletions(-)

diff --git a/be/src/olap/rowset/segment_v2/encoding_info.cpp b/be/src/olap/rowset/segment_v2/encoding_info.cpp
index 801cdb4..e4ce43c 100644
--- a/be/src/olap/rowset/segment_v2/encoding_info.cpp
+++ b/be/src/olap/rowset/segment_v2/encoding_info.cpp
@@ -255,6 +255,7 @@ EncodingInfoResolver::EncodingInfoResolver() {
     _add_map<OLAP_FIELD_TYPE_VARCHAR, PLAIN_ENCODING>();
     _add_map<OLAP_FIELD_TYPE_VARCHAR, PREFIX_ENCODING, true>();
 
+    _add_map<OLAP_FIELD_TYPE_STRING, DICT_ENCODING>();
     _add_map<OLAP_FIELD_TYPE_STRING, PLAIN_ENCODING>();
     _add_map<OLAP_FIELD_TYPE_STRING, PREFIX_ENCODING, true>();
 
diff --git a/be/src/runtime/data_stream_recvr.cc b/be/src/runtime/data_stream_recvr.cc
index b836253..49b2495 100644
--- a/be/src/runtime/data_stream_recvr.cc
+++ b/be/src/runtime/data_stream_recvr.cc
@@ -213,7 +213,7 @@ void DataStreamRecvr::SenderQueue::add_batch(const PRowBatch& pb_batch, int be_n
         _packet_seq_map.emplace(be_number, packet_seq);
     }
 
-    int batch_size = RowBatch::get_batch_size(pb_batch);
+    size_t batch_size = RowBatch::get_batch_size(pb_batch);
     COUNTER_UPDATE(_recvr->_bytes_received_counter, batch_size);
 
     // Following situation will match the following condition.
@@ -446,8 +446,7 @@ DataStreamRecvr::DataStreamRecvr(
           _num_buffered_bytes(0),
           _profile(profile),
           _sub_plan_query_statistics_recvr(sub_plan_query_statistics_recvr) {
-    _mem_tracker = MemTracker::CreateTracker(
-            _profile, -1, "DataStreamRecvr", parent_tracker);
+    _mem_tracker = MemTracker::CreateTracker(_profile, -1, "DataStreamRecvr", parent_tracker);
    // Create one queue per sender if is_merging is true.
    int num_queues = is_merging ? num_senders : 1;
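A note on the int -> size_t change above (the same change recurs in the sender and
RowBatch hunks below): get_batch_size() sums byte counts, and a 32-bit signed int
wraps once a serialized batch reaches 2 GiB, corrupting the byte counters. A minimal
standalone sketch of the failure mode (not Doris code; assumes an LP64 platform
where size_t is 64-bit):

// why_size_t.cpp: demonstrate 32-bit wraparound on large batch sizes
#include <cstddef>
#include <cstdint>
#include <iostream>

int main() {
    // Hypothetical serialized batch: 2 GiB of tuple data plus small offset arrays.
    std::uint64_t tuple_data_bytes = 2ULL * 1024 * 1024 * 1024;
    std::uint64_t offsets_bytes = 1000 * sizeof(std::int32_t);

    // Narrowing the sum to int is implementation-defined and typically wraps negative,
    // so downstream counters and memory-limit checks would misbehave.
    int as_int = static_cast<int>(tuple_data_bytes + offsets_bytes);
    std::size_t as_size_t = tuple_data_bytes + offsets_bytes;

    std::cout << "int:    " << as_int << '\n';    // typically -2147479648
    std::cout << "size_t: " << as_size_t << '\n'; // 2147487648
    return 0;
}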
diff --git a/be/src/runtime/data_stream_sender.cpp b/be/src/runtime/data_stream_sender.cpp
index fa7c979..2884374 100644
--- a/be/src/runtime/data_stream_sender.cpp
+++ b/be/src/runtime/data_stream_sender.cpp
@@ -52,35 +52,35 @@ namespace doris {
 
 DataStreamSender::Channel::Channel(DataStreamSender* parent, const RowDescriptor& row_desc,
-        const TNetworkAddress& brpc_dest, const TUniqueId& fragment_instance_id,
-        PlanNodeId dest_node_id, int buffer_size, bool is_transfer_chain,
-        bool send_query_statistics_with_every_batch)
-    : _parent(parent),
-      _buffer_size(buffer_size),
-      _row_desc(row_desc),
-      _fragment_instance_id(fragment_instance_id),
-      _dest_node_id(dest_node_id),
-      _num_data_bytes_sent(0),
-      _packet_seq(0),
-      _need_close(false),
-      _be_number(0),
-      _brpc_dest_addr(brpc_dest),
-      _is_transfer_chain(is_transfer_chain),
-      _send_query_statistics_with_every_batch(send_query_statistics_with_every_batch) {
+                                   const TNetworkAddress& brpc_dest,
+                                   const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id,
+                                   int buffer_size, bool is_transfer_chain,
+                                   bool send_query_statistics_with_every_batch)
+        : _parent(parent),
+          _buffer_size(buffer_size),
+          _row_desc(row_desc),
+          _fragment_instance_id(fragment_instance_id),
+          _dest_node_id(dest_node_id),
+          _num_data_bytes_sent(0),
+          _packet_seq(0),
+          _need_close(false),
+          _be_number(0),
+          _brpc_dest_addr(brpc_dest),
+          _is_transfer_chain(is_transfer_chain),
+          _send_query_statistics_with_every_batch(send_query_statistics_with_every_batch) {
     std::string localhost = BackendOptions::get_localhost();
-    _is_local =
-            _brpc_dest_addr.hostname == localhost && _brpc_dest_addr.port == config::brpc_port;
+    _is_local = _brpc_dest_addr.hostname == localhost && _brpc_dest_addr.port == config::brpc_port;
     if (_is_local) {
-      LOG(INFO) << "will use local exechange, dest_node_id:" << _dest_node_id;
+        LOG(INFO) << "will use local exechange, dest_node_id:" << _dest_node_id;
     }
 }
 
 DataStreamSender::Channel::~Channel() {
-  if (_closure != nullptr && _closure->unref()) {
-    delete _closure;
-  }
-  // release this before request desctruct
-  _brpc_request.release_finst_id();
+    if (_closure != nullptr && _closure->unref()) {
+        delete _closure;
+    }
+    // release this before request desctruct
+    _brpc_request.release_finst_id();
 }
 
 Status DataStreamSender::Channel::init(RuntimeState* state) {
@@ -185,7 +185,7 @@ Status DataStreamSender::Channel::send_current_batch(bool eos) {
     }
     {
         SCOPED_TIMER(_parent->_serialize_batch_timer);
-        int uncompressed_bytes = _batch->serialize(&_pb_batch);
+        size_t uncompressed_bytes = _batch->serialize(&_pb_batch);
         COUNTER_UPDATE(_parent->_bytes_sent_counter, RowBatch::get_batch_size(_pb_batch));
         COUNTER_UPDATE(_parent->_uncompressed_bytes_counter, uncompressed_bytes);
     }
@@ -259,13 +259,12 @@ Status DataStreamSender::Channel::close_wait(RuntimeState* state) {
 
 DataStreamSender::DataStreamSender(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc)
         : _pool(pool),
-          _sender_id(sender_id),
-          _row_desc(row_desc),
-          _serialize_batch_timer(NULL),
-          _bytes_sent_counter(NULL),
-          _local_bytes_send_counter(NULL),
-          _current_pb_batch(&_pb_batch1) {
-}
+          _sender_id(sender_id),
+          _row_desc(row_desc),
+          _serialize_batch_timer(NULL),
+          _bytes_sent_counter(NULL),
+          _local_bytes_send_counter(NULL),
+          _current_pb_batch(&_pb_batch1) {}
 
 DataStreamSender::DataStreamSender(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc,
                                    const TDataStreamSink& sink,
@@ -648,8 +647,8 @@ Status DataStreamSender::serialize_batch(RowBatch* src, T* dest, int num_receive
     SCOPED_TIMER(_serialize_batch_timer);
     // TODO(zc)
    // RETURN_IF_ERROR(src->serialize(dest));
-    int uncompressed_bytes = src->serialize(dest);
-    int bytes = RowBatch::get_batch_size(*dest);
+    size_t uncompressed_bytes = src->serialize(dest);
+    size_t bytes = RowBatch::get_batch_size(*dest);
     // TODO(zc)
     // int uncompressed_bytes = bytes - dest->tuple_data.size() + dest->uncompressed_size;
     // The size output_batch would be if we didn't compress tuple_data (will be equal to
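For context on the encoding_info.cpp hunk above: DICT_ENCODING stores each distinct
string value once and replaces every cell with a small integer code, which pays off
for low-cardinality string columns. The sketch below shows the idea only; Doris's
real implementation is the dictionary page code under segment_v2, and the struct
and member names here are hypothetical:

// dict_sketch.cpp: the core idea behind dictionary encoding for strings
#include <cstddef>
#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

struct DictEncodedColumn {
    std::vector<std::string> dictionary;                    // each distinct value, stored once
    std::unordered_map<std::string, std::uint32_t> index;   // value -> code
    std::vector<std::uint32_t> codes;                       // one small code per row

    void add(const std::string& value) {
        auto it = index.find(value);
        if (it == index.end()) {
            // First occurrence: append to the dictionary and remember its code.
            it = index.emplace(value, static_cast<std::uint32_t>(dictionary.size())).first;
            dictionary.push_back(value);
        }
        codes.push_back(it->second);
    }

    // Decoding a row is a single dictionary lookup.
    const std::string& get(std::size_t row) const { return dictionary[codes[row]]; }
};

In encoding_info.cpp the first encoding registered for a type appears to act as its
default, so after this change STRING columns would default to dictionary encoding,
in line with what VARCHAR already supports.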
diff --git a/be/src/runtime/row_batch.cpp b/be/src/runtime/row_batch.cpp
index 0c76a67..25855f6 100644
--- a/be/src/runtime/row_batch.cpp
+++ b/be/src/runtime/row_batch.cpp
@@ -370,7 +370,7 @@ RowBatch::~RowBatch() {
     clear();
 }
 
-int RowBatch::serialize(TRowBatch* output_batch) {
+size_t RowBatch::serialize(TRowBatch* output_batch) {
     // why does Thrift not generate a Clear() function?
     output_batch->row_tuples.clear();
     output_batch->tuple_offsets.clear();
@@ -437,7 +437,7 @@ int RowBatch::serialize(TRowBatch* output_batch) {
     return get_batch_size(*output_batch) - output_batch->tuple_data.size() + size;
 }
 
-int RowBatch::serialize(PRowBatch* output_batch) {
+size_t RowBatch::serialize(PRowBatch* output_batch) {
     // num_rows
     output_batch->set_num_rows(_num_rows);
     // row_tuples
@@ -625,15 +625,15 @@ void RowBatch::transfer_resource_ownership(RowBatch* dest) {
     reset();
 }
 
-int RowBatch::get_batch_size(const TRowBatch& batch) {
-    int result = batch.tuple_data.size();
+size_t RowBatch::get_batch_size(const TRowBatch& batch) {
+    size_t result = batch.tuple_data.size();
     result += batch.row_tuples.size() * sizeof(TTupleId);
     result += batch.tuple_offsets.size() * sizeof(int32_t);
     return result;
 }
 
-int RowBatch::get_batch_size(const PRowBatch& batch) {
-    int result = batch.tuple_data().size();
+size_t RowBatch::get_batch_size(const PRowBatch& batch) {
+    size_t result = batch.tuple_data().size();
     result += batch.row_tuples().size() * sizeof(int32_t);
     result += batch.tuple_offsets().size() * sizeof(int32_t);
     return result;
diff --git a/be/src/runtime/row_batch.h b/be/src/runtime/row_batch.h
index a25c1a7..b0733e6 100644
--- a/be/src/runtime/row_batch.h
+++ b/be/src/runtime/row_batch.h
@@ -350,12 +350,12 @@ public:
     // This function does not reset().
     // Returns the uncompressed serialized size (this will be the true size of output_batch
     // if tuple_data is actually uncompressed).
-    int serialize(TRowBatch* output_batch);
-    int serialize(PRowBatch* output_batch);
+    size_t serialize(TRowBatch* output_batch);
+    size_t serialize(PRowBatch* output_batch);
 
     // Utility function: returns total size of batch.
-    static int get_batch_size(const TRowBatch& batch);
-    static int get_batch_size(const PRowBatch& batch);
+    static size_t get_batch_size(const TRowBatch& batch);
+    static size_t get_batch_size(const PRowBatch& batch);
 
     int num_rows() const { return _num_rows; }
     int capacity() const { return _capacity; }
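The Column.java hunk below widens the allowed schema change from DECIMALV2 -> VARCHAR
to also cover the new STRING type. The conversion is safe because rendering a scaled
decimal as text loses no information. A sketch of that reasoning, assuming a
DecimalV2-like representation (a 128-bit integer scaled by 10^-9; the helper below is
hypothetical and not Doris's DecimalV2Value API, and __int128 is a GCC/Clang extension):

// decimal_to_string.cpp: a scaled decimal always has an exact text form
#include <string>

std::string decimal_to_string(__int128 value) {
    constexpr int kScale = 9; // assumed DecimalV2 scale
    bool negative = value < 0;
    // Take the magnitude in unsigned arithmetic; this is well-defined even for the minimum value.
    unsigned __int128 v = negative ? -static_cast<unsigned __int128>(value)
                                   : static_cast<unsigned __int128>(value);
    std::string digits;
    do {
        digits.insert(digits.begin(), static_cast<char>('0' + static_cast<int>(v % 10)));
        v /= 10;
    } while (v != 0);
    // Pad so at least one digit remains before the decimal point.
    while (static_cast<int>(digits.size()) <= kScale) {
        digits.insert(digits.begin(), '0');
    }
    digits.insert(digits.end() - kScale, '.');
    return negative ? "-" + digits : digits;
}
// e.g. decimal_to_string(12345678900) == "12.345678900"

Since every such value has a bounded-length, exact text form, widening DECIMALV2 to
VARCHAR or STRING cannot lose data, which is what the FE check now permits.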
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
index c719409..c182377 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
@@ -409,7 +409,8 @@ public class Column implements Writable {
         }
 
         // now we support convert decimal to varchar type
-        if (getDataType() == PrimitiveType.DECIMALV2 && other.getDataType() == PrimitiveType.VARCHAR) {
+        if (getDataType() == PrimitiveType.DECIMALV2 && (other.getDataType() == PrimitiveType.VARCHAR
+                || other.getDataType() == PrimitiveType.STRING)) {
             return;
         }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org