This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d02f70a0f [remote-udaf](python-samples) use python to impl remote avg 
and sum s… (#11655)
5d02f70a0f is described below

commit 5d02f70a0fb994322cb69df10134706f6b0ff77b
Author: chenlinzhong <490103...@qq.com>
AuthorDate: Wed Aug 10 22:13:37 2022 +0800

    [remote-udaf](python-samples) use python to impl remote avg and sum s… 
(#11655)
---
 .../doris-demo/remote-udaf-python-demo/README.md   |  29 ++++
 .../function_server_demo.py                        | 158 +++++++++++++++++++++
 .../proto/function_service.proto                   |   1 +
 .../remote-udaf-python-demo/proto/types.proto      |   1 +
 4 files changed, 189 insertions(+)

diff --git a/samples/doris-demo/remote-udaf-python-demo/README.md 
b/samples/doris-demo/remote-udaf-python-demo/README.md
new file mode 100644
index 0000000000..b73e6d6a0d
--- /dev/null
+++ b/samples/doris-demo/remote-udaf-python-demo/README.md
@@ -0,0 +1,29 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Remote UDF Function Service In Python Demo
+
+## Compile 
+1. `pip install grpcio-tools`
+2. `python -m grpc_tools.protoc -Iproto --python_out=. --grpc_python_out=. proto/function_service.proto && python -m grpc_tools.protoc -Iproto --python_out=. proto/types.proto`
+
+## Run
+
+`python function_server_demo.py 9000`
+
+`9000` is the port that the server will listen on. If the argument is omitted, the server listens on port `9000` by default.
\ No newline at end of file
diff --git a/samples/doris-demo/remote-udaf-python-demo/function_server_demo.py 
b/samples/doris-demo/remote-udaf-python-demo/function_server_demo.py
new file mode 100644
index 0000000000..5fe5e30242
--- /dev/null
+++ b/samples/doris-demo/remote-udaf-python-demo/function_server_demo.py
@@ -0,0 +1,158 @@
+#!/usr/bin/env python
+# encoding: utf-8
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+from concurrent import futures
+
+import grpc
+
+import function_service_pb2
+import function_service_pb2_grpc
+import types_pb2
+import  sys
+import time;
+
+
+class FunctionServerDemo(function_service_pb2_grpc.PFunctionServiceServicer):
+    def fn_call(self, request, context):
+        response = function_service_pb2.PFunctionCallResponse()
+        status = types_pb2.PStatus()
+        status.status_code = 0
+        response.status.CopyFrom(status)
+
+        if request.function_name == "rpc_sum_update":
+            result = types_pb2.PValues()
+            result.has_null = False
+            result_type = types_pb2.PGenericType()
+            result_type.id = types_pb2.PGenericType.INT64
+            result.type.CopyFrom(result_type)
+            total=0
+            size= len(request.args[0].int64_value)
+            for i in range(size):
+                total += request.args[0].int64_value[i]
+    
+            if request.HasField("context"):
+                total += 
request.context.function_context.args_data[0].int64_value[0]
+            result.int64_value.append(total)
+            response.result.append(result)
+
+        if request.function_name == "rpc_sum_merge":
+            result = types_pb2.PValues()
+            result.has_null = False
+            result_type = types_pb2.PGenericType()
+            result_type.id = types_pb2.PGenericType.INT64
+            result.type.CopyFrom(result_type)
+            args_len = len(request.args)
+            total = 0
+            for i in range(args_len):
+                total += request.args[i].int64_value[0]
+            result.int64_value.append(total)
+            response.result.append(result)
+        
+        if request.function_name == "rpc_sum_finalize":
+            result = types_pb2.PValues()
+            result.has_null = False
+            result_type = types_pb2.PGenericType()
+            result_type.id = types_pb2.PGenericType.INT64
+            result.type.CopyFrom(result_type)
+            total = 
request.context.function_context.args_data[0].int64_value[0]
+            result.int64_value.append(total)
+            response.result.append(result)
+
+        if request.function_name == "rpc_avg_update":
+            result = types_pb2.PValues()
+            result.has_null = False
+            result_type = types_pb2.PGenericType()
+            result_type.id = types_pb2.PGenericType.DOUBLE
+            result.type.CopyFrom(result_type)
+            total = 0
+            size = len(request.args[0].int32_value)
+            for i in range(size):
+                total += request.args[0].int32_value[i]
+
+            if request.HasField("context"):
+                total += 
request.context.function_context.args_data[0].double_value[0]
+                size += 
request.context.function_context.args_data[0].int32_value[0]
+
+            result.double_value.append(total)
+            result.int32_value.append(size)
+            response.result.append(result)
+
+        if request.function_name == "rpc_avg_merge":
+            result = types_pb2.PValues()
+            result.has_null = False
+            result_type = types_pb2.PGenericType()
+            result_type.id = types_pb2.PGenericType.DOUBLE
+            result.type.CopyFrom(result_type)
+            total = 0
+            size = 0
+            args_len = len(request.args)
+            for i in range(args_len):
+                total += request.args[i].double_value[0]
+                size += request.args[i].int32_value[0]
+            result.add_double.append(total)
+            result.add_int32.append(size)
+            response.result.append(result)
+
+        if request.function_name == "rpc_avg_finalize":
+            result = types_pb2.PValues()
+            result.has_null = False
+            result_type = types_pb2.PGenericType()
+            result_type.id = types_pb2.PGenericType.DOUBLE
+            result.type.CopyFrom(result_type)
+            total =  
request.context.function_context.args_data[0].double_value[0]
+            size =  
request.context.function_context.args_data[0].int32_value[0]
+            avg = total / size
+            result.double_value.append(avg)
+            response.result.append(result)
+        return response
+
+    def check_fn(self, request, context):
+        response = function_service_pb2.PCheckFunctionResponse()
+        status = types_pb2.PStatus()
+        status.status_code = 0
+        response.status.CopyFrom(status)
+        return response
+
+    def hand_shake(self, request, context):
+        response = types_pb2.PHandShakeResponse()
+        if request.HasField("hello"):
+            response.hello = request.hello
+        status = types_pb2.Pstatus()
+        status.status_code = 0
+        response.status.CopyFrom(status)
+        return response
+
+
+def serve(port):
+    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+    
function_service_pb2_grpc.add_PFunctionServiceServicer_to_server(FunctionServerDemo(),
 server)
+    server.add_insecure_port("0.0.0.0:%s" % port)
+    server.start()
+    while True:
+        time.sleep(1)
+
+
+if __name__ == '__main__':
+    logging.basicConfig()
+    port = 9000
+    if len(sys.argv) > 1:
+        port = sys.argv[1]
+    serve(port)
diff --git 
a/samples/doris-demo/remote-udaf-python-demo/proto/function_service.proto 
b/samples/doris-demo/remote-udaf-python-demo/proto/function_service.proto
new file mode 120000
index 0000000000..103b0d49a2
--- /dev/null
+++ b/samples/doris-demo/remote-udaf-python-demo/proto/function_service.proto
@@ -0,0 +1 @@
+../../../../gensrc/proto/function_service.proto
\ No newline at end of file
diff --git a/samples/doris-demo/remote-udaf-python-demo/proto/types.proto 
b/samples/doris-demo/remote-udaf-python-demo/proto/types.proto
new file mode 120000
index 0000000000..95465b6364
--- /dev/null
+++ b/samples/doris-demo/remote-udaf-python-demo/proto/types.proto
@@ -0,0 +1 @@
+../../../../gensrc/proto/types.proto
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to