This is an automated email from the ASF dual-hosted git repository.
hellostephen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 5fc3698fa51 [Fix](pyudf) Fix error type conversion (#61729)
5fc3698fa51 is described below
commit 5fc3698fa51bc98be8b81f41dae3e4adcc6f6e63
Author: linrrarity <[email protected]>
AuthorDate: Tue Mar 31 10:33:18 2026 +0800
[Fix](pyudf) Fix error type conversion (#61729)
### What problem does this PR solve?
Issue Number: close #xxx
Related PR: #xxx
Problem Summary:
1. `LargeInt` is mapped to Arrow's string type, but python_server performs
no conversion back: after the internal calculation produces a Python int,
the final result no longer matches the expected Arrow return type
(pa.string), which raises an error.
```text
Doris> CREATE FUNCTION py_largeint_inc(LARGEINT)
-> RETURNS LARGEINT
-> PROPERTIES (
-> "type"="PYTHON_UDF",
-> "symbol"="evaluate",
-> "runtime_version"="3.12.11",
-> "always_nullable"="true"
-> )
-> AS $$
-> def evaluate(x):
-> if x is None:
-> return None
-> return int(x) + 1
-> $$;
Query OK, 0 rows affected (0.024 sec)
Doris> SELECT py_largeint_inc(1);
ERROR 1105 (HY000): errCode = 2, detailMessage =
(127.0.0.1)[RUNTIME_ERROR]Expected bytes, got a 'int' object. Detail: Python
exception: Traceback (most recent call last):
File "pyarrow/_flight.pyx", line 2315, in pyarrow._flight._do_exchange
File
"/mnt/disk7/linzhenqi/d1/doris/output/be/plugins/python_udf/python_server.py",
line 2481, in do_exchange
self._handle_exchange_udf(python_udf_meta, reader, writer)
File
"/mnt/disk7/linzhenqi/d1/doris/output/be/plugins/python_udf/python_server.py",
line 2013, in _handle_exc
```
now:
```text
Doris> select py_largeint_inc(1);
+--------------------+
| py_largeint_inc(1) |
+--------------------+
| 2 |
+--------------------+
```
2. UDAF does not attach `doris_type` metadata for special types
(IPv4/IPv6/LargeInt), so the input type is passed through without the
special-type conversion and cannot be processed normally.
```text
Doris> CREATE AGGREGATE FUNCTION udaf_count_loopback_ipv6_inline(IPV6)
-> RETURNS BIGINT
-> PROPERTIES (
-> "type" = "PYTHON_UDF",
-> "symbol" = "CountLoopbackIPv6UDAF",
-> "runtime_version" = "3.9.18"
-> )
-> AS $$
-> class CountLoopbackIPv6UDAF:
-> def __init__(self):
-> self.count = 0
->
-> def accumulate(self, value):
-> if value is not None and value.is_loopback:
-> self.count += 1
->
-> def merge(self, other_state):
-> if other_state is not None:
-> self.count += other_state
->
-> def finish(self):
-> return self.count
->
-> @property
-> def aggregate_state(self):
-> return self.count
-> $$;
Query OK, 0 rows affected (0.004 sec)
Doris> SELECT udaf_count_loopback_ipv6_inline(ip_v6) as loopback_ipv6_count
-> FROM test_pythonudaf_ipv6_table;
ERROR 1105 (HY000): errCode = 2, detailMessage =
(127.0.0.1)[INTERNAL_ERROR][INTERNAL_ERROR]ACCUMULATE operation failed for
place_id=140343167410176
0# doris::PythonUDAFClient::accumulate(long, bool,
arrow::RecordBatch const&, long, long) at ../src/common/status.h:500
1# doris::AggregatePythonUDAFData::add(long, doris::IColumn
const**, long, long, std::vector<std::shared_ptr<doris::IDataType const>,
std::allocator<std::shared_ptr<doris::IDataType const> > > const&) at
./be/build_RELEASE/../src/exprs/aggregate/aggregate_f
```
now:
```text
Doris> SELECT udaf_count_loopback_ipv6_inline(ip_v6) as loopback_ipv6_count
-> FROM test_pythonudaf_ipv6_table;
+---------------------+
| loopback_ipv6_count |
+---------------------+
| 1 |
+---------------------+
```
---
.../aggregate/aggregate_function_python_udaf.cpp | 4 +-
be/src/format/arrow/arrow_row_batch.cpp | 5 +-
be/src/format/arrow/arrow_row_batch.h | 5 +
be/src/udf/python/python_server.py | 45 +++---
.../data_type_serde/data_type_serde_arrow_test.cpp | 22 ++-
.../data/pythonudaf_p0/test_pythonudaf_inline.out | 26 ++++
.../pythonudf_p0/test_pythonudf_inline_scalar.out | 9 ++
.../pythonudtf_p0/test_pythonudtf_basic_inline.out | 4 +
.../pythonudaf_p0/test_pythonudaf_inline.groovy | 167 ++++++++++++++++++++-
.../test_pythonudf_error_handling.groovy | 63 ++++++++
.../test_pythonudf_inline_scalar.groovy | 25 +++
.../test_pythonudtf_basic_inline.groovy | 30 ++++
12 files changed, 370 insertions(+), 35 deletions(-)
diff --git a/be/src/exprs/aggregate/aggregate_function_python_udaf.cpp
b/be/src/exprs/aggregate/aggregate_function_python_udaf.cpp
index 2765eb47c7d..4b6917f3962 100644
--- a/be/src/exprs/aggregate/aggregate_function_python_udaf.cpp
+++ b/be/src/exprs/aggregate/aggregate_function_python_udaf.cpp
@@ -252,7 +252,9 @@ void AggregatePythonUDAF::create(AggregateDataPtr
__restrict place) const {
"Failed to convert argument type {} to
Arrow type: {}", i,
st.to_string());
}
- fields.push_back(arrow::field(std::to_string(i), arrow_type));
+ fields.push_back(create_arrow_field_with_metadata(
+ std::to_string(i), arrow_type,
argument_types[i]->is_nullable(),
+ argument_types[i]->get_primitive_type()));
}
// Add places column for GROUP BY aggregation (always included, NULL
in single-place mode)
diff --git a/be/src/format/arrow/arrow_row_batch.cpp
b/be/src/format/arrow/arrow_row_batch.cpp
index dcb40ec5d87..9c8e94e10c4 100644
--- a/be/src/format/arrow/arrow_row_batch.cpp
+++ b/be/src/format/arrow/arrow_row_batch.cpp
@@ -172,7 +172,7 @@ Status convert_to_arrow_type(const DataTypePtr& origin_type,
}
// Helper function to create an Arrow Field with type metadata if applicable,
such as IP types
-static std::shared_ptr<arrow::Field> create_arrow_field_with_metadata(
+std::shared_ptr<arrow::Field> create_arrow_field_with_metadata(
const std::string& field_name, const std::shared_ptr<arrow::DataType>&
arrow_type,
bool is_nullable, PrimitiveType primitive_type) {
if (primitive_type == PrimitiveType::TYPE_IPV4) {
@@ -181,6 +181,9 @@ static std::shared_ptr<arrow::Field>
create_arrow_field_with_metadata(
} else if (primitive_type == PrimitiveType::TYPE_IPV6) {
auto metadata = arrow::KeyValueMetadata::Make({"doris_type"},
{"IPV6"});
return std::make_shared<arrow::Field>(field_name, arrow_type,
is_nullable, metadata);
+ } else if (primitive_type == PrimitiveType::TYPE_LARGEINT) {
+ auto metadata = arrow::KeyValueMetadata::Make({"doris_type"},
{"LARGEINT"});
+ return std::make_shared<arrow::Field>(field_name, arrow_type,
is_nullable, metadata);
} else {
return std::make_shared<arrow::Field>(field_name, arrow_type,
is_nullable);
}
diff --git a/be/src/format/arrow/arrow_row_batch.h
b/be/src/format/arrow/arrow_row_batch.h
index 3c572e18aa7..e0a37f6bf42 100644
--- a/be/src/format/arrow/arrow_row_batch.h
+++ b/be/src/format/arrow/arrow_row_batch.h
@@ -31,6 +31,7 @@
namespace arrow {
class DataType;
+class Field;
class RecordBatch;
class Schema;
@@ -45,6 +46,10 @@ class RowDescriptor;
Status convert_to_arrow_type(const DataTypePtr& type,
std::shared_ptr<arrow::DataType>* result,
const std::string& timezone);
+std::shared_ptr<arrow::Field> create_arrow_field_with_metadata(
+ const std::string& field_name, const std::shared_ptr<arrow::DataType>&
arrow_type,
+ bool is_nullable, PrimitiveType primitive_type);
+
Status get_arrow_schema_from_block(const Block& block,
std::shared_ptr<arrow::Schema>* result,
const std::string& timezone);
diff --git a/be/src/udf/python/python_server.py
b/be/src/udf/python/python_server.py
index d6d60cc4c99..b259a80d89f 100644
--- a/be/src/udf/python/python_server.py
+++ b/be/src/udf/python/python_server.py
@@ -288,6 +288,18 @@ def convert_arrow_field_to_python(field,
column_metadata=None):
)
return value
return None
+ elif doris_type in (b'LARGEINT', 'LARGEINT'):
+ if pa.types.is_string(field.type) or
pa.types.is_large_string(field.type):
+ value = field.as_py()
+ if value is not None:
+ try:
+ return int(value)
+ except (ValueError, TypeError) as e:
+ logging.warning(
+ "Failed to convert string '%s' to int for
LARGEINT: %s", value, e
+ )
+ return value
+ return None
return field.as_py()
@@ -314,16 +326,9 @@ def convert_python_to_arrow_value(value, output_type=None):
if value is None:
return None
- is_ipv4_output = False
- is_ipv6_output = False
-
- if output_type is not None and hasattr(output_type, 'metadata') and
output_type.metadata:
- # Arrow metadata keys can be either bytes or str depending on how they
were created
- doris_type = output_type.metadata.get(b'doris_type') or
output_type.metadata.get('doris_type')
- if doris_type in (b'IPV4', 'IPV4'):
- is_ipv4_output = True
- elif doris_type in (b'IPV6', 'IPV6'):
- is_ipv6_output = True
+ if output_type and pa.types.is_string(output_type) and isinstance(value,
int):
+ # If output type is string but value is int, convert to string (for
LARGEINT)
+ return str(value)
# Convert IPv4Address back to int
if isinstance(value, ipaddress.IPv4Address):
@@ -333,20 +338,6 @@ def convert_python_to_arrow_value(value, output_type=None):
if isinstance(value, ipaddress.IPv6Address):
return str(value)
- # IPv4 output must return IPv4Address objects
- if is_ipv4_output and isinstance(value, int):
- raise TypeError(
- f"IPv4 UDF must return ipaddress.IPv4Address object, got int
({value}). "
- f"Use: return ipaddress.IPv4Address({value})"
- )
-
- # IPv6 output must return IPv6Address objects
- if is_ipv6_output and isinstance(value, str):
- raise TypeError(
- f"IPv6 UDF must return ipaddress.IPv6Address object, got str
('{value}'). "
- f"Use: return ipaddress.IPv6Address('{value}')"
- )
-
# Handle list of values (but not tuples that might be struct data)
if isinstance(value, list):
# For list types, recursively convert elements
@@ -355,7 +346,8 @@ def convert_python_to_arrow_value(value, output_type=None):
return [convert_python_to_arrow_value(v, element_type) for v in
value]
else:
# No type info, just recurse without type
- return [convert_python_to_arrow_value(v, None) for v in value]
+ # Keep output_type here because UDTF row outputs are nested Python
lists whose elements still need the outer element type.
+ return [convert_python_to_arrow_value(v, output_type) for v in
value]
# Handle tuple values (could be struct data)
if isinstance(value, tuple):
@@ -2183,6 +2175,9 @@ class FlightServer(flight.FlightServerBase):
rows_processed =
result_batch_accumulate.column(0)[0].as_py()
result_batch = self._create_unified_response(
success=(rows_processed > 0),
+ # Processing zero rows is valid for empty
fragments/slices.
+ # Only exceptions should mark ACCUMULATE as failed.
+ # success=True,
rows_processed=rows_processed,
data=b"",
)
diff --git a/be/test/core/data_type_serde/data_type_serde_arrow_test.cpp
b/be/test/core/data_type_serde/data_type_serde_arrow_test.cpp
index 8cbcd290f40..602eb9f6975 100644
--- a/be/test/core/data_type_serde/data_type_serde_arrow_test.cpp
+++ b/be/test/core/data_type_serde/data_type_serde_arrow_test.cpp
@@ -392,6 +392,16 @@ std::shared_ptr<Block>
create_test_block(std::vector<PrimitiveType> cols, int ro
ColumnWithTypeAndName type_and_name(vec->get_ptr(), data_type,
col_name);
block->insert(std::move(type_and_name));
} break;
+ case TYPE_LARGEINT: {
+ auto vec = ColumnInt128::create();
+ auto& data = vec->get_data();
+ for (int i = 0; i < row_num; ++i) {
+ data.push_back(__int128_t(i));
+ }
+ DataTypePtr data_type(std::make_shared<DataTypeInt128>());
+ ColumnWithTypeAndName type_and_name(vec->get_ptr(), data_type,
col_name);
+ block->insert(std::move(type_and_name));
+ } break;
default:
LOG(FATAL) << "error column type";
}
@@ -430,9 +440,9 @@ void block_converter_test(std::vector<PrimitiveType> cols,
int row_num, bool is_
TEST(DataTypeSerDeArrowTest, DataTypeScalaSerDeTest) {
std::vector<PrimitiveType> cols = {
- TYPE_INT, TYPE_INT, TYPE_STRING, TYPE_DECIMAL128I,
TYPE_BOOLEAN,
- TYPE_DECIMAL32, TYPE_DECIMAL64, TYPE_IPV4, TYPE_IPV6,
TYPE_DATETIME,
- TYPE_DATETIMEV2, TYPE_DATE, TYPE_DATEV2,
+ TYPE_INT, TYPE_INT, TYPE_STRING, TYPE_DECIMAL128I,
TYPE_BOOLEAN,
+ TYPE_DECIMAL32, TYPE_DECIMAL64, TYPE_IPV4, TYPE_IPV6,
TYPE_LARGEINT,
+ TYPE_DATETIME, TYPE_DATETIMEV2, TYPE_DATE, TYPE_DATEV2,
};
serialize_and_deserialize_arrow_test(cols, 7, true);
serialize_and_deserialize_arrow_test(cols, 7, false);
@@ -511,9 +521,9 @@ TEST(DataTypeSerDeArrowTest, BigStringSerDeTest) {
TEST(DataTypeSerDeArrowTest, BlockConverterTest) {
std::vector<PrimitiveType> cols = {
- TYPE_INT, TYPE_INT, TYPE_STRING, TYPE_DECIMAL128I,
TYPE_BOOLEAN,
- TYPE_DECIMAL32, TYPE_DECIMAL64, TYPE_IPV4, TYPE_IPV6,
TYPE_DATETIME,
- TYPE_DATETIMEV2, TYPE_DATE, TYPE_DATEV2,
+ TYPE_INT, TYPE_INT, TYPE_STRING, TYPE_DECIMAL128I,
TYPE_BOOLEAN,
+ TYPE_DECIMAL32, TYPE_DECIMAL64, TYPE_IPV4, TYPE_IPV6,
TYPE_LARGEINT,
+ TYPE_DATETIME, TYPE_DATETIMEV2, TYPE_DATE, TYPE_DATEV2,
};
block_converter_test(cols, 7, true);
block_converter_test(cols, 7, false);
diff --git a/regression-test/data/pythonudaf_p0/test_pythonudaf_inline.out
b/regression-test/data/pythonudaf_p0/test_pythonudaf_inline.out
index 5fff44d1515..bb009489686 100644
--- a/regression-test/data/pythonudaf_p0/test_pythonudaf_inline.out
+++ b/regression-test/data/pythonudaf_p0/test_pythonudaf_inline.out
@@ -76,3 +76,29 @@ A 30 30
B 70 70
C 50 50
+-- !test_empty_parallel --
+0
+
+-- !test_global_empty_parallel --
+0
+
+-- !test_largeint1 --
+1000
+
+-- !test_largeint2 --
+A 300
+B 700
+
+-- !test_ipv4_udaf1 --
+3
+
+-- !test_ipv4_udaf2 --
+A 2
+B 1
+
+-- !test_ipv6_udaf1 --
+1
+
+-- !test_ipv6_udaf2 --
+A 1
+B 0
diff --git a/regression-test/data/pythonudf_p0/test_pythonudf_inline_scalar.out
b/regression-test/data/pythonudf_p0/test_pythonudf_inline_scalar.out
index dad903a3f2d..9e3a41d29f4 100644
--- a/regression-test/data/pythonudf_p0/test_pythonudf_inline_scalar.out
+++ b/regression-test/data/pythonudf_p0/test_pythonudf_inline_scalar.out
@@ -23,3 +23,12 @@ false
-- !select_zero --
false
+-- !select_largeint_inc --
+101
+
+-- !select_largeint_inc_negative --
+-99999999999999999998
+
+-- !select_largeint_inc_null --
+\N
+
diff --git
a/regression-test/data/pythonudtf_p0/test_pythonudtf_basic_inline.out
b/regression-test/data/pythonudtf_p0/test_pythonudtf_basic_inline.out
index 8fff208efbb..467f69a9bba 100644
--- a/regression-test/data/pythonudtf_p0/test_pythonudtf_basic_inline.out
+++ b/regression-test/data/pythonudtf_p0/test_pythonudtf_basic_inline.out
@@ -243,3 +243,7 @@ HELLO
3 \N
4 cherry
+-- !largeint_expand --
+100
+101
+
diff --git a/regression-test/suites/pythonudaf_p0/test_pythonudaf_inline.groovy
b/regression-test/suites/pythonudaf_p0/test_pythonudaf_inline.groovy
index d6acee25b6b..6315f5c7fab 100644
--- a/regression-test/suites/pythonudaf_p0/test_pythonudaf_inline.groovy
+++ b/regression-test/suites/pythonudaf_p0/test_pythonudaf_inline.groovy
@@ -317,16 +317,179 @@ class SumUDAF:
qt_test_global2 """ SELECT category,
udaf_sum_global(value) as sum_val,
sum(value) as native_sum
- FROM test_pythonudaf_inline_table
- GROUP BY category
+ FROM test_pythonudaf_inline_table
+ GROUP BY category
ORDER BY category; """
+ // Empty input with high pipeline parallelism should still succeed.
+ qt_test_empty_parallel """ SELECT
/*+SET_VAR(parallel_pipeline_task_num=8)*/
+ udaf_sum_inline(value) as total
+ FROM test_pythonudaf_inline_table
+ WHERE id < 0; """
+ qt_test_global_empty_parallel """ SELECT
/*+SET_VAR(parallel_pipeline_task_num=8)*/
+ udaf_sum_global(value) as total
+ FROM test_pythonudaf_inline_table
+ WHERE id < 0; """
+
+ // ========================================
+ // Test 9: LARGEINT Sum UDAF (Inline)
+ // ========================================
+ sql """ DROP TABLE IF EXISTS test_pythonudaf_convert_type_table """
+ sql """
+ CREATE TABLE IF NOT EXISTS test_pythonudaf_convert_type_table (
+ `id` INT NOT NULL,
+ `val` LARGEINT,
+ `category` VARCHAR(10) NOT NULL,
+ `ip_v4` IPV4,
+ `ip_v6` IPV6
+ )
+ DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1");
+ """
+
+ sql """ INSERT INTO test_pythonudaf_convert_type_table VALUES
+ (1, 100, 'A', '192.168.1.1', '2001:db8::1'),
+ (2, 200, 'A', '10.0.0.1', '::1'),
+ (3, 300, 'B', '8.8.8.8', '2001:4860:4860::8888'),
+ (4, 400, 'B', '172.16.0.1', 'fe80::1'),
+ (5, NULL, 'A', NULL, NULL);
+ """
+
+ sql """ DROP FUNCTION IF EXISTS udaf_sum_largeint_inline(LARGEINT); """
+
+ sql """
+ CREATE AGGREGATE FUNCTION udaf_sum_largeint_inline(LARGEINT)
+ RETURNS LARGEINT
+ PROPERTIES (
+ "type" = "PYTHON_UDF",
+ "symbol" = "SumLargeIntUDAF",
+ "runtime_version" = "${runtime_version}"
+ )
+ AS \$\$
+class SumLargeIntUDAF:
+ def __init__(self):
+ self.sum = 0
+
+ def accumulate(self, value):
+ if value is not None:
+ self.sum += value
+
+ def merge(self, other_state):
+ if other_state is not None:
+ self.sum += other_state
+
+ def finish(self):
+ return self.sum
+
+ @property
+ def aggregate_state(self):
+ return self.sum
+\$\$;
+ """
+
+ qt_test_largeint1 """ SELECT udaf_sum_largeint_inline(val) as total
FROM test_pythonudaf_convert_type_table; """
+
+ qt_test_largeint2 """ SELECT category,
+ udaf_sum_largeint_inline(val) as sum_val
+ FROM test_pythonudaf_convert_type_table
+ GROUP BY category
+ ORDER BY category; """
+
+ // ========================================
+ // Test 10: IPv4 UDAF input type conversion
+ // ========================================
+ sql """ DROP FUNCTION IF EXISTS udaf_count_private_ipv4_inline(IPV4);
"""
+
+ sql """
+ CREATE AGGREGATE FUNCTION udaf_count_private_ipv4_inline(IPV4)
+ RETURNS BIGINT
+ PROPERTIES (
+ "type" = "PYTHON_UDF",
+ "symbol" = "CountPrivateIPv4UDAF",
+ "runtime_version" = "${runtime_version}"
+ )
+ AS \$\$
+class CountPrivateIPv4UDAF:
+ def __init__(self):
+ self.count = 0
+
+ def accumulate(self, value):
+ if value is not None and value.is_private:
+ self.count += 1
+
+ def merge(self, other_state):
+ if other_state is not None:
+ self.count += other_state
+
+ def finish(self):
+ return self.count
+
+ @property
+ def aggregate_state(self):
+ return self.count
+\$\$;
+ """
+
+ qt_test_ipv4_udaf1 """ SELECT udaf_count_private_ipv4_inline(ip_v4) as
private_ipv4_count
+ FROM test_pythonudaf_convert_type_table; """
+ qt_test_ipv4_udaf2 """ SELECT category,
+ udaf_count_private_ipv4_inline(ip_v4) as
private_ipv4_count
+ FROM test_pythonudaf_convert_type_table
+ GROUP BY category
+ ORDER BY category; """
+
+ // ========================================
+ // Test 11: IPv6 UDAF input type conversion
+ // ========================================
+ sql """ DROP FUNCTION IF EXISTS udaf_count_loopback_ipv6_inline(IPV6);
"""
+
+ sql """
+ CREATE AGGREGATE FUNCTION udaf_count_loopback_ipv6_inline(IPV6)
+ RETURNS BIGINT
+ PROPERTIES (
+ "type" = "PYTHON_UDF",
+ "symbol" = "CountLoopbackIPv6UDAF",
+ "runtime_version" = "${runtime_version}"
+ )
+ AS \$\$
+class CountLoopbackIPv6UDAF:
+ def __init__(self):
+ self.count = 0
+
+ def accumulate(self, value):
+ if value is not None and value.is_loopback:
+ self.count += 1
+
+ def merge(self, other_state):
+ if other_state is not None:
+ self.count += other_state
+
+ def finish(self):
+ return self.count
+
+ @property
+ def aggregate_state(self):
+ return self.count
+\$\$;
+ """
+
+ qt_test_ipv6_udaf1 """ SELECT udaf_count_loopback_ipv6_inline(ip_v6)
as loopback_ipv6_count
+ FROM test_pythonudaf_convert_type_table; """
+ qt_test_ipv6_udaf2 """ SELECT category,
+ udaf_count_loopback_ipv6_inline(ip_v6)
as loopback_ipv6_count
+ FROM test_pythonudaf_convert_type_table
+ GROUP BY category
+ ORDER BY category; """
+
} finally {
try_sql("DROP GLOBAL FUNCTION IF EXISTS udaf_sum_global(INT);")
try_sql("DROP FUNCTION IF EXISTS udaf_sum_inline(INT);")
try_sql("DROP FUNCTION IF EXISTS udaf_avg_inline(DOUBLE);")
try_sql("DROP FUNCTION IF EXISTS udaf_count_inline(INT);")
try_sql("DROP FUNCTION IF EXISTS udaf_max_inline(INT);")
+ try_sql("DROP FUNCTION IF EXISTS udaf_sum_largeint_inline(LARGEINT);")
+ try_sql("DROP FUNCTION IF EXISTS
udaf_count_private_ipv4_inline(IPV4);")
+ try_sql("DROP FUNCTION IF EXISTS
udaf_count_loopback_ipv6_inline(IPV6);")
try_sql("DROP TABLE IF EXISTS test_pythonudaf_inline_table")
+ try_sql("DROP TABLE IF EXISTS test_pythonudaf_convert_type_table")
}
}
diff --git
a/regression-test/suites/pythonudf_p0/test_pythonudf_error_handling.groovy
b/regression-test/suites/pythonudf_p0/test_pythonudf_error_handling.groovy
index c6969e8ac4d..6974c006adb 100644
--- a/regression-test/suites/pythonudf_p0/test_pythonudf_error_handling.groovy
+++ b/regression-test/suites/pythonudf_p0/test_pythonudf_error_handling.groovy
@@ -178,6 +178,66 @@ def evaluate(s):
qt_select_length_normal """ SELECT py_safe_length('hello') AS result;
"""
qt_select_length_empty """ SELECT py_safe_length('') AS result; """
qt_select_length_null """ SELECT py_safe_length(NULL) AS result; """
+
+ // Test 7: Invalid inline symbol definitions that currently create
successfully
+ sql """ DROP FUNCTION IF EXISTS py_no_func_name(INT); """
+ sql """
+ CREATE FUNCTION py_no_func_name(INT)
+ RETURNS INT
+ PROPERTIES (
+ "type" = "PYTHON_UDF",
+ "symbol" = "module_only",
+ "runtime_version" = "${runtime_version}"
+ )
+ AS \$\$
+ def module_only(): pass
+\$\$;
+ """
+ test {
+ sql "SELECT py_no_func_name(1)"
+ exception "unexpected indent"
+ }
+
+ sql """ DROP FUNCTION IF EXISTS py_empty_sym(INT); """
+ sql """
+ CREATE FUNCTION py_empty_sym(INT)
+ RETURNS INT
+ PROPERTIES (
+ "type" = "PYTHON_UDF",
+ "symbol" = ".evaluate",
+ "runtime_version" = "${runtime_version}"
+ )
+ AS \$\$
+def evaluate(x):
+ return x
+\$\$;
+ """
+ test {
+ sql "SELECT py_empty_sym(1);"
+ exception "Function '.evaluate' not found"
+ }
+
+ // Test 8: Bad symbol should fail at execution time without crashing
BE.
+ sql """ DROP FUNCTION IF EXISTS py_bad_symbol(INT); """
+ sql """
+ CREATE FUNCTION py_bad_symbol(INT)
+ RETURNS INT
+ PROPERTIES (
+ "type" = "PYTHON_UDF",
+ "symbol" = "nonexistent_func",
+ "runtime_version" = "${runtime_version}",
+ "always_nullable" = "true"
+ )
+ AS \$\$
+def evaluate(x):
+ return x
+\$\$;
+ """
+
+ test {
+ sql """ SELECT py_bad_symbol(1) AS result; """
+ exception "Function 'nonexistent_func' not found"
+ }
} finally {
try_sql("DROP FUNCTION IF EXISTS py_safe_divide(DOUBLE, DOUBLE);")
@@ -185,6 +245,9 @@ def evaluate(s):
try_sql("DROP FUNCTION IF EXISTS py_safe_int_parse(STRING);")
try_sql("DROP FUNCTION IF EXISTS py_safe_array_get(ARRAY<INT>, INT);")
try_sql("DROP FUNCTION IF EXISTS py_safe_length(STRING);")
+ try_sql("DROP FUNCTION IF EXISTS py_no_func_name(INT);")
+ try_sql("DROP FUNCTION IF EXISTS py_empty_sym(INT);")
+ try_sql("DROP FUNCTION IF EXISTS py_bad_symbol(INT);")
try_sql("DROP TABLE IF EXISTS error_handling_test_table;")
}
}
diff --git
a/regression-test/suites/pythonudf_p0/test_pythonudf_inline_scalar.groovy
b/regression-test/suites/pythonudf_p0/test_pythonudf_inline_scalar.groovy
index 430bc64ec2f..5e68e75575f 100644
--- a/regression-test/suites/pythonudf_p0/test_pythonudf_inline_scalar.groovy
+++ b/regression-test/suites/pythonudf_p0/test_pythonudf_inline_scalar.groovy
@@ -102,10 +102,35 @@ def evaluate(num):
qt_select_negative """ SELECT py_is_positive(-5) AS result; """
qt_select_zero """ SELECT py_is_positive(0) AS result; """
+ // Test 5: LARGEINT increment (validates int128 <-> Python int
conversion)
+ sql """ DROP FUNCTION IF EXISTS py_largeint_inc(LARGEINT); """
+ sql """
+ CREATE FUNCTION py_largeint_inc(LARGEINT)
+ RETURNS LARGEINT
+ PROPERTIES (
+ "type" = "PYTHON_UDF",
+ "symbol" = "evaluate",
+ "runtime_version" = "${runtime_version}"
+ )
+ AS \$\$
+def evaluate(val):
+ if val is None:
+ return None
+ return val + 1
+\$\$;
+ """
+
+ qt_select_largeint_inc """ SELECT py_largeint_inc(CAST(100 AS
LARGEINT)) AS result; """
+ qt_select_largeint_inc_negative """
+ SELECT py_largeint_inc(CAST(-99999999999999999999 AS LARGEINT)) AS
result;
+ """
+ qt_select_largeint_inc_null """ SELECT py_largeint_inc(CAST(NULL AS
LARGEINT)) AS result; """
+
} finally {
try_sql("DROP FUNCTION IF EXISTS py_add(INT, INT);")
try_sql("DROP FUNCTION IF EXISTS py_concat(STRING, STRING);")
try_sql("DROP FUNCTION IF EXISTS py_square(DOUBLE);")
try_sql("DROP FUNCTION IF EXISTS py_is_positive(INT);")
+ try_sql("DROP FUNCTION IF EXISTS py_largeint_inc(LARGEINT);")
}
}
diff --git
a/regression-test/suites/pythonudtf_p0/test_pythonudtf_basic_inline.groovy
b/regression-test/suites/pythonudtf_p0/test_pythonudtf_basic_inline.groovy
index dd2266c1195..edbb4381be0 100644
--- a/regression-test/suites/pythonudtf_p0/test_pythonudtf_basic_inline.groovy
+++ b/regression-test/suites/pythonudtf_p0/test_pythonudtf_basic_inline.groovy
@@ -1201,6 +1201,35 @@ def parse_csv_udtf(csv_line):
ORDER BY order_id, item;
"""
+ // ========================================
+ // Test: LARGEINT UDTF
+ // Expand LARGEINT value into multiple rows
+ // ========================================
+ sql """ DROP FUNCTION IF EXISTS py_largeint_expand(LARGEINT); """
+ sql """
+ CREATE TABLES FUNCTION py_largeint_expand(LARGEINT)
+ RETURNS ARRAY<LARGEINT>
+ PROPERTIES (
+ "type" = "PYTHON_UDF",
+ "symbol" = "largeint_expand_udtf",
+ "runtime_version" = "3.8.10"
+ )
+ AS \$\$
+def largeint_expand_udtf(val):
+ '''Expand LARGEINT into two rows: val and val+1'''
+ if val is not None:
+ yield (val,)
+ yield (val + 1,)
+\$\$;
+ """
+
+ qt_largeint_expand """
+ SELECT tmp.result
+ FROM (SELECT CAST(100 AS LARGEINT) AS val) t
+ LATERAL VIEW py_largeint_expand(val) tmp AS result
+ ORDER BY tmp.result;
+ """
+
} finally {
try_sql("DROP FUNCTION IF EXISTS py_split_string(STRING);")
try_sql("DROP FUNCTION IF EXISTS py_generate_series(INT, INT);")
@@ -1224,6 +1253,7 @@ def parse_csv_udtf(csv_line):
try_sql("DROP FUNCTION IF EXISTS py_split_words(STRING);")
try_sql("DROP FUNCTION IF EXISTS py_expand_range(INT);")
try_sql("DROP FUNCTION IF EXISTS py_parse_csv(STRING);")
+ try_sql("DROP FUNCTION IF EXISTS py_largeint_expand(LARGEINT);")
try_sql("DROP TABLE IF EXISTS temp_input;")
try_sql("DROP TABLE IF EXISTS numbers_table;")
try_sql("DROP TABLE IF EXISTS ranked_data;")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]