This is an automated email from the ASF dual-hosted git repository.

jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f584ad52ca [UDF](demo) add new demo code for java udf (#19276)
f584ad52ca is described below

commit f584ad52ca07d79a8891d84bf0c7f8a2bdc5ac2f
Author: 15767714253 <37656068+15767714...@users.noreply.github.com>
AuthorDate: Sat May 6 16:17:54 2023 +0800

    [UDF](demo) add new demo code for java udf (#19276)
---
 .../ecosystem/udf/java-user-defined-function.md    | 141 +++++++++++++++++++-
 .../ecosystem/udf/java-user-defined-function.md    | 145 ++++++++++++++++++++-
 2 files changed, 272 insertions(+), 14 deletions(-)

diff --git a/docs/en/docs/ecosystem/udf/java-user-defined-function.md 
b/docs/en/docs/ecosystem/udf/java-user-defined-function.md
index 08652bcffa..4a0f2e0d5e 100644
--- a/docs/en/docs/ecosystem/udf/java-user-defined-function.md
+++ b/docs/en/docs/ecosystem/udf/java-user-defined-function.md
@@ -140,23 +140,25 @@ public class SimpleDemo  {
     }
 
     /*required*/
-    public void serialize(State state, DataOutputStream out) throws Exception {
+    public void serialize(State state, DataOutputStream out)  {
         /* serialize some data into buffer */
         try {
             out.writeInt(state.sum);
-        } catch (IOException e) {
-            throw new RuntimeException(e);
+        } catch (Exception e) {
+            /* Do not throw exceptions */
+            log.info(e.getMessage());
         }
     }
 
     /*required*/
-    public void deserialize(State state, DataInputStream in) throws Exception {
+    public void deserialize(State state, DataInputStream in)  {
         /* deserialize get data from buffer before you put */
         int val = 0;
         try {
             val = in.readInt();
-        } catch (IOException e) {
-            throw new RuntimeException(e);
+        } catch (Exception e) {
+            /* Do not throw exceptions */
+            log.info(e.getMessage());
         }
         state.sum = val;
     }
@@ -185,6 +187,131 @@ CREATE AGGREGATE FUNCTION simple_sum(INT) RETURNS INT 
PROPERTIES (
     "type"="JAVA_UDF"
 );
 ```
+
+```JAVA
+package org.apache.doris.udf.demo;
+
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.logging.Logger;
+
+/*UDAF for calculating the median*/
+public class MedianUDAF {
+    Logger log = Logger.getLogger("MedianUDAF");
+
+    // State storage
+    public static class State {
+        // Precision of the result
+        int scale = 0;
+        // Whether this is the first time to execute add() for the data under 
a certain aggregation condition of a certain tablet
+        boolean isFirst = true;
+        //Data storage
+        public StringBuilder stringBuilder;
+    }
+
+    //State initialization
+    public State create() {
+        State state = new State();
+        //Pre-initialize based on the amount of data to be aggregated for each 
aggregation condition under each tablet, for improved performance
+        state.stringBuilder = new StringBuilder(1000);
+        return state;
+    }
+
+
+    // Handle the data for each unit under each aggregation condition for each 
tablet
+    public void add(State state, Double val, int scale) {
+        try {
+            if (val != null && state.isFirst) {
+                
state.stringBuilder.append(scale).append(",").append(val).append(",");
+                state.isFirst = false;
+            } else if (val != null) {
+                state.stringBuilder.append(val).append(",");
+            }
+        } catch (Exception e) {
+            // If it is not guaranteed that there will be no exceptions, it is 
recommended to maximize the exception capture for each method, as the 
processing of java-thrown exceptions is currently not supported
+            log.info("Exception encountered while retrieving data: " + 
e.getMessage());
+        }
+    }
+
+    // Output the data after processing for aggregation
+    public void serialize(State state, DataOutputStream out) {
+        try {
+            // Only DataOutputStream is currently provided, if object 
serialization is required, consider methods such as concatenating strings, 
converting to json, serializing to byte arrays, etc.
+            // If you want to serialize the State object, you may need to 
implement the serialization interface for the inner class State yourself
+            // In the end, it will be transmitted through DataOutputStream
+            out.writeUTF(state.stringBuilder.toString());
+        } catch (Exception e) {
+            log.info("Exception encountered while serializing data:" + 
e.getMessage());
+        }
+    }
+
+    // Retrieve the data output by each data processing unit
+    public void deserialize(State state, DataInputStream in) {
+        try {
+            String string = in.readUTF();
+            state.scale = Integer.parseInt(String.valueOf(string.charAt(0)));
+            StringBuilder stringBuilder = new 
StringBuilder(string.substring(2));
+            state.stringBuilder = stringBuilder;
+        } catch (Exception e) {
+            log.info("Exception encountered while deserializing data: " + 
e.getMessage());
+        }
+    }
+
+    // Merge the processing results of data under a certain key according to 
the aggregation condition, where state1 is the initialized instance for the 
first merge of each key
+    public void merge(State state1, State state2) {
+        try {
+            state1.scale = state2.scale;
+            state1.stringBuilder.append(state2.stringBuilder.toString());
+        } catch (Exception e) {
+            log.info("Exception encountered while merging results: " + 
e.getMessage());
+        }
+    }
+
+    // Aggregate the data for each key after merging and output the final 
result
+    public Double getValue(State state) {
+        try {
+            String[] strings = state.stringBuilder.toString( ).split(",");
+            double[] doubles = new double[strings.length + 1];
+            doubles = 
Arrays.stream(strings).mapToDouble(Double::parseDouble).toArray();
+
+            Arrays.sort(doubles);
+            double n = doubles.length - 1;
+            double index = n * 0.5;
+
+            int low = (int) Math.floor(index);
+            int high = (int) Math.ceil(index);
+
+            double value = low == high ? (doubles[low] + doubles[high]) * 0.5 
: doubles[high];
+
+            BigDecimal decimal = new BigDecimal(value);
+            return decimal.setScale(state.scale, 
BigDecimal.ROUND_HALF_UP).doubleValue();
+        } catch (Exception e) {
+            log.info("Exception encountered while calculating result:" + 
e.getMessage());
+        }
+        return 0.0;
+    }
+
+    //This method is executed after each processing unit is completed
+    public void destroy(State state) {
+    }
+
+}
+
+```
+
+```sql
-- "symbol" must be the fully qualified name of the UDAF class shown above (MedianUDAF).
CREATE AGGREGATE FUNCTION middle_quantiles(DOUBLE,INT) RETURNS DOUBLE PROPERTIES (
    "file"="file:///pathTo/java-udaf.jar",
    "symbol"="org.apache.doris.udf.demo.MedianUDAF",
    "always_nullable"="true",
    "type"="JAVA_UDF"
);
+```
+
+
 * The implemented jar package can be stored locally or on a remote server and downloaded via HTTP, but every BE node must be able to obtain the jar package;
 otherwise, the error status message "Couldn't open file..." will be returned.
 
@@ -207,7 +334,7 @@ Examples of Java UDF are provided in the 
`samples/doris-demo/java-udf-demo/` dir
 
 ## Instructions
 1. Complex data types (HLL, bitmap) are not supported.
-2. Currently, users are allowed to specify the maximum heap size of the JVM 
themselves. The configuration item is jvm_ max_ heap_ size.
+2. Currently, users can specify the maximum JVM heap size themselves through the configuration item `jvm_max_heap_size`, which is set in the global configuration file `be.conf` under the BE installation directory. The default value is 512M. If data aggregation is required, it is recommended to increase this value to improve performance and reduce the risk of out-of-memory errors.
 3. The udf of char type needs to use the String type when creating a function.
 4. Because of how the JVM loads classes with the same name, do not use multiple classes with the same name as UDF implementations at the same time. To update a UDF whose implementation class keeps the same name, restart the BE to reload the classpath.
 
diff --git a/docs/zh-CN/docs/ecosystem/udf/java-user-defined-function.md 
b/docs/zh-CN/docs/ecosystem/udf/java-user-defined-function.md
index 4a5121d816..85080f4bf5 100644
--- a/docs/zh-CN/docs/ecosystem/udf/java-user-defined-function.md
+++ b/docs/zh-CN/docs/ecosystem/udf/java-user-defined-function.md
@@ -106,8 +106,12 @@ package org.apache.doris.udf.demo;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.util.logging.Logger;
 
 public class SimpleDemo  {
+
+    Logger log = Logger.getLogger("SimpleDemo");
+
     //Need an inner class to store data
     /*required*/
     public static class State {
@@ -136,23 +140,25 @@ public class SimpleDemo  {
     }
 
     /*required*/
-    public void serialize(State state, DataOutputStream out) throws Exception {
+    public void serialize(State state, DataOutputStream out)  {
         /* serialize some data into buffer */
         try {
             out.writeInt(state.sum);
-        } catch (IOException e) {
-            throw new RuntimeException(e);
+        } catch (Exception e) {
+            /* Do not throw exceptions */
+            log.info(e.getMessage());
         }
     }
 
     /*required*/
-    public void deserialize(State state, DataInputStream in) throws Exception {
+    public void deserialize(State state, DataInputStream in)  {
         /* deserialize get data from buffer before you put */
         int val = 0;
         try {
             val = in.readInt();
-        } catch (IOException e) {
-            throw new RuntimeException(e);
+        } catch (Exception e) {
+            /* Do not throw exceptions */
+            log.info(e.getMessage());
         }
         state.sum = val;
     }
@@ -181,6 +187,131 @@ CREATE AGGREGATE FUNCTION simple_sum(INT) RETURNS INT 
PROPERTIES (
     "type"="JAVA_UDF"
 );
 ```
+
+```JAVA
+package org.apache.doris.udf.demo;
+
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.logging.Logger;
+
+/*UDAF计算中位数*/
+public class MedianUDAF {
+    Logger log = Logger.getLogger("MedianUDAF");
+
+    //状态存储
+    public static class State {
+        //返回结果的精度
+        int scale = 0;
+        //是否是某一个tablet下的某个聚合条件下的数据第一次执行add方法
+        boolean isFirst = true;
+        //数据存储
+        public StringBuilder stringBuilder;
+    }
+
+    //状态初始化
+    public State create() {
+        State state = new State();
+        //根据每个tablet下的聚合条件需要聚合的数据量大小,预先初始化,增加性能
+        state.stringBuilder = new StringBuilder(1000);
+        return state;
+    }
+
+
+    //处理执行单位处理各自tablet下的各自聚合条件下的每个数据
+    public void add(State state, Double val, int scale) {
+        try {
+            if (val != null && state.isFirst) {
+                
state.stringBuilder.append(scale).append(",").append(val).append(",");
+                state.isFirst = false;
+            } else if (val != null) {
+                state.stringBuilder.append(val).append(",");
+            }
+        } catch (Exception e) {
+            //如果不能保证一定不会异常,建议每个方法都最大化捕获异常,因为目前不支持处理java抛出的异常
+            log.info("获取数据异常: " + e.getMessage());
+        }
+    }
+
+    //处理数据完需要输出等待聚合
+    public void serialize(State state, DataOutputStream out) {
+        try {
+            //目前暂时只提供DataOutputStream,如果需要序列化对象可以考虑拼接字符串,转换json,序列化成字节数组等方式
+            //如果要序列化State对象,可能需要自己将State内部类实现序列化接口
+            //最终都是要通过DataOutputStream传输
+            out.writeUTF(state.stringBuilder.toString());
+        } catch (Exception e) {
+            log.info("序列化异常: " + e.getMessage());
+        }
+    }
+
+    //获取处理数据执行单位输出的数据
+    public void deserialize(State state, DataInputStream in) {
+        try {
+            String string = in.readUTF();
+            state.scale = Integer.parseInt(String.valueOf(string.charAt(0)));
+            StringBuilder stringBuilder = new 
StringBuilder(string.substring(2));
+            state.stringBuilder = stringBuilder;
+        } catch (Exception e) {
+            log.info("反序列化异常: " + e.getMessage());
+        }
+    }
+
+    //聚合执行单位按照聚合条件合并某一个键下数据的处理结果 ,每个键第一次合并时,state1参数是初始化的实例
+    public void merge(State state1, State state2) {
+        try {
+            state1.scale = state2.scale;
+            state1.stringBuilder.append(state2.stringBuilder.toString());
+        } catch (Exception e) {
+            log.info("合并结果异常: " + e.getMessage());
+        }
+    }
+
+    //对每个键合并后的数据进行并输出最终结果
+    public Double getValue(State state) {
+        try {
+            String[] strings = state.stringBuilder.toString().split(",");
+            double[] doubles = new double[strings.length + 1];
+            doubles = 
Arrays.stream(strings).mapToDouble(Double::parseDouble).toArray();
+
+            Arrays.sort(doubles);
+            double n = doubles.length - 1;
+            double index = n * 0.5;
+
+            int low = (int) Math.floor(index);
+            int high = (int) Math.ceil(index);
+
+            double value = low == high ? (doubles[low] + doubles[high]) * 0.5 
: doubles[high];
+
+            BigDecimal decimal = new BigDecimal(value);
+            return decimal.setScale(state.scale, 
BigDecimal.ROUND_HALF_UP).doubleValue();
+        } catch (Exception e) {
+            log.info("计算异常:" + e.getMessage());
+        }
+        return 0.0;
+    }
+
+    //每个执行单位执行完都会执行
+    public void destroy(State state) {
+    }
+
+}
+
+```
+
+```sql
-- "symbol" must be the fully qualified name of the UDAF class shown above (MedianUDAF).
CREATE AGGREGATE FUNCTION middle_quantiles(DOUBLE,INT) RETURNS DOUBLE PROPERTIES (
    "file"="file:///pathTo/java-udaf.jar",
    "symbol"="org.apache.doris.udf.demo.MedianUDAF",
    "always_nullable"="true",
    "type"="JAVA_UDF"
);
+```
+
+
 * 实现的jar包可以放在本地也可以存放在远程服务端通过http下载,但必须让每个BE节点都能获取到jar包;
 否则将会返回错误状态信息"Couldn't open file ......".
 
@@ -203,7 +334,7 @@ UDF 的使用与普通的函数方式一致,唯一的区别在于,内置函
 
 ## 使用须知
 1. 不支持复杂数据类型(HLL,Bitmap)。
-2. 当前允许用户自己指定JVM最大堆大小,配置项是jvm_max_heap_size。
+2. 当前允许用户自己指定JVM最大堆大小,配置项是jvm_max_heap_size。该配置项位于BE安装目录下的be.conf全局配置文件中,默认值为512M。如果需要聚合数据,建议适当调大该值,以提升性能并降低内存溢出风险。
 3. char类型的udf在create function时需要使用String类型。
 4. 由于jvm加载同名类的问题,不要同时使用多个同名类作为udf实现,如果想更新某个同名类的udf,需要重启be重新加载classpath。
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to