This is an automated email from the ASF dual-hosted git repository. lihaopeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 4ef46159ae [vectorized](udaf) support array type for java-udaf (#17351) 4ef46159ae is described below commit 4ef46159aeef71c83832d4f1c9aa2a7875282ac9 Author: zhangstar333 <87313068+zhangstar...@users.noreply.github.com> AuthorDate: Thu Mar 9 11:30:07 2023 +0800 [vectorized](udaf) support array type for java-udaf (#17351) --- be/src/util/jni-util.h | 4 +- .../aggregate_function_java_udaf.h | 102 ++++++++++++++++- be/src/vec/functions/function_java_udf.cpp | 8 +- .../java/org/apache/doris/udf/BaseExecutor.java | 6 +- .../java/org/apache/doris/udf/UdafExecutor.java | 9 +- .../java/org/apache/doris/udf/UdfExecutor.java | 2 +- .../data/javaudf_p0/test_javaudaf_mysum_array.out | 87 +++++++++++++++ .../doris/udf/ArrayReturnArrayStringTest.java | 1 - .../java/org/apache/doris/udf/ArrayStringTest.java | 1 - .../java/org/apache/doris/udf/MyArrayString.java | 86 +++++++++++++++ .../org/apache/doris/udf/MyReturnArrayString.java | 79 +++++++++++++ .../java/org/apache/doris/udf/MySumArrayInt.java | 64 +++++++++++ .../org/apache/doris/udf/MySumReturnArrayInt.java | 76 +++++++++++++ .../javaudf_p0/test_javaudaf_mysum_array.groovy | 122 +++++++++++++++++++++ 14 files changed, 632 insertions(+), 15 deletions(-) diff --git a/be/src/util/jni-util.h b/be/src/util/jni-util.h index 0e551f17cf..5aa8be9a1f 100644 --- a/be/src/util/jni-util.h +++ b/be/src/util/jni-util.h @@ -57,9 +57,9 @@ public: static jclass jni_util_class() { return jni_util_cl_; } static jmethodID throwable_to_stack_trace_id() { return throwable_to_stack_trace_id_; } - static const int32_t INITIAL_RESERVED_BUFFER_SIZE = 1024; + static const int64_t INITIAL_RESERVED_BUFFER_SIZE = 1024; // TODO: we need a heuristic strategy to increase buffer size for variable-size output. 
- static inline int32_t IncreaseReservedBufferSize(int n) { + static inline int64_t IncreaseReservedBufferSize(int n) { return INITIAL_RESERVED_BUFFER_SIZE << n; } diff --git a/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h b/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h index 6b8b1ce9a1..93ef2341cc 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h +++ b/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h @@ -28,6 +28,7 @@ #include "runtime/user_function_cache.h" #include "util/jni-util.h" #include "vec/aggregate_functions/aggregate_function.h" +#include "vec/columns/column_array.h" #include "vec/columns/column_string.h" #include "vec/common/string_ref.h" #include "vec/core/field.h" @@ -55,11 +56,15 @@ public: input_values_buffer_ptr.reset(new int64_t[num_args]); input_nulls_buffer_ptr.reset(new int64_t[num_args]); input_offsets_ptrs.reset(new int64_t[num_args]); + input_array_nulls_buffer_ptr.reset(new int64_t[num_args]); + input_array_string_offsets_ptrs.reset(new int64_t[num_args]); input_place_ptrs.reset(new int64_t); output_value_buffer.reset(new int64_t); output_null_value.reset(new int64_t); output_offsets_ptr.reset(new int64_t); output_intermediate_state_ptr.reset(new int64_t); + output_array_null_ptr.reset(new int64_t); + output_array_string_offsets_ptr.reset(new int64_t); } ~AggregateJavaUdafData() { @@ -92,6 +97,11 @@ public: ctor_params.__set_input_offsets_ptrs((int64_t)input_offsets_ptrs.get()); ctor_params.__set_input_buffer_ptrs((int64_t)input_values_buffer_ptr.get()); ctor_params.__set_input_nulls_ptrs((int64_t)input_nulls_buffer_ptr.get()); + ctor_params.__set_input_array_nulls_buffer_ptr( + (int64_t)input_array_nulls_buffer_ptr.get()); + ctor_params.__set_input_array_string_offsets_ptrs( + (int64_t)input_array_string_offsets_ptrs.get()); + ctor_params.__set_output_buffer_ptr((int64_t)output_value_buffer.get()); ctor_params.__set_input_places_ptr((int64_t)input_place_ptrs.get()); @@ 
-99,6 +109,9 @@ public: ctor_params.__set_output_offsets_ptr((int64_t)output_offsets_ptr.get()); ctor_params.__set_output_intermediate_state_ptr( (int64_t)output_intermediate_state_ptr.get()); + ctor_params.__set_output_array_null_ptr((int64_t)output_array_null_ptr.get()); + ctor_params.__set_output_array_string_offsets_ptr( + (int64_t)output_array_string_offsets_ptr.get()); jbyteArray ctor_params_bytes; @@ -140,6 +153,30 @@ public: } else if (data_col->is_numeric() || data_col->is_column_decimal()) { input_values_buffer_ptr.get()[arg_idx] = reinterpret_cast<int64_t>(data_col->get_raw_data().data); + } else if (data_col->is_column_array()) { + const ColumnArray* array_col = assert_cast<const ColumnArray*>(data_col); + input_offsets_ptrs.get()[arg_idx] = reinterpret_cast<int64_t>( + array_col->get_offsets_column().get_raw_data().data); + const ColumnNullable& array_nested_nullable = + assert_cast<const ColumnNullable&>(array_col->get_data()); + auto data_column_null_map = array_nested_nullable.get_null_map_column_ptr(); + auto data_column = array_nested_nullable.get_nested_column_ptr(); + input_array_nulls_buffer_ptr.get()[arg_idx] = reinterpret_cast<int64_t>( + check_and_get_column<ColumnVector<UInt8>>(data_column_null_map) + ->get_data() + .data()); + + //need pass FE, nullamp and offset, chars + if (data_column->is_column_string()) { + const ColumnString* col = assert_cast<const ColumnString*>(data_column.get()); + input_values_buffer_ptr.get()[arg_idx] = + reinterpret_cast<int64_t>(col->get_chars().data()); + input_array_string_offsets_ptrs.get()[arg_idx] = + reinterpret_cast<int64_t>(col->get_offsets().data()); + } else { + input_values_buffer_ptr.get()[arg_idx] = + reinterpret_cast<int64_t>(data_column->get_raw_data().data); + } } else { return Status::InvalidArgument( strings::Substitute("Java UDAF doesn't support type is $0 now !", @@ -210,7 +247,7 @@ public: ColumnString::Offsets& offsets = \ const_cast<ColumnString::Offsets&>(str_col->get_offsets()); \ int 
increase_buffer_size = 0; \ - int32_t buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size); \ + int64_t buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size); \ chars.resize(buffer_size); \ *output_value_buffer = reinterpret_cast<int64_t>(chars.data()); \ *output_offsets_ptr = reinterpret_cast<int64_t>(offsets.data()); \ @@ -219,7 +256,7 @@ public: executor_result_id, to.size() - 1, place); \ while (res != JNI_TRUE) { \ increase_buffer_size++; \ - int32_t buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size); \ + buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size); \ chars.resize(buffer_size); \ *output_value_buffer = reinterpret_cast<int64_t>(chars.data()); \ *output_intermediate_state_ptr = chars.size(); \ @@ -230,6 +267,63 @@ public: *output_value_buffer = reinterpret_cast<int64_t>(data_col.get_raw_data().data); \ env->CallNonvirtualBooleanMethod(executor_obj, executor_cl, executor_result_id, \ to.size() - 1, place); \ + } else if (data_col.is_column_array()) { \ + ColumnArray& array_col = assert_cast<ColumnArray&>(data_col); \ + ColumnNullable& array_nested_nullable = \ + assert_cast<ColumnNullable&>(array_col.get_data()); \ + auto data_column_null_map = array_nested_nullable.get_null_map_column_ptr(); \ + auto data_column = array_nested_nullable.get_nested_column_ptr(); \ + auto& offset_column = array_col.get_offsets_column(); \ + int increase_buffer_size = 0; \ + int64_t buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size); \ + *output_offsets_ptr = reinterpret_cast<int64_t>(offset_column.get_raw_data().data); \ + data_column_null_map->resize(buffer_size); \ + auto& null_map_data = \ + assert_cast<ColumnVector<UInt8>*>(data_column_null_map.get())->get_data(); \ + *output_array_null_ptr = reinterpret_cast<int64_t>(null_map_data.data()); \ + *output_intermediate_state_ptr = buffer_size; \ + if (data_column->is_column_string()) { \ + ColumnString* str_col = 
assert_cast<ColumnString*>(data_column.get()); \ + ColumnString::Chars& chars = assert_cast<ColumnString::Chars&>(str_col->get_chars()); \ + ColumnString::Offsets& offsets = \ + assert_cast<ColumnString::Offsets&>(str_col->get_offsets()); \ + chars.resize(buffer_size); \ + offsets.resize(buffer_size); \ + *output_value_buffer = reinterpret_cast<int64_t>(chars.data()); \ + *output_array_string_offsets_ptr = reinterpret_cast<int64_t>(offsets.data()); \ + jboolean res = env->CallNonvirtualBooleanMethod( \ + executor_obj, executor_cl, executor_result_id, to.size() - 1, place); \ + while (res != JNI_TRUE) { \ + increase_buffer_size++; \ + buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size); \ + null_map_data.resize(buffer_size); \ + chars.resize(buffer_size); \ + offsets.resize(buffer_size); \ + *output_array_null_ptr = reinterpret_cast<int64_t>(null_map_data.data()); \ + *output_value_buffer = reinterpret_cast<int64_t>(chars.data()); \ + *output_array_string_offsets_ptr = reinterpret_cast<int64_t>(offsets.data()); \ + *output_intermediate_state_ptr = buffer_size; \ + res = env->CallNonvirtualBooleanMethod(executor_obj, executor_cl, \ + executor_result_id, to.size() - 1, place); \ + } \ + } else { \ + data_column->resize(buffer_size); \ + *output_value_buffer = reinterpret_cast<int64_t>(data_column->get_raw_data().data); \ + jboolean res = env->CallNonvirtualBooleanMethod( \ + executor_obj, executor_cl, executor_result_id, to.size() - 1, place); \ + while (res != JNI_TRUE) { \ + increase_buffer_size++; \ + buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size); \ + null_map_data.resize(buffer_size); \ + data_column->resize(buffer_size); \ + *output_array_null_ptr = reinterpret_cast<int64_t>(null_map_data.data()); \ + *output_value_buffer = \ + reinterpret_cast<int64_t>(data_column->get_raw_data().data); \ + *output_intermediate_state_ptr = buffer_size; \ + res = env->CallNonvirtualBooleanMethod(executor_obj, executor_cl, \ + 
executor_result_id, to.size() - 1, place); \ + } \ + } \ } else { \ return Status::InvalidArgument(strings::Substitute( \ "Java UDAF doesn't support return type is $0 now !", result_type->get_name())); \ @@ -286,11 +380,15 @@ private: std::unique_ptr<int64_t[]> input_values_buffer_ptr; std::unique_ptr<int64_t[]> input_nulls_buffer_ptr; std::unique_ptr<int64_t[]> input_offsets_ptrs; + std::unique_ptr<int64_t[]> input_array_nulls_buffer_ptr; + std::unique_ptr<int64_t[]> input_array_string_offsets_ptrs; std::unique_ptr<int64_t> input_place_ptrs; std::unique_ptr<int64_t> output_value_buffer; std::unique_ptr<int64_t> output_null_value; std::unique_ptr<int64_t> output_offsets_ptr; std::unique_ptr<int64_t> output_intermediate_state_ptr; + std::unique_ptr<int64_t> output_array_null_ptr; + std::unique_ptr<int64_t> output_array_string_offsets_ptr; int argument_size = 0; std::string serialize_data; diff --git a/be/src/vec/functions/function_java_udf.cpp b/be/src/vec/functions/function_java_udf.cpp index 2c8e9a91fa..0fadae2e61 100644 --- a/be/src/vec/functions/function_java_udf.cpp +++ b/be/src/vec/functions/function_java_udf.cpp @@ -200,7 +200,7 @@ Status JavaFunctionCall::execute(FunctionContext* context, Block& block, ColumnString::Offsets& offsets = \ const_cast<ColumnString::Offsets&>(str_col->get_offsets()); \ int increase_buffer_size = 0; \ - int32_t buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size); \ + int64_t buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size); \ chars.resize(buffer_size); \ offsets.resize(num_rows); \ *(jni_ctx->output_value_buffer) = reinterpret_cast<int64_t>(chars.data()); \ @@ -211,7 +211,7 @@ Status JavaFunctionCall::execute(FunctionContext* context, Block& block, nullptr); \ while (jni_ctx->output_intermediate_state_ptr->row_idx < num_rows) { \ increase_buffer_size++; \ - int32_t buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size); \ + buffer_size = 
JniUtil::IncreaseReservedBufferSize(increase_buffer_size); \ chars.resize(buffer_size); \ *(jni_ctx->output_value_buffer) = reinterpret_cast<int64_t>(chars.data()); \ jni_ctx->output_intermediate_state_ptr->buffer_size = buffer_size; \ @@ -232,7 +232,7 @@ Status JavaFunctionCall::execute(FunctionContext* context, Block& block, auto data_column = array_nested_nullable.get_nested_column_ptr(); \ auto& offset_column = array_col->get_offsets_column(); \ int increase_buffer_size = 0; \ - int32_t buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size); \ + int64_t buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size); \ offset_column.resize(num_rows); \ *(jni_ctx->output_offsets_ptr) = \ reinterpret_cast<int64_t>(offset_column.get_raw_data().data); \ @@ -263,6 +263,8 @@ Status JavaFunctionCall::execute(FunctionContext* context, Block& block, *(jni_ctx->output_array_null_ptr) = \ reinterpret_cast<int64_t>(null_map_data.data()); \ *(jni_ctx->output_value_buffer) = reinterpret_cast<int64_t>(chars.data()); \ + *(jni_ctx->output_array_string_offsets_ptr) = \ + reinterpret_cast<int64_t>(offsets.data()); \ jni_ctx->output_intermediate_state_ptr->buffer_size = buffer_size; \ env->CallNonvirtualVoidMethodA(jni_ctx->executor, executor_cl_, \ executor_evaluate_id_, nullptr); \ diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java b/fe/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java index 69a132315e..dde6a0b084 100644 --- a/fe/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java +++ b/fe/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java @@ -472,7 +472,7 @@ public abstract class BaseExecutor { } } - protected abstract long getCurrentOutputOffset(long row); + protected abstract long getCurrentOutputOffset(long row, boolean isArrayType); /** * Close the class loader we may have created. 
@@ -615,7 +615,7 @@ public abstract class BaseExecutor { case STRING: { long bufferSize = UdfUtils.UNSAFE.getLong(null, outputIntermediateStatePtr); byte[] bytes = ((String) obj).getBytes(StandardCharsets.UTF_8); - long offset = getCurrentOutputOffset(row); + long offset = getCurrentOutputOffset(row, false); if (offset + bytes.length > bufferSize) { return false; } @@ -637,7 +637,7 @@ public abstract class BaseExecutor { } public boolean arrayTypeOutputData(Object obj, Type type, long row) throws UdfRuntimeException { - long offset = getCurrentOutputOffset(row); + long offset = getCurrentOutputOffset(row, true); long bufferSize = UdfUtils.UNSAFE.getLong(null, outputIntermediateStatePtr); long outputNullMapBase = UdfUtils.UNSAFE.getLong(null, outputArrayNullPtr); long outputBufferBase = UdfUtils.UNSAFE.getLong(null, outputBufferPtr); diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/UdafExecutor.java b/fe/java-udf/src/main/java/org/apache/doris/udf/UdafExecutor.java index 4f88fa967e..7ce47b4837 100644 --- a/fe/java-udf/src/main/java/org/apache/doris/udf/UdafExecutor.java +++ b/fe/java-udf/src/main/java/org/apache/doris/udf/UdafExecutor.java @@ -176,9 +176,14 @@ public class UdafExecutor extends BaseExecutor { } @Override - protected long getCurrentOutputOffset(long row) { - return Integer.toUnsignedLong( + protected long getCurrentOutputOffset(long row, boolean isArrayType) { + if (isArrayType) { + return Integer.toUnsignedLong( + UdfUtils.UNSAFE.getInt(null, UdfUtils.UNSAFE.getLong(null, outputOffsetsPtr) + 8L * (row - 1))); + } else { + return Integer.toUnsignedLong( UdfUtils.UNSAFE.getInt(null, UdfUtils.UNSAFE.getLong(null, outputOffsetsPtr) + 4L * (row - 1))); + } } @Override diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/UdfExecutor.java b/fe/java-udf/src/main/java/org/apache/doris/udf/UdfExecutor.java index 5cc295dd8e..b888279db3 100644 --- a/fe/java-udf/src/main/java/org/apache/doris/udf/UdfExecutor.java +++ 
b/fe/java-udf/src/main/java/org/apache/doris/udf/UdfExecutor.java @@ -141,7 +141,7 @@ public class UdfExecutor extends BaseExecutor { } @Override - protected long getCurrentOutputOffset(long row) { + protected long getCurrentOutputOffset(long row, boolean isArrayType) { return outputOffset; } diff --git a/regression-test/data/javaudf_p0/test_javaudaf_mysum_array.out b/regression-test/data/javaudf_p0/test_javaudaf_mysum_array.out new file mode 100644 index 0000000000..e33f21c6a6 --- /dev/null +++ b/regression-test/data/javaudf_p0/test_javaudaf_mysum_array.out @@ -0,0 +1,87 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select_default -- +1 1 abcdefg1 poiuytre1abcdefg +2 2 abcdefg2 poiuytre2abcdefg +0 3 abcdefg3 poiuytre3abcdefg +1 4 abcdefg4 poiuytre4abcdefg +2 5 abcdefg5 poiuytre5abcdefg +0 6 abcdefg6 poiuytre6abcdefg +1 7 abcdefg7 poiuytre7abcdefg +2 8 abcdefg8 poiuytre8abcdefg +9 9 abcdefg9 poiuytre9abcdefg + +-- !select1 -- +18 + +-- !select2 -- +0 0 +1 3 +2 6 +9 9 + +-- !select3 -- +0 0 0 +1 3 3 +2 6 6 +9 9 9 + +-- !select4 -- +0 0 0 +1 3 3 +2 6 6 +9 9 9 + +-- !select5 -- +36 + +-- !select6 -- +[0, 0, 1, 1, 1, 2, 2, 2, 9] + +-- !select6_1 -- +[0, 0] +[1, 1, 1] +[2, 2, 2] +[9] + +-- !select7 -- +[0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 9, 9] + +-- !select7_1 -- +[0, 0, 0, 0] +[1, 1, 1, 1, 1, 1] +[2, 2, 2, 2, 2, 2] +[9, 9] + +-- !select_8 -- +['poiuytre1abcdefg', 'poiuytre2abcdefg', 'poiuytre3abcdefg', 'poiuytre4abcdefg', 'poiuytre5abcdefg', 'poiuytre6abcdefg', 'poiuytre7abcdefg', 'poiuytre8abcdefg', 'poiuytre9abcdefg'] + +-- !select_9 -- +['poiuytre3abcdefg', 'poiuytre6abcdefg'] 0 +['poiuytre1abcdefg', 'poiuytre4abcdefg', 'poiuytre7abcdefg'] 1 +['poiuytre2abcdefg', 'poiuytre5abcdefg', 'poiuytre8abcdefg'] 2 +['poiuytre9abcdefg'] 9 + +-- !select_10 -- +['0', '0', 'poiuytre3abcdefg', 'poiuytre6abcdefg'] 0 +['1', '1', '1', 'poiuytre1abcdefg', 'poiuytre4abcdefg', 'poiuytre7abcdefg'] 1 +['2', '2', '2', 
'poiuytre2abcdefg', 'poiuytre5abcdefg', 'poiuytre8abcdefg'] 2 +['9', 'poiuytre9abcdefg'] 9 + +-- !select_11 -- +[] + +-- !select_13 -- +poiuytre3abcdefg--poiuytre6abcdefg- 0 +poiuytre1abcdefg--poiuytre4abcdefg--poiuytre7abcdefg- 1 +poiuytre2abcdefg--poiuytre5abcdefg--poiuytre8abcdefg- 2 +poiuytre9abcdefg- 9 + +-- !select_14 -- +poiuytre3abcdefg-0--poiuytre6abcdefg-0- 0 +poiuytre1abcdefg-1--poiuytre4abcdefg-1--poiuytre7abcdefg-1- 1 +poiuytre2abcdefg-2--poiuytre5abcdefg-2--poiuytre8abcdefg-2- 2 +poiuytre9abcdefg-9- 9 + +-- !select_15 -- + + diff --git a/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/ArrayReturnArrayStringTest.java b/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/ArrayReturnArrayStringTest.java index 0f0d73db86..cebdc1a4d1 100644 --- a/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/ArrayReturnArrayStringTest.java +++ b/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/ArrayReturnArrayStringTest.java @@ -30,7 +30,6 @@ public class ArrayReturnArrayStringTest extends UDF { for (int i = 0; i < res.size(); ++i) { String data = res.get(i); if (data != null) { - System.out.println(data); value = value + data; } } diff --git a/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/ArrayStringTest.java b/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/ArrayStringTest.java index ea860babac..953a076fa6 100644 --- a/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/ArrayStringTest.java +++ b/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/ArrayStringTest.java @@ -30,7 +30,6 @@ public class ArrayStringTest extends UDF { for (int i = 0; i < res.size(); ++i) { String data = res.get(i); if (data != null) { - System.out.println(data); value = value + data; } } diff --git a/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/MyArrayString.java b/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/MyArrayString.java new file 
mode 100644 index 0000000000..311e73aab1 --- /dev/null +++ b/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/MyArrayString.java @@ -0,0 +1,86 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package org.apache.doris.udf; +import org.apache.log4j.Logger; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; + +public class MyArrayString { + private static final Logger LOG = Logger.getLogger(MyArrayString.class); + public static class State { + public String data = new String(); + public String separator = "-"; + public boolean inited = false; + } + + public State create() { + return new State(); + } + + public void destroy(State state) { + + } + + public void add(State state, ArrayList<String> val) { + if (val == null) return; + if (state.inited) { + state.data += state.separator; + } else { + state.inited = true; + } + for (int i = 0; i < val.size(); ++i) { + String data = val.get(i); + if (data != null) { + state.data = state.data + data + state.separator; + } + } + } + + public void serialize(State state, DataOutputStream out) throws IOException { + out.writeBoolean(state.inited); + out.writeInt(state.data.length()); + 
out.writeBytes(state.data); + } + + public void deserialize(State state, DataInputStream in) throws IOException { + state.inited = in.readBoolean(); + int len = in.readInt(); + byte[] bytes = new byte[len]; + in.read(bytes); + state.data = new String(bytes); + } + + public void merge(State state, State rhs) { + if (!rhs.inited) { + return; + } + + if (!state.inited) { + state.inited = true; + state.data = rhs.data; + } else { + state.data += state.separator; + state.data +=rhs.data; + } + } + + public String getValue(State state) { + return state.data; + } +} \ No newline at end of file diff --git a/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/MyReturnArrayString.java b/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/MyReturnArrayString.java new file mode 100644 index 0000000000..bd15f9fef0 --- /dev/null +++ b/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/MyReturnArrayString.java @@ -0,0 +1,79 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+package org.apache.doris.udf; +import org.apache.log4j.Logger; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; + +public class MyReturnArrayString { + private static final Logger LOG = Logger.getLogger(MyReturnArrayString.class); + public static class State { + public ArrayList<String> data = new ArrayList<String>(); + } + + public State create() { + return new State(); + } + + public void destroy(State state) { + + } + + public void add(State state, ArrayList<String> val) { + if (val == null) return; + for (int i = 0; i < val.size(); ++i) { + String s = val.get(i); + if (s != null) { + state.data.add(s); + } + } + } + + public void serialize(State state, DataOutputStream out) throws IOException { + int size = state.data.size(); + out.writeInt(size); + for (int i = 0; i < size; ++i) { + String val = state.data.get(i); + out.writeInt(val.length()); + out.writeBytes(val); + } + } + + public void deserialize(State state, DataInputStream in) throws IOException { + int size = in.readInt(); + for (int i = 0; i < size; ++i) { + int len = in.readInt(); + byte[] bytes = new byte[len]; + in.read(bytes); + state.data.add(new String(bytes)); + } + } + + public void merge(State state, State rhs) { + state.data.addAll(rhs.data); + } + + public ArrayList<String> getValue(State state) { + //sort for regression test + state.data.sort(Comparator.naturalOrder()); + return state.data; + } +} \ No newline at end of file diff --git a/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/MySumArrayInt.java b/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/MySumArrayInt.java new file mode 100644 index 0000000000..5731a7f265 --- /dev/null +++ b/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/MySumArrayInt.java @@ -0,0 +1,64 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor 
license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package org.apache.doris.udf; +import org.apache.log4j.Logger; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; + +public class MySumArrayInt { + private static final Logger LOG = Logger.getLogger(MySumArrayInt.class); + public static class State { + public long counter = 0; + } + + public State create() { + return new State(); + } + + public void destroy(State state) { + } + + public void add(State state, ArrayList<Integer> val) { + if (val == null) { + return; + } + for (int i = 0; i < val.size(); ++i) { + Integer data = val.get(i); + if (data != null) { + state.counter = state.counter + data; + } + } + } + + public void serialize(State state, DataOutputStream out) throws IOException { + out.writeLong(state.counter); + } + + public void deserialize(State state, DataInputStream in) throws IOException { + state.counter = in.readLong(); + } + + public void merge(State state, State rhs) { + state.counter += rhs.counter; + } + + public long getValue(State state) { + return state.counter; + } +} \ No newline at end of file diff --git a/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/MySumReturnArrayInt.java 
b/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/MySumReturnArrayInt.java new file mode 100644 index 0000000000..b9ebdf89d0 --- /dev/null +++ b/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/MySumReturnArrayInt.java @@ -0,0 +1,76 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+package org.apache.doris.udf; +import org.apache.log4j.Logger; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; + +public class MySumReturnArrayInt { + private static final Logger LOG = Logger.getLogger(MySumReturnArrayInt.class); + public static class State { + public ArrayList<Integer> counter = new ArrayList<Integer>(); + } + + public State create() { + return new State(); + } + + public void destroy(State state) { + } + + public void add(State state, ArrayList<Integer> val) { + if (val == null) { + return; + } + for (int i = 0; i < val.size(); ++i) { + Integer data = val.get(i); + + if (data != null) { + state.counter.add(data); + } + } + } + + public void serialize(State state, DataOutputStream out) throws IOException { + int size = state.counter.size(); + out.writeInt(size); + for (int i = 0; i < size; ++i) { + out.writeInt(state.counter.get(i)); + } + } + + public void deserialize(State state, DataInputStream in) throws IOException { + int size = in.readInt(); + for (int i = 0; i < size; ++i) { + state.counter.add(in.readInt()); + } + } + + public void merge(State state, State rhs) { + state.counter.addAll(rhs.counter); + } + + public ArrayList<Integer> getValue(State state) { + //sort for regression test + state.counter.sort(Comparator.naturalOrder()); + return state.counter; + } +} \ No newline at end of file diff --git a/regression-test/suites/javaudf_p0/test_javaudaf_mysum_array.groovy b/regression-test/suites/javaudf_p0/test_javaudaf_mysum_array.groovy new file mode 100644 index 0000000000..2aaa307446 --- /dev/null +++ b/regression-test/suites/javaudf_p0/test_javaudaf_mysum_array.groovy @@ -0,0 +1,122 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.codehaus.groovy.runtime.IOGroovyMethods + +import java.nio.charset.StandardCharsets +import java.nio.file.Files +import java.nio.file.Paths + +// Regression suite for Java UDAFs that take array arguments. +// It creates a four-column table, inserts nine rows, then registers four aggregate +// functions from the java-udf-case jar (MySumArrayInt, MySumReturnArrayInt, +// MyReturnArrayString, MyArrayString) and runs the qt_* queries against them, +// covering ungrouped, grouped, multi-element-array and null-argument calls. +// The finally block drops the four functions and the table. +suite("test_javaudaf_mysum_array") { + def tableName = "test_javaudaf_mysum_array" + def jarPath = """${context.file.parent}/jars/java-udf-case-jar-with-dependencies.jar""" + + log.info("Jar path: ${jarPath}".toString()) + try { + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `user_id` INT NOT NULL COMMENT "用户id", + `char_col` CHAR NOT NULL COMMENT "", + `varchar_col` VARCHAR(10) NOT NULL COMMENT "", + `string_col` STRING NOT NULL COMMENT "" + ) + DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1"); + """ + // Build the VALUES list: eight rows with user_id = i % 3 for i in 1..8, each ending in a comma, + // then one last row that uses i == 9 as user_id and has no trailing comma. + StringBuilder sb = new StringBuilder() + int i = 1 + for (; i < 9; i ++) { + sb.append(""" + (${i % 3}, '${i}','abcdefg${i}','poiuytre${i}abcdefg'), + """) + } + sb.append(""" + (${i}, '${i}','abcdefg${i}','poiuytre${i}abcdefg') + """) + sql """ INSERT INTO ${tableName} VALUES + ${sb.toString()} + """ + qt_select_default """ SELECT * FROM ${tableName} t ORDER BY char_col; """ + + // Abort with IllegalStateException when no file exists at jarPath, before any function is created. + File path = new File(jarPath) + if (!path.exists()) { + throw new IllegalStateException("""${jarPath} doesn't exist! 
""") + } + + sql """ DROP FUNCTION IF EXISTS udaf_my_sum_arrayint(array<int>); """ + sql """ CREATE AGGREGATE FUNCTION udaf_my_sum_arrayint(array<int>) RETURNS BigInt PROPERTIES ( + "file"="file://${jarPath}", + "symbol"="org.apache.doris.udf.MySumArrayInt", + "type"="JAVA_UDF" + ); """ + + qt_select1 """ SELECT udaf_my_sum_arrayint(array(user_id)) result FROM ${tableName}; """ + + qt_select2 """ select user_id, udaf_my_sum_arrayint(array(user_id)) from ${tableName} group by user_id order by user_id; """ + + qt_select3 """ select user_id, sum(user_id), udaf_my_sum_arrayint(array(user_id)) from ${tableName} group by user_id order by user_id; """ + + qt_select4 """ select user_id, udaf_my_sum_arrayint(array(user_id)), sum(user_id) from ${tableName} group by user_id order by user_id; """ + + qt_select5 """ SELECT udaf_my_sum_arrayint(array(user_id, user_id)) result FROM ${tableName}; """ + + + sql """ DROP FUNCTION IF EXISTS udaf_my_sum_return_arrayint(array<int>); """ + sql """ CREATE AGGREGATE FUNCTION udaf_my_sum_return_arrayint(array<int>) RETURNS array<int> PROPERTIES ( + "file"="file://${jarPath}", + "symbol"="org.apache.doris.udf.MySumReturnArrayInt", + "type"="JAVA_UDF" + ); """ + + qt_select6 """ SELECT udaf_my_sum_return_arrayint(array(user_id)) result FROM ${tableName}; """ + qt_select6_1 """ SELECT udaf_my_sum_return_arrayint(array(user_id)) result FROM ${tableName} group by user_id order by user_id; """ + qt_select7 """ SELECT udaf_my_sum_return_arrayint(array(user_id, user_id)) result FROM ${tableName}; """ + qt_select7_1 """ SELECT udaf_my_sum_return_arrayint(array(user_id, user_id)) result FROM ${tableName} group by user_id order by user_id; """ + + + sql """ DROP FUNCTION IF EXISTS udaf_my_sum_return_arraystring(array<string>); """ + sql """ CREATE AGGREGATE FUNCTION udaf_my_sum_return_arraystring(array<string>) RETURNS array<string> PROPERTIES ( + "file"="file://${jarPath}", + "symbol"="org.apache.doris.udf.MyReturnArrayString", + "type"="JAVA_UDF" + 
); """ + + qt_select_8 """ SELECT udaf_my_sum_return_arraystring(array(string_col)) FROM ${tableName}; """ + qt_select_9 """ SELECT udaf_my_sum_return_arraystring(array(string_col)), user_id as result FROM ${tableName} group by result ORDER BY result; """ + qt_select_10 """ SELECT udaf_my_sum_return_arraystring(array(string_col, cast(user_id as string))), user_id as result FROM ${tableName} group by result ORDER BY result; """ + qt_select_11 """ SELECT udaf_my_sum_return_arraystring(null) result FROM ${tableName}; """ + + + sql """ DROP FUNCTION IF EXISTS udaf_my_sum_arraystring(array<string>); """ + sql """ CREATE AGGREGATE FUNCTION udaf_my_sum_arraystring(array<string>) RETURNS string PROPERTIES ( + "file"="file://${jarPath}", + "symbol"="org.apache.doris.udf.MyArrayString", + "type"="JAVA_UDF" + ); """ + + qt_select_13 """ SELECT udaf_my_sum_arraystring(array(string_col)), user_id as result FROM ${tableName} group by result ORDER BY result; """ + qt_select_14 """ SELECT udaf_my_sum_arraystring(array(string_col, cast(user_id as string))), user_id as result FROM ${tableName} group by result ORDER BY result; """ + qt_select_15 """ SELECT udaf_my_sum_arraystring(null) result FROM ${tableName}; """ + + } finally { + try_sql("DROP FUNCTION IF EXISTS udaf_my_sum_arraystring(array<string>);") + try_sql("DROP FUNCTION IF EXISTS udaf_my_sum_return_arraystring(array<string>);") + try_sql("DROP FUNCTION IF EXISTS udaf_my_sum_arrayint(array<int>);") + try_sql("DROP FUNCTION IF EXISTS udaf_my_sum_return_arrayint(array<int>);") + try_sql("DROP TABLE IF EXISTS ${tableName}") + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org