[
https://issues.apache.org/jira/browse/FLINK-38678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ramin Gharib updated FLINK-38678:
---------------------------------
Description:
You can use the Flink SQL CLI to test the queries:
1. Create the source table
{code:java}
CREATE TABLE datagenSource (
order_id BIGINT,
order_number VARCHAR(20),
user_id BIGINT,
shop_id BIGINT,
product_id BIGINT,
status BIGINT,
order_type BIGINT,
order_created_at TIMESTAMP(3),
payment_amount_cents BIGINT
)
WITH (
'connector' = 'datagen',
'rows-per-second' = '10'
); {code}
2. Create a materialized table without a FRESHNESS clause
{code:java}
CREATE MATERIALIZED TABLE users_shops
PARTITIONED BY (ds)
WITH(
'connector' = 'blackhole'
)
REFRESH_MODE = CONTINUOUS
AS SELECT
user_id,
shop_id,
ds,
SUM (payment_amount_cents) AS payed_buy_fee_sum,
SUM (1) AS pv
FROM (
SELECT user_id, shop_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds,
payment_amount_cents FROM datagenSource
) AS tmp
GROUP BY user_id, shop_id, ds; {code}
3. Run
{noformat}
SHOW CREATE MATERIALIZED TABLE users_shops;{noformat}
You should see the default FRESHNESS value (INTERVAL '3' MINUTE):
{code:java}
CREATE MATERIALIZED TABLE `default_catalog`.`default_database`.`users_shops`
PARTITIONED BY (`ds`)
WITH (
'connector' = 'blackhole'
)
FRESHNESS = INTERVAL '3' MINUTE
REFRESH_MODE = CONTINUOUS
AS SELECT `tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds`,
SUM(`tmp`.`payment_amount_cents`) AS `payed_buy_fee_sum`, SUM(1) AS `pv`
FROM (SELECT `datagenSource`.`user_id`, `datagenSource`.`shop_id`,
DATE_FORMAT(`datagenSource`.`order_created_at`, 'yyyy-MM-dd') AS `ds`,
`datagenSource`.`payment_amount_cents`
FROM `default_catalog`.`default_database`.`datagenSource` AS `datagenSource`)
AS `tmp`
GROUP BY `tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds` {code}
4. Create a materialized table with an explicit FRESHNESS clause
{code:java}
CREATE MATERIALIZED TABLE users_shops_explicit
PARTITIONED BY (ds)
WITH(
'connector' = 'blackhole'
)
FRESHNESS = INTERVAL '10' SECOND
REFRESH_MODE = CONTINUOUS
AS SELECT
user_id,
shop_id,
ds,
SUM (payment_amount_cents) AS payed_buy_fee_sum,
SUM (1) AS pv
FROM (
SELECT user_id, shop_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds,
payment_amount_cents FROM datagenSource
) AS tmp
GROUP BY user_id, shop_id, ds; {code}
5. Run
{noformat}
SHOW CREATE MATERIALIZED TABLE users_shops_explicit;{noformat}
This should yield:
{code:java}
CREATE MATERIALIZED TABLE
`default_catalog`.`default_database`.`users_shops_explicit`
PARTITIONED BY (`ds`)
WITH (
'connector' = 'blackhole'
)
FRESHNESS = INTERVAL '10' SECOND
REFRESH_MODE = CONTINUOUS
AS SELECT `tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds`,
SUM(`tmp`.`payment_amount_cents`) AS `payed_buy_fee_sum`, SUM(1) AS `pv`
FROM (SELECT `datagenSource`.`user_id`, `datagenSource`.`shop_id`,
DATE_FORMAT(`datagenSource`.`order_created_at`, 'yyyy-MM-dd') AS `ds`,
`datagenSource`.`payment_amount_cents`
FROM `default_catalog`.`default_database`.`datagenSource` AS `datagenSource`)
AS `tmp`
GROUP BY `tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds`{code}
6. Create a materialized table without FRESHNESS and REFRESH_MODE clauses
{code:java}
CREATE MATERIALIZED TABLE users_shops_continuous
WITH(
'connector' = 'blackhole'
)
AS SELECT
user_id,
shop_id,
SUM (payment_amount_cents) AS payed_buy_fee_sum,
SUM (1) AS pv
FROM datagenSource
GROUP BY user_id, shop_id; {code}
7. Run
{noformat}
SHOW CREATE MATERIALIZED TABLE users_shops_continuous;{noformat}
The output should include the default FRESHNESS and the inferred REFRESH_MODE, even though neither was specified:
{code:java}
CREATE MATERIALIZED TABLE
`default_catalog`.`default_database`.`users_shops_continuous`
WITH (
'connector' = 'blackhole'
)
FRESHNESS = INTERVAL '3' MINUTE
REFRESH_MODE = CONTINUOUS
AS SELECT `datagenSource`.`user_id`, `datagenSource`.`shop_id`,
SUM(`datagenSource`.`payment_amount_cents`) AS `payed_buy_fee_sum`, SUM(1) AS
`pv`
FROM `default_catalog`.`default_database`.`datagenSource` AS `datagenSource`
GROUP BY `datagenSource`.`user_id`, `datagenSource`.`shop_id`{code}
> Release Testing Instructions: Validate FLIP-551: Make FRESHNESS Optional for
> Materialized Tables
> ------------------------------------------------------------------------------------------------
>
> Key: FLINK-38678
> URL: https://issues.apache.org/jira/browse/FLINK-38678
> Project: Flink
> Issue Type: Sub-task
> Components: Tests
> Reporter: Ramin Gharib
> Priority: Blocker
> Labels: release-testing
> Fix For: 2.2.0
>
>
> You can use the Flink SQL CLI to test the queries:
> 1. Create the source table
> {code:java}
> CREATE TABLE datagenSource (
> order_id BIGINT,
> order_number VARCHAR(20),
> user_id BIGINT,
> shop_id BIGINT,
> product_id BIGINT,
> status BIGINT,
> order_type BIGINT,
> order_created_at TIMESTAMP(3),
> payment_amount_cents BIGINT
> )
> WITH (
> 'connector' = 'datagen',
> 'rows-per-second' = '10'
> ); {code}
> 2. Create a materialized table without a FRESHNESS clause
> {code:java}
> CREATE MATERIALIZED TABLE users_shops
> PARTITIONED BY (ds)
> WITH(
> 'connector' = 'blackhole'
> )
> REFRESH_MODE = CONTINUOUS
> AS SELECT
> user_id,
> shop_id,
> ds,
> SUM (payment_amount_cents) AS payed_buy_fee_sum,
> SUM (1) AS pv
> FROM (
> SELECT user_id, shop_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS
> ds, payment_amount_cents FROM datagenSource
> ) AS tmp
> GROUP BY user_id, shop_id, ds; {code}
> 3. Run
> {noformat}
> SHOW CREATE MATERIALIZED TABLE users_shops;{noformat}
> You should see the default FRESHNESS value (INTERVAL '3' MINUTE):
> {code:java}
> CREATE MATERIALIZED TABLE `default_catalog`.`default_database`.`users_shops`
> PARTITIONED BY (`ds`)
> WITH (
> 'connector' = 'blackhole'
> )
> FRESHNESS = INTERVAL '3' MINUTE
> REFRESH_MODE = CONTINUOUS
> AS SELECT `tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds`,
> SUM(`tmp`.`payment_amount_cents`) AS `payed_buy_fee_sum`, SUM(1) AS `pv`
> FROM (SELECT `datagenSource`.`user_id`, `datagenSource`.`shop_id`,
> DATE_FORMAT(`datagenSource`.`order_created_at`, 'yyyy-MM-dd') AS `ds`,
> `datagenSource`.`payment_amount_cents`
> FROM `default_catalog`.`default_database`.`datagenSource` AS `datagenSource`)
> AS `tmp`
> GROUP BY `tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds` {code}
> 4. Create a materialized table with an explicit FRESHNESS clause
> {code:java}
> CREATE MATERIALIZED TABLE users_shops_explicit
> PARTITIONED BY (ds)
> WITH(
> 'connector' = 'blackhole'
> )
> FRESHNESS = INTERVAL '10' SECOND
> REFRESH_MODE = CONTINUOUS
> AS SELECT
> user_id,
> shop_id,
> ds,
> SUM (payment_amount_cents) AS payed_buy_fee_sum,
> SUM (1) AS pv
> FROM (
> SELECT user_id, shop_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS
> ds, payment_amount_cents FROM datagenSource
> ) AS tmp
> GROUP BY user_id, shop_id, ds; {code}
>
> 5. Run
> {noformat}
> SHOW CREATE MATERIALIZED TABLE users_shops_explicit;{noformat}
> This should yield:
> {code:java}
> CREATE MATERIALIZED TABLE
> `default_catalog`.`default_database`.`users_shops_explicit`
> PARTITIONED BY (`ds`)
> WITH (
> 'connector' = 'blackhole'
> )
> FRESHNESS = INTERVAL '10' SECOND
> REFRESH_MODE = CONTINUOUS
> AS SELECT `tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds`,
> SUM(`tmp`.`payment_amount_cents`) AS `payed_buy_fee_sum`, SUM(1) AS `pv`
> FROM (SELECT `datagenSource`.`user_id`, `datagenSource`.`shop_id`,
> DATE_FORMAT(`datagenSource`.`order_created_at`, 'yyyy-MM-dd') AS `ds`,
> `datagenSource`.`payment_amount_cents`
> FROM `default_catalog`.`default_database`.`datagenSource` AS `datagenSource`)
> AS `tmp`
> GROUP BY `tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds`{code}
> 6. Create a materialized table without FRESHNESS and REFRESH_MODE clauses
> {code:java}
> CREATE MATERIALIZED TABLE users_shops_continuous
> WITH(
> 'connector' = 'blackhole'
> )
> AS SELECT
> user_id,
> shop_id,
> SUM (payment_amount_cents) AS payed_buy_fee_sum,
> SUM (1) AS pv
> FROM datagenSource
> GROUP BY user_id, shop_id; {code}
> 7. Run
> {noformat}
> SHOW CREATE MATERIALIZED TABLE users_shops_continuous;{noformat}
> The output should include the default FRESHNESS and the inferred REFRESH_MODE, even though neither was specified:
> {code:java}
> CREATE MATERIALIZED TABLE
> `default_catalog`.`default_database`.`users_shops_continuous`
> WITH (
> 'connector' = 'blackhole'
> )
> FRESHNESS = INTERVAL '3' MINUTE
> REFRESH_MODE = CONTINUOUS
> AS SELECT `datagenSource`.`user_id`, `datagenSource`.`shop_id`,
> SUM(`datagenSource`.`payment_amount_cents`) AS `payed_buy_fee_sum`, SUM(1) AS
> `pv`
> FROM `default_catalog`.`default_database`.`datagenSource` AS `datagenSource`
> GROUP BY `datagenSource`.`user_id`, `datagenSource`.`shop_id`{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)