GitHub user hutiefang76 closed a discussion: Is there an example of using UDF functions in Flink SQL from the web UI?

I am currently trying to migrate some simple Flink jobs to StreamPark.

```
CREATE TABLE  requestBattCoordinateDataEvt_with_outlier (
             thead ROW(battBoxCode String, cipherFlag int, version String, `timeStamp` BIGINT, deviceCode String, deviceType int),
             tdataBody ROW(lng BIGINT, lat BIGINT, battBoxPos int, vin String, vehicleType int, sn int, battBoxStatus int, accuMilesOfVehicle int),
           dataSign String,
           procTime AS PROCTIME(),
           `timeStamp` AS TO_TIMESTAMP(FROM_UNIXTIME(`thead`.`timeStamp`/1000, 'yyyy-MM-dd HH:mm:ss'))
           )

            WITH (
             'connector' = 'kafka',
             'topic' = 'requestBattCoordinateDataEvt',
           'properties.bootstrap.servers' = ' xxx:xxx ' ,
           'properties.group.id' = 'xxxxxxxx',
             'scan.startup.mode' = 'latest-offset',
             'format' = 'json',
           'json.fail-on-missing-field' = 'false',
           'json.ignore-parse-errors' = 'true' 
           );

CREATE TABLE BattRealtimeInfo (
                 bms_code STRING,
                 batt_code STRING,
                 `create_time` bigint,
                 coordinate_date bigInt,
                 receive_time_part2 bigInt,
                 lng BIGINT,
                 lat BIGINT,


                 lngD double ,
                 latD  double ,
                accuMilesOfVehicle BIGINT,


                 PRIMARY KEY (batt_code) NOT ENFORCED 
                ) WITH (
                 'connector' = 'elasticsearch-7',
                 'hosts' = 'xxxxx:9200',
                 'index' = 'dc_batt_realtime_info_v2',
                 'sink.bulk-flush.max-actions' = '200',
                'failure-handler'='ignore',
                'sink.bulk-flush.interval'='10000'
                );
                
                
                
                
           insert into BattRealtimeInfo
                SELECT
                        b.thead.battBoxCode,
                        assetCode(b.thead.battBoxCode) as device_code,
                        case when b.thead.`timeStamp` is not null
                             then cast(FROM_UNIXTIME(cast(b.thead.`timeStamp`/1000 as bigint), 'yyyyMMddHHmmss') as bigint)
                             else cast(DATE_FORMAT(LOCALTIMESTAMP, 'yyyyMMddHHmmss') as bigint)
                        end as `create_time`,
                       b.thead.`timeStamp`/1000,
                        cast(DATE_FORMAT(LOCALTIMESTAMP, 'yyyyMMddHHmmss') as bigint) as receive_time_part2,
                        b.tdataBody.lng,
                        b.tdataBody.lat,
                        geo_long_2_double(b.tdataBody.lng),
                        geo_long_2_double(b.tdataBody.lat),
                        b.tdataBody.accuMilesOfVehicle
                        FROM requestBattCoordinateDataEvt_with_outlier b
                        where   b.tdataBody is not null
                        and b.thead.battBoxCode is not null

```


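The code is shown above.
The INSERT statement needs to call UDF functions.
I don't know how to call or import them.
1. I tried uploading the jar in the dependency section, but that did not solve the problem.
2. I could not find a relevant tutorial on the official website.
3. I searched Baidu and Google for related examples and found nothing.

Could someone please:
1. Tell me whether UDF functions can be used.
2. If so, point me to an example or documentation, or tell me how to do it.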
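
For context, plain Flink SQL normally registers a UDF with a `CREATE FUNCTION` statement that runs before the statement calling it. Below is a minimal sketch of what that could look like for the two functions used in the INSERT above. The class names `com.example.udf.AssetCode` and `com.example.udf.GeoLong2Double` are hypothetical placeholders, not taken from this post, and the sketch assumes the jar containing them is already on the job's classpath. Whether the StreamPark SQL editor accepts these statements as written is not confirmed here.

```sql
-- Sketch only: the class names are hypothetical placeholders.
-- Each class is assumed to be a Flink scalar UDF packaged in a jar
-- that is available on the job's classpath.
CREATE TEMPORARY FUNCTION IF NOT EXISTS assetCode
  AS 'com.example.udf.AssetCode' LANGUAGE JAVA;

CREATE TEMPORARY FUNCTION IF NOT EXISTS geo_long_2_double
  AS 'com.example.udf.GeoLong2Double' LANGUAGE JAVA;

-- Once registered, the INSERT statement can call assetCode(...)
-- and geo_long_2_double(...) by these names.
```

The statements would go ahead of the `insert into BattRealtimeInfo` statement in the same SQL script, so that both names resolve when the query is planned.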




<img width="835" alt="image" src="https://github.com/apache/incubator-streampark/assets/137664623/82487e38-621c-4909-ba85-5788217558c2">

<img width="759" alt="image" src="https://github.com/apache/incubator-streampark/assets/137664623/35d97c10-0744-425c-8e15-44fe85c1a431">


GitHub link: https://github.com/apache/incubator-streampark/discussions/2871
