morningman commented on a change in pull request #3034: implements intersect node
URL: https://github.com/apache/incubator-doris/pull/3034#discussion_r387657215
 
 

 ##########
 File path: be/src/exec/intersect_node.cpp
 ##########
 @@ -56,4 +43,177 @@ Status IntersectNode::init(const TPlanNode& tnode, RuntimeState* state) {
     }
     return Status::OK();
 }
-}
\ No newline at end of file
+
+Status IntersectNode::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(ExecNode::prepare(state));
+    _build_pool.reset(new MemPool(mem_tracker()));
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    for (size_t i = 0; i < _child_expr_lists.size(); ++i) {
+        RETURN_IF_ERROR(Expr::prepare(_child_expr_lists[i], state, child(i)->row_desc(),
+                                      expr_mem_tracker()));
+    }
+    // pre-computeselect k4 from t where k4 <4 the tuple index of build tuples in the output row
+    _build_tuple_size = child(0)->row_desc().tuple_descriptors().size();
+    _build_tuple_row_size = _build_tuple_size * sizeof(Tuple*);
+    _build_tuple_idx.reserve(_build_tuple_size);
+
+    for (int i = 0; i < _build_tuple_size; ++i) {
+        TupleDescriptor* build_tuple_desc = child(1)->row_desc().tuple_descriptors()[i];
+        _build_tuple_idx.push_back(_row_descriptor.get_tuple_idx(build_tuple_desc->id()));
+    }
+    _find_nulls = std::vector<bool>(_child_expr_lists.size(), true);
+    return Status::OK();
+}
+
+Status IntersectNode::close(RuntimeState* state) {
+    if (is_closed()) {
+        return Status::OK();
+    }
+    for (auto& exprs : _child_expr_lists) {
+        Expr::close(exprs, state);
+    }
+
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::CLOSE));
+    // Must reset _probe_batch in close() to release resources
+    _probe_batch.reset(NULL);
+
+    if (_memory_used_counter != NULL && _hash_tbl.get() != NULL) {
+        COUNTER_UPDATE(_memory_used_counter, _build_pool->peak_allocated_bytes());
+        COUNTER_UPDATE(_memory_used_counter, _hash_tbl->byte_size());
+    }
+    if (_hash_tbl.get() != NULL) {
+        _hash_tbl->close();
+    }
+    if (_build_pool.get() != NULL) {
+        _build_pool->free_all();
+    }
+
+    return ExecNode::close(state);
+}
+
+Status IntersectNode::open(RuntimeState* state) {
+    RETURN_IF_ERROR(ExecNode::open(state));
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    RETURN_IF_CANCELLED(state);
+    // open result expr lists.
+    for (const vector<ExprContext*>& exprs : _child_expr_lists) {
+        RETURN_IF_ERROR(Expr::open(exprs, state));
+    }
+
+    for (int i = 1; i < _children.size(); ++i) {
+        // build hash table
+        HashTable* temp_tbl =
+                new HashTable(_child_expr_lists[0], _child_expr_lists[i], _build_tuple_size, false,
+                              _find_nulls, id(), mem_tracker(), 1024);
+        if (i == 1) {
+            _hash_tbl.reset(temp_tbl);
+            RowBatch build_batch(child(0)->row_desc(), state->batch_size(), mem_tracker());
+            RETURN_IF_ERROR(child(0)->open(state));
+
+            while (true) {
+                RETURN_IF_CANCELLED(state);
+                bool eos = true;
+                RETURN_IF_ERROR(child(0)->get_next(state, &build_batch, &eos));
+                // take ownership of tuple data of build_batch
+                _build_pool->acquire_data(build_batch.tuple_data_pool(), false);
+                RETURN_IF_LIMIT_EXCEEDED(state, " Intersect, while constructing the hash table.");
+                for (int i = 0; i < build_batch.num_rows(); ++i) {
+                    _hash_tbl->insert(build_batch.get_row(i));
+                }
+                VLOG_ROW << "hash table content: "
+                         << _hash_tbl->debug_string(true, &child(0)->row_desc());
+                build_batch.reset();
+
+                if (eos) {
+                    break;
+                }
+            }
+        } else {
+            _hash_tbl_iterator = _hash_tbl->begin();
+            while (_hash_tbl_iterator.has_next()) {
+                if (_hash_tbl_iterator.matched()) {
+                    temp_tbl->insert(_hash_tbl_iterator.get_row());
+                }
+                _hash_tbl_iterator.next<false>();
+            }
+            _hash_tbl->close();
+            _hash_tbl.reset(temp_tbl);
+        }
+        // probe
+        _probe_batch.reset(new RowBatch(child(i)->row_desc(), state->batch_size(), mem_tracker()));
+        RETURN_IF_ERROR(child(i)->open(state));
+        while (true) {
+            RETURN_IF_CANCELLED(state);
+            bool eos = true;
+            RETURN_IF_ERROR(child(i)->get_next(state, _probe_batch.get(), &eos));
+            RETURN_IF_LIMIT_EXCEEDED(state, " Intersect , while constructing the hash table.");
 
 Review comment:
   ```suggestion
               RETURN_IF_LIMIT_EXCEEDED(state, " Intersect, while probing the hash table.");
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to