This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a3ee91bb5 [remote-udaf](sample) add some python demo (#11760)
8a3ee91bb5 is described below

commit 8a3ee91bb5b7c207ce17a157d296ce8e33a7ab21
Author: chenlinzhong <490103...@qq.com>
AuthorDate: Tue Aug 16 09:26:36 2022 +0800

    [remote-udaf](sample) add some python demo (#11760)
---
 .../doris-demo/remote-udaf-python-demo/README.md   | 129 ++++++++++++++++++++-
 .../function_server_demo.py                        |  73 +++++++++++-
 2 files changed, 198 insertions(+), 4 deletions(-)

diff --git a/samples/doris-demo/remote-udaf-python-demo/README.md 
b/samples/doris-demo/remote-udaf-python-demo/README.md
index b73e6d6a0d..8f59b03ba6 100644
--- a/samples/doris-demo/remote-udaf-python-demo/README.md
+++ b/samples/doris-demo/remote-udaf-python-demo/README.md
@@ -26,4 +26,131 @@ under the License.
 # Run
 
 `python function_server_demo.py 9000`
-`9000` is the port that the server will listen on
\ No newline at end of file
+`9000` is the port that the server will listen on
+
+# Demo
+
+
+```
+// create a table, e.g. table2
+CREATE TABLE `table2` (
+  `event_day` date NULL,
+  `siteid` int(11) NULL DEFAULT "10",
+  `citycode` smallint(6) NULL,
+  `visitinfo` varchar(1024) NULL DEFAULT "",
+  `pv` varchar(1024) REPLACE NULL DEFAULT "0"
+) ENGINE=OLAP
+AGGREGATE KEY(`event_day`, `siteid`, `citycode`, `visitinfo`)
+COMMENT 'OLAP'
+DISTRIBUTED BY HASH(`siteid`) BUCKETS 10
+PROPERTIES (
+"replication_allocation" = "tag.location.default: 1",
+"in_memory" = "false",
+"storage_format" = "V2"
+);
+// import some data, then query it
+MySQL [test_db]> select * from table2;
++------------+--------+----------+------------------------------------+------+
+| event_day  | siteid | citycode | visitinfo                           | pv   |
++------------+--------+----------+------------------------------------+------+
+| 2017-07-03 |      8 |       12 | {"ip":"192.168.0.5","source":"pc"} | 81   |
+| 2017-07-03 |     37 |       12 | {"ip":"192.168.0.3","source":"pc"} | 81   |
+| 2017-07-03 |     67 |       16 | {"ip":"192.168.0.2","source":"pc"} | 79   |
+| 2017-07-03 |    101 |       11 | {"ip":"192.168.0.5","source":"pc"} | 65   |
+| 2017-07-03 |     32 |       15 | {"ip":"192.168.0.1","source":"pc"} | 188  |
+| 2017-07-03 |    103 |       12 | {"ip":"192.168.0.5","source":"pc"} | 123  |
+| 2017-07-03 |    104 |       16 | {"ip":"192.168.0.5","source":"pc"} | 79   |
+| 2017-07-03 |      3 |       12 | {"ip":"192.168.0.3","source":"pc"} | 123  |
+| 2017-07-03 |      3 |       15 | {"ip":"192.168.0.2","source":"pc"} | 188  |
+| 2017-07-03 |     13 |       11 | {"ip":"192.168.0.1","source":"pc"} | 65   |
+| 2017-07-03 |     53 |       12 | {"ip":"192.168.0.2","source":"pc"} | 123  |
+| 2017-07-03 |      1 |       11 | {"ip":"192.168.0.1","source":"pc"} | 65   |
+| 2017-07-03 |      7 |       16 | {"ip":"192.168.0.4","source":"pc"} | 79   |
+| 2017-07-03 |    102 |       15 | {"ip":"192.168.0.5","source":"pc"} | 188  |
+| 2017-07-03 |    105 |       12 | {"ip":"192.168.0.5","source":"pc"} | 81   |
++------------+--------+----------+------------------------------------+------+
+```
+
+### 1. Find the top 3 most-visited IPs
+```
+MySQL [test_db]> CREATE AGGREGATE FUNCTION rpc_count_visit_info(varchar(1024)) RETURNS varchar(1024) PROPERTIES (
+    "TYPE"="RPC",
+    "OBJECT_FILE"="127.0.0.1:9000",
+    "update_fn"="rpc_count_visit_info_update",
+    "merge_fn"="rpc_count_visit_info_merge",
+    "finalize_fn"="rpc_count_visit_info_finalize"
+);
+MySQL [test_db]> select rpc_count_visit_info(visitinfo) from table2;
++--------------------------------------------+
+| rpc_count_visit_info(`visitinfo`)           |
++--------------------------------------------+
+| 192.168.0.5:6 192.168.0.2:3 192.168.0.1:3  |
++--------------------------------------------+
+1 row in set (0.036 sec)
+MySQL [test_db]> select citycode, rpc_count_visit_info(visitinfo) from table2 group by citycode;
++----------+--------------------------------------------+
+| citycode | rpc_count_visit_info(`visitinfo`)           |
++----------+--------------------------------------------+
+|       15 | 192.168.0.2:1 192.168.0.1:1 192.168.0.5:1  |
+|       11 | 192.168.0.1:2 192.168.0.5:1                |
+|       12 | 192.168.0.5:3 192.168.0.3:2 192.168.0.2:1  |
+|       16 | 192.168.0.2:1 192.168.0.4:1 192.168.0.5:1  |
++----------+--------------------------------------------+
+4 rows in set (0.050 sec)
+```
+### 2. Sum pv
+```
+CREATE AGGREGATE FUNCTION  rpc_sum(bigint) RETURNS bigint PROPERTIES (
+    "TYPE"="RPC",
+    "OBJECT_FILE"="127.0.0.1:9000",
+    "update_fn"="rpc_sum_update",
+    "merge_fn"="rpc_sum_merge",
+    "finalize_fn"="rpc_sum_finalize"
+);
+MySQL [test_db]> select citycode, rpc_sum(pv) from table2 group by citycode;
++----------+---------------+
+| citycode | rpc_sum(`pv`) |
++----------+---------------+
+|       15 |           564 |
+|       11 |           195 |
+|       12 |           612 |
+|       16 |           237 |
++----------+---------------+
+4 rows in set (0.067 sec)
+MySQL [test_db]> select rpc_sum(pv) from table2;
++---------------+
+| rpc_sum(`pv`) |
++---------------+
+|          1608 |
++---------------+
+1 row in set (0.030 sec)
+```
+
+### 3. Average pv
+
+```
+CREATE AGGREGATE FUNCTION  rpc_avg(int) RETURNS double PROPERTIES (
+    "TYPE"="RPC",
+    "OBJECT_FILE"="127.0.0.1:9000",
+    "update_fn"="rpc_avg_update",
+    "merge_fn"="rpc_avg_merge",
+    "finalize_fn"="rpc_avg_finalize"
+);
+MySQL [test_db]> select citycode, rpc_avg(pv) from table2 group by citycode;
++----------+---------------+
+| citycode | rpc_avg(`pv`) |
++----------+---------------+
+|       15 |           188 |
+|       11 |            65 |
+|       12 |           102 |
+|       16 |            79 |
++----------+---------------+
+4 rows in set (0.039 sec)
+MySQL [test_db]> select rpc_avg(pv) from table2;
++---------------+
+| rpc_avg(`pv`) |
++---------------+
+|         107.2 |
++---------------+
+1 row in set (0.028 sec)
+```
\ No newline at end of file
diff --git a/samples/doris-demo/remote-udaf-python-demo/function_server_demo.py 
b/samples/doris-demo/remote-udaf-python-demo/function_server_demo.py
index 5fe5e30242..eb7a3a59ca 100644
--- a/samples/doris-demo/remote-udaf-python-demo/function_server_demo.py
+++ b/samples/doris-demo/remote-udaf-python-demo/function_server_demo.py
@@ -28,6 +28,8 @@ import function_service_pb2_grpc
 import types_pb2
 import  sys
 import time;
+import json
+
 
 
 class FunctionServerDemo(function_service_pb2_grpc.PFunctionServiceServicer):
@@ -36,7 +38,6 @@ class 
FunctionServerDemo(function_service_pb2_grpc.PFunctionServiceServicer):
         status = types_pb2.PStatus()
         status.status_code = 0
         response.status.CopyFrom(status)
-
         if request.function_name == "rpc_sum_update":
             result = types_pb2.PValues()
             result.has_null = False
@@ -107,8 +108,8 @@ class 
FunctionServerDemo(function_service_pb2_grpc.PFunctionServiceServicer):
             for i in range(args_len):
                 total += request.args[i].double_value[0]
                 size += request.args[i].int32_value[0]
-            result.add_double.append(total)
-            result.add_int32.append(size)
+            result.double_value.append(total)
+            result.int32_value.append(size)
             response.result.append(result)
 
         if request.function_name == "rpc_avg_finalize":
@@ -122,6 +123,72 @@ class 
FunctionServerDemo(function_service_pb2_grpc.PFunctionServiceServicer):
             avg = total / size
             result.double_value.append(avg)
             response.result.append(result)
+        if request.function_name == "rpc_count_visit_info_update":
+            result = types_pb2.PValues()
+            result.has_null = False
+            result_type = types_pb2.PGenericType()
+            result_type.id = types_pb2.PGenericType.STRING
+            result.type.CopyFrom(result_type)
+            size =  len(request.args[0].string_value)
+            currentMap=dict()
+            if request.HasField("context"):
+                context = 
request.context.function_context.args_data[0].string_value[0]
+                currentMap = json.loads(context)
+            for i in range(size):
+                s = request.args[0].string_value[i]
+                mapInfo = json.loads(s)
+                ip=mapInfo['ip']
+                if currentMap.has_key(ip):
+                    last_val=currentMap[ip]
+                    last_val+=1
+                    currentMap[ip] = last_val
+                else:
+                    currentMap[ip] = 1
+            json_dict = json.dumps(currentMap)
+            result.string_value.append(json_dict)
+            response.result.append(result)
+
+        if request.function_name == "rpc_count_visit_info_merge":
+            result = types_pb2.PValues()
+            result.has_null = False
+            result_type = types_pb2.PGenericType()
+            result_type.id = types_pb2.PGenericType.STRING
+            result.type.CopyFrom(result_type)
+
+            context1 = request.args[0].string_value[0]
+            currentMap1 = json.loads(context1)
+            context2 = request.args[1].string_value[0]
+            currentMap2 = json.loads(context2)
+            for ip,num in  currentMap2.items():
+                if currentMap1.has_key(ip):
+                    currentMap1[ip] = currentMap1[ip] + num
+                else:
+                    currentMap1[ip] = num
+            json_dict = json.dumps(currentMap1)
+            result.string_value.append(json_dict)
+            response.result.append(result)
+
+        if request.function_name == "rpc_count_visit_info_finalize":
+            result = types_pb2.PValues()
+            result.has_null = False
+            result_type = types_pb2.PGenericType()
+            result_type.id = types_pb2.PGenericType.STRING
+            result.type.CopyFrom(result_type)
+
+            context = 
request.context.function_context.args_data[0].string_value[0]
+            currentMap = json.loads(context)
+            sortedMap=sorted(currentMap.items(), key = lambda kv:(kv[1], 
kv[0]),reverse=True) 
+            resultMap=dict()
+            topN=3
+            if len(sortedMap) < topN:
+                topN = len(sortedMap)
+            finalResult=""
+            print(sortedMap)
+            for i in  range(topN):
+                ip=sortedMap[i][0]
+                finalResult +=ip +":"+str(sortedMap[i][1]) +" "
+            result.string_value.append(finalResult)
+            response.result.append(result)
         return response
 
     def check_fn(self, request, context):


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to