This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 8a3ee91bb5 [remote-udaf](sample) add some python demo (#11760) 8a3ee91bb5 is described below commit 8a3ee91bb5b7c207ce17a157d296ce8e33a7ab21 Author: chenlinzhong <490103...@qq.com> AuthorDate: Tue Aug 16 09:26:36 2022 +0800 [remote-udaf](sample) add some python demo (#11760) --- .../doris-demo/remote-udaf-python-demo/README.md | 129 ++++++++++++++++++++- .../function_server_demo.py | 73 +++++++++++- 2 files changed, 198 insertions(+), 4 deletions(-) diff --git a/samples/doris-demo/remote-udaf-python-demo/README.md b/samples/doris-demo/remote-udaf-python-demo/README.md index b73e6d6a0d..8f59b03ba6 100644 --- a/samples/doris-demo/remote-udaf-python-demo/README.md +++ b/samples/doris-demo/remote-udaf-python-demo/README.md @@ -26,4 +26,131 @@ under the License. # Run `python function_server_demo.py 9000` -`9000` is the port that the server will listen on \ No newline at end of file +`9000` is the port that the server will listen on + +# Demo + + +``` +//create one table such as table2 +CREATE TABLE `table2` ( + `event_day` date NULL, + `siteid` int(11) NULL DEFAULT "10", + `citycode` smallint(6) NULL, + `visitinfo` varchar(1024) NULL DEFAULT "", + `pv` varchar(1024) REPLACE NULL DEFAULT "0" +) ENGINE=OLAP +AGGREGATE KEY(`event_day`, `siteid`, `citycode`, `visitinfo`) +COMMENT 'OLAP' +DISTRIBUTED BY HASH(`siteid`) BUCKETS 10 +PROPERTIES ( +"replication_allocation" = "tag.location.default: 1", +"in_memory" = "false", +"storage_format" = "V2" +) +//import some data +MySQL [test_db]> select * from table2; ++------------+--------+----------+------------------------------------+------+ +| event_day | siteid | citycode | visitinfo | pv | ++------------+--------+----------+------------------------------------+------+ +| 2017-07-03 | 8 | 12 | {"ip":"192.168.0.5","source":"pc"} | 81 | +| 2017-07-03 | 37 | 12 | {"ip":"192.168.0.3","source":"pc"} | 81 | +| 2017-07-03 | 67 | 16 | {"ip":"192.168.0.2","source":"pc"} | 79 | +| 2017-07-03 | 101 | 11 | {"ip":"192.168.0.5","source":"pc"} | 65 | +| 2017-07-03 | 32 | 15 | {"ip":"192.168.0.1","source":"pc"} | 188 | +| 2017-07-03 | 103 | 12 | {"ip":"192.168.0.5","source":"pc"} | 123 | +| 2017-07-03 | 104 | 16 | {"ip":"192.168.0.5","source":"pc"} | 79 | +| 2017-07-03 | 3 | 12 | {"ip":"192.168.0.3","source":"pc"} | 123 | +| 2017-07-03 | 3 | 15 | {"ip":"192.168.0.2","source":"pc"} | 188 | +| 2017-07-03 | 13 | 11 | {"ip":"192.168.0.1","source":"pc"} | 65 | +| 2017-07-03 | 53 | 12 | {"ip":"192.168.0.2","source":"pc"} | 123 | +| 2017-07-03 | 1 | 11 | {"ip":"192.168.0.1","source":"pc"} | 65 | +| 2017-07-03 | 7 | 16 | {"ip":"192.168.0.4","source":"pc"} | 79 | +| 2017-07-03 | 102 | 15 | {"ip":"192.168.0.5","source":"pc"} | 188 | +| 2017-07-03 | 105 | 12 | {"ip":"192.168.0.5","source":"pc"} | 81 | ++------------+--------+----------+------------------------------------+------+ +``` + +### 1. find most visit top 3 ip +``` +MySQL [test_db]> CREATE AGGREGATE FUNCTION rpc_count_visit_info(varchar(1024)) RETURNS varchar(1024) PROPERTIES ( + "TYPE"="RPC", + "OBJECT_FILE"="127.0.0.1:9000", + "update_fn"="rpc_count_visit_info_update", + "merge_fn"="rpc_count_visit_info_merge", + "finalize_fn"="rpc_count_visit_info_finalize" +); +MySQL [test_db]> select rpc_count_visit_info(visitinfo) from table2; ++--------------------------------------------+ +| rpc_count_visit_info(`visitinfo`) | ++--------------------------------------------+ +| 192.168.0.5:6 192.168.0.2:3 192.168.0.1:3 | ++--------------------------------------------+ +1 row in set (0.036 sec) +MySQL [test_db]> select citycode, rpc_count_visit_info(visitinfo) from table2 group by citycode; ++----------+--------------------------------------------+ +| citycode | rpc_count_visit_info(`visitinfo`) | ++----------+--------------------------------------------+ +| 15 | 192.168.0.2:1 192.168.0.1:1 192.168.0.5:1 | +| 11 | 192.168.0.1:2 192.168.0.5:1 | +| 12 | 192.168.0.5:3 192.168.0.3:2 192.168.0.2:1 | +| 16 | 192.168.0.2:1 192.168.0.4:1 192.168.0.5:1 | ++----------+--------------------------------------------+ +4 rows in set (0.050 sec) +``` +### 2. sum pv +``` +CREATE AGGREGATE FUNCTION rpc_sum(bigint) RETURNS bigint PROPERTIES ( + "TYPE"="RPC", + "OBJECT_FILE"="127.0.0.1:9700", + "update_fn"="rpc_sum_update", + "merge_fn"="rpc_sum_merge", + "finalize_fn"="rpc_sum_finalize" +); +MySQL [test_db]> select citycode, rpc_sum(pv) from table2 group by citycode; ++----------+---------------+ +| citycode | rpc_sum(`pv`) | ++----------+---------------+ +| 15 | 564 | +| 11 | 195 | +| 12 | 612 | +| 16 | 237 | ++----------+---------------+ +4 rows in set (0.067 sec) +MySQL [test_db]> select rpc_sum(pv) from table2; ++---------------+ +| rpc_sum(`pv`) | ++---------------+ +| 1608 | ++---------------+ +1 row in set (0.030 sec) +``` + +### 3. avg pv + +``` +CREATE AGGREGATE FUNCTION rpc_avg(int) RETURNS double PROPERTIES ( + "TYPE"="RPC", + "OBJECT_FILE"="127.0.0.1:9000", + "update_fn"="rpc_avg_update", + "merge_fn"="rpc_avg_merge", + "finalize_fn"="rpc_avg_finalize" +); +MySQL [test_db]> select citycode, rpc_avg(pv) from table2 group by citycode; ++----------+---------------+ +| citycode | rpc_avg(`pv`) | ++----------+---------------+ +| 15 | 188 | +| 11 | 65 | +| 12 | 102 | +| 16 | 79 | ++----------+---------------+ +4 rows in set (0.039 sec) +MySQL [test_db]> select rpc_avg(pv) from table2; ++---------------+ +| rpc_avg(`pv`) | ++---------------+ +| 107.2 | ++---------------+ +1 row in set (0.028 sec) +``` \ No newline at end of file diff --git a/samples/doris-demo/remote-udaf-python-demo/function_server_demo.py b/samples/doris-demo/remote-udaf-python-demo/function_server_demo.py index 5fe5e30242..eb7a3a59ca 100644 --- a/samples/doris-demo/remote-udaf-python-demo/function_server_demo.py +++ b/samples/doris-demo/remote-udaf-python-demo/function_server_demo.py @@ -28,6 +28,8 @@ import function_service_pb2_grpc import types_pb2 import sys import time; +import json + class FunctionServerDemo(function_service_pb2_grpc.PFunctionServiceServicer): @@ -36,7 +38,6 @@ class FunctionServerDemo(function_service_pb2_grpc.PFunctionServiceServicer): status = types_pb2.PStatus() status.status_code = 0 response.status.CopyFrom(status) - if request.function_name == "rpc_sum_update": result = types_pb2.PValues() result.has_null = False @@ -107,8 +108,8 @@ class FunctionServerDemo(function_service_pb2_grpc.PFunctionServiceServicer): for i in range(args_len): total += request.args[i].double_value[0] size += request.args[i].int32_value[0] - result.add_double.append(total) - result.add_int32.append(size) + result.double_value.append(total) + result.int32_value.append(size) response.result.append(result) if request.function_name == "rpc_avg_finalize": @@ -122,6 +123,72 @@ class FunctionServerDemo(function_service_pb2_grpc.PFunctionServiceServicer): avg = total / size result.double_value.append(avg) response.result.append(result) + if request.function_name == "rpc_count_visit_info_update": + result = types_pb2.PValues() + result.has_null = False + result_type = types_pb2.PGenericType() + result_type.id = types_pb2.PGenericType.STRING + result.type.CopyFrom(result_type) + size = len(request.args[0].string_value) + currentMap=dict() + if request.HasField("context"): + context = request.context.function_context.args_data[0].string_value[0] + currentMap = json.loads(context) + for i in range(size): + s = request.args[0].string_value[i] + mapInfo = json.loads(s) + ip=mapInfo['ip'] + if currentMap.has_key(ip): + last_val=currentMap[ip] + last_val+=1 + currentMap[ip] = last_val + else: + currentMap[ip] = 1 + json_dict = json.dumps(currentMap) + result.string_value.append(json_dict) + response.result.append(result) + + if request.function_name == "rpc_count_visit_info_merge": + result = types_pb2.PValues() + result.has_null = False + result_type = types_pb2.PGenericType() + result_type.id = types_pb2.PGenericType.STRING + result.type.CopyFrom(result_type) + + context1 = request.args[0].string_value[0] + currentMap1 = json.loads(context1) + context2 = request.args[1].string_value[0] + currentMap2 = json.loads(context2) + for ip,num in currentMap2.items(): + if currentMap1.has_key(ip): + currentMap1[ip] = currentMap1[ip] + num + else: + currentMap1[ip] = num + json_dict = json.dumps(currentMap1) + result.string_value.append(json_dict) + response.result.append(result) + + if request.function_name == "rpc_count_visit_info_finalize": + result = types_pb2.PValues() + result.has_null = False + result_type = types_pb2.PGenericType() + result_type.id = types_pb2.PGenericType.STRING + result.type.CopyFrom(result_type) + + context = request.context.function_context.args_data[0].string_value[0] + currentMap = json.loads(context) + sortedMap=sorted(currentMap.items(), key = lambda kv:(kv[1], kv[0]),reverse=True) + resultMap=dict() + topN=3 + if len(sortedMap) < topN: + topN = len(sortedMap) + finalResult="" + print(sortedMap) + for i in range(topN): + ip=sortedMap[i][0] + finalResult +=ip +":"+str(sortedMap[i][1]) +" " + result.string_value.append(finalResult) + response.result.append(result) return response def check_fn(self, request, context): --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org