GitHub user hutiefang76 closed a discussion: 界面flink sql中使用udf函数的案例有没有
我目前想要把一些简单的 Flink 作业迁移到 StreamPark 中。
```
-- Kafka source table over the 'requestBattCoordinateDataEvt' topic, JSON format.
-- The JSON options below are set so that missing fields do not fail the job
-- and records with parse errors are ignored.
CREATE TABLE requestBattCoordinateDataEvt_with_outlier (
    -- Message header.
    thead ROW<
        battBoxCode STRING,
        cipherFlag INT,
        version STRING,
        `timeStamp` BIGINT,
        deviceCode STRING,
        deviceType INT
    >,
    -- Message body.
    tdataBody ROW<
        lng BIGINT,
        lat BIGINT,
        battBoxPos INT,
        vin STRING,
        vehicleType INT,
        sn INT,
        battBoxStatus INT,
        accuMilesOfVehicle INT
    >,
    dataSign STRING,
    -- Computed column: processing-time attribute.
    procTime AS PROCTIME(),
    -- Computed column: thead.timeStamp / 1000 rendered as 'yyyy-MM-dd HH:mm:ss'
    -- and parsed back into a TIMESTAMP.
    -- NOTE(review): presumably thead.timeStamp is epoch milliseconds - confirm.
    `timeStamp` AS TO_TIMESTAMP(
        FROM_UNIXTIME(`thead`.`timeStamp` / 1000, 'yyyy-MM-dd HH:mm:ss')
    )
) WITH (
    'connector' = 'kafka',
    'topic' = 'requestBattCoordinateDataEvt',
    'properties.bootstrap.servers' = ' xxx:xxx ',
    'properties.group.id' = 'xxxxxxxx',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true'
);
-- Elasticsearch 7 sink table writing to index 'dc_batt_realtime_info_v2'.
-- batt_code is declared as the (not enforced) primary key.
CREATE TABLE BattRealtimeInfo (
    bms_code            STRING,
    batt_code           STRING,
    `create_time`       BIGINT,
    coordinate_date     BIGINT,
    receive_time_part2  BIGINT,
    lng                 BIGINT,
    lat                 BIGINT,
    lngD                DOUBLE,
    latD                DOUBLE,
    accuMilesOfVehicle  BIGINT,
    PRIMARY KEY (batt_code) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'xxxxx:9200',
    'index' = 'dc_batt_realtime_info_v2',
    'sink.bulk-flush.max-actions' = '200',
    'failure-handler' = 'ignore',
    'sink.bulk-flush.interval' = '10000'
);
-- Copies battery coordinate events from the Kafka source into the
-- Elasticsearch sink, skipping rows with no body or no battBoxCode.
--
-- assetCode and geo_long_2_double are user-defined functions.
-- NOTE(review): they have to be registered before this statement runs, e.g.
--   CREATE TEMPORARY FUNCTION assetCode AS '<fully.qualified.ClassName>';
-- with the implementing jar available to the job. The class names are not
-- visible here - confirm them against the UDF project.
INSERT INTO BattRealtimeInfo (
    bms_code,
    batt_code,
    `create_time`,
    coordinate_date,
    receive_time_part2,
    lng,
    lat,
    lngD,
    latD,
    accuMilesOfVehicle
)
SELECT
    b.thead.battBoxCode AS bms_code,
    -- Feeds the sink's primary key column.
    assetCode(b.thead.battBoxCode) AS batt_code,
    -- Event time as a yyyyMMddHHmmss number; falls back to the current local
    -- time when the header carries no timestamp.
    CASE
        WHEN b.thead.`timeStamp` IS NOT NULL
            THEN CAST(
                FROM_UNIXTIME(CAST(b.thead.`timeStamp` / 1000 AS BIGINT), 'yyyyMMddHHmmss')
                AS BIGINT
            )
        ELSE CAST(DATE_FORMAT(LOCALTIMESTAMP, 'yyyyMMddHHmmss') AS BIGINT)
    END AS `create_time`,
    -- Cast explicitly, matching the create_time expression and the BIGINT sink column.
    CAST(b.thead.`timeStamp` / 1000 AS BIGINT) AS coordinate_date,
    -- Current local time as a yyyyMMddHHmmss number.
    CAST(DATE_FORMAT(LOCALTIMESTAMP, 'yyyyMMddHHmmss') AS BIGINT) AS receive_time_part2,
    b.tdataBody.lng AS lng,
    b.tdataBody.lat AS lat,
    geo_long_2_double(b.tdataBody.lng) AS lngD,
    geo_long_2_double(b.tdataBody.lat) AS latD,
    -- Source field is INT, sink column is BIGINT.
    CAST(b.tdataBody.accuMilesOfVehicle AS BIGINT) AS accuMilesOfVehicle
FROM requestBattCoordinateDataEvt_with_outlier AS b
WHERE b.tdataBody IS NOT NULL
  AND b.thead.battBoxCode IS NOT NULL;
```
代码如上所示。
insert的时候需要调用udf函数。
我不知道如何调用或者引入。
1. 我尝试在依赖那里上传 jar，也解决不了。
2. 我在官网没有找到相关的教程。
3. 我尝试用百度和 Google 寻找相关案例，也没找到。
麻烦大佬：
1. 告知是否能够使用 UDF 函数；
2. 如果能，有没有案例或者文档，或者告知我方法。
<img width="835" alt="image"
src="https://github.com/apache/incubator-streampark/assets/137664623/82487e38-621c-4909-ba85-5788217558c2">
<img width="759" alt="image"
src="https://github.com/apache/incubator-streampark/assets/137664623/35d97c10-0744-425c-8e15-44fe85c1a431">
GitHub link: https://github.com/apache/incubator-streampark/discussions/2871
----
This is an automatically sent email for [email protected].
To unsubscribe, please send an email to: [email protected]