[ 
https://issues.apache.org/jira/browse/FLINK-38678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ramin Gharib updated FLINK-38678:
---------------------------------
    Description: 
You can use the Flink SQL CLI to test the queries:

1. Create Source table
{code:java}
CREATE TABLE datagenSource (
  order_id BIGINT,
  order_number VARCHAR(20),
  user_id BIGINT,
  shop_id BIGINT,
  product_id BIGINT,
  status BIGINT,
  order_type BIGINT,
  order_created_at TIMESTAMP(3),
  payment_amount_cents BIGINT
)
WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10'
); {code}

2. Create the materialized table (without freshness)
{code:java}
CREATE MATERIALIZED TABLE users_shops
PARTITIONED BY (ds)
WITH(
   'connector' = 'blackhole'
)
REFRESH_MODE = CONTINUOUS
AS SELECT
  user_id,
  shop_id,
  ds,
  SUM (payment_amount_cents) AS payed_buy_fee_sum,
  SUM (1) AS pv
 FROM (
    SELECT user_id, shop_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, 
payment_amount_cents FROM datagenSource
 ) AS tmp
 GROUP BY user_id, shop_id, ds; {code}

3. run
{noformat}
SHOW CREATE MATERIALIZED TABLE users_shops;{noformat}
And you should see the default FRESHNESS value (3 MINUTE)
{code:java}
CREATE MATERIALIZED TABLE `default_catalog`.`default_database`.`users_shops`
PARTITIONED BY (`ds`)
WITH (
  'connector' = 'blackhole'
)
FRESHNESS = INTERVAL '3' MINUTE
REFRESH_MODE = CONTINUOUS
AS SELECT `tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds`, 
SUM(`tmp`.`payment_amount_cents`) AS `payed_buy_fee_sum`, SUM(1) AS `pv`
FROM (SELECT `datagenSource`.`user_id`, `datagenSource`.`shop_id`, 
DATE_FORMAT(`datagenSource`.`order_created_at`, 'yyyy-MM-dd') AS `ds`, 
`datagenSource`.`payment_amount_cents`
FROM `default_catalog`.`default_database`.`datagenSource` AS `datagenSource`) 
AS `tmp`
GROUP BY `tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds` {code}

4. create materialized table with freshness clause
{code:java}
CREATE MATERIALIZED TABLE users_shops_explicit
PARTITIONED BY (ds)
WITH(
   'connector' = 'blackhole'
)
FRESHNESS = INTERVAL '10' SECOND
REFRESH_MODE = CONTINUOUS
AS SELECT
  user_id,
  shop_id,
  ds,
  SUM (payment_amount_cents) AS payed_buy_fee_sum,
  SUM (1) AS pv
 FROM (
    SELECT user_id, shop_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, 
payment_amount_cents FROM datagenSource
 ) AS tmp
 GROUP BY user_id, shop_id, ds; {code}
 
5. run
{noformat}
SHOW CREATE MATERIALIZED TABLE users_shops_explicit;{noformat}
should yield
{code:java}
CREATE MATERIALIZED TABLE 
`default_catalog`.`default_database`.`users_shops_explicit`
PARTITIONED BY (`ds`)
WITH (
  'connector' = 'blackhole'
)
FRESHNESS = INTERVAL '10' SECOND
REFRESH_MODE = CONTINUOUS
AS SELECT `tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds`, 
SUM(`tmp`.`payment_amount_cents`) AS `payed_buy_fee_sum`, SUM(1) AS `pv`
FROM (SELECT `datagenSource`.`user_id`, `datagenSource`.`shop_id`, 
DATE_FORMAT(`datagenSource`.`order_created_at`, 'yyyy-MM-dd') AS `ds`, 
`datagenSource`.`payment_amount_cents`
FROM `default_catalog`.`default_database`.`datagenSource` AS `datagenSource`) 
AS `tmp`
GROUP BY `tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds`{code}

6. create materialized table without FRESHNESS and REFRESH_MODE
{code:java}
CREATE MATERIALIZED TABLE users_shops_continuous
WITH(
   'connector' = 'blackhole'
)
AS SELECT
  user_id,
  shop_id,
  SUM (payment_amount_cents) AS payed_buy_fee_sum,
  SUM (1) AS pv
 FROM datagenSource
 GROUP BY user_id, shop_id; {code}

7. run
{code:java}
SHOW CREATE MATERIALIZED TABLE users_shops_continuous{code}
should include the FRESHNESS and REFRESH_MODE
{code:java}
CREATE MATERIALIZED TABLE 
`default_catalog`.`default_database`.`users_shops_continuous`
WITH (
  'connector' = 'blackhole'
)
FRESHNESS = INTERVAL '3' MINUTE
REFRESH_MODE = CONTINUOUS
AS SELECT `datagenSource`.`user_id`, `datagenSource`.`shop_id`, 
SUM(`datagenSource`.`payment_amount_cents`) AS `payed_buy_fee_sum`, SUM(1) AS 
`pv`
FROM `default_catalog`.`default_database`.`datagenSource` AS `datagenSource`
GROUP BY `datagenSource`.`user_id`, `datagenSource`.`shop_id`{code}

> Release Testing Instructions: Validate FLIP-551: Make FRESHNESS Optional for 
> Materialized Tables
> ------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-38678
>                 URL: https://issues.apache.org/jira/browse/FLINK-38678
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Tests
>            Reporter: Ramin Gharib
>            Priority: Blocker
>              Labels: release-testing
>             Fix For: 2.2.0
>
>
> You can use the Flink SQL CLI to test the queries:
> 1. Create Source table
> {code:java}
> CREATE TABLE datagenSource (
>   order_id BIGINT,
>   order_number VARCHAR(20),
>   user_id BIGINT,
>   shop_id BIGINT,
>   product_id BIGINT,
>   status BIGINT,
>   order_type BIGINT,
>   order_created_at TIMESTAMP(3),
>   payment_amount_cents BIGINT
> )
> WITH (
>   'connector' = 'datagen',
>   'rows-per-second' = '10'
> ); {code}
> 2. Create the materialized table (without freshness)
> {code:java}
> CREATE MATERIALIZED TABLE users_shops
> PARTITIONED BY (ds)
> WITH(
>    'connector' = 'blackhole'
> )
> REFRESH_MODE = CONTINUOUS
> AS SELECT
>   user_id,
>   shop_id,
>   ds,
>   SUM (payment_amount_cents) AS payed_buy_fee_sum,
>   SUM (1) AS pv
>  FROM (
>     SELECT user_id, shop_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS 
> ds, payment_amount_cents FROM datagenSource
>  ) AS tmp
>  GROUP BY user_id, shop_id, ds; {code}
> 3. run
> {noformat}
> SHOW CREATE MATERIALIZED TABLE users_shops;{noformat}
> And you should see the default FRESHNESS value (3 MINUTE)
> {code:java}
> CREATE MATERIALIZED TABLE `default_catalog`.`default_database`.`users_shops`
> PARTITIONED BY (`ds`)
> WITH (
>   'connector' = 'blackhole'
> )
> FRESHNESS = INTERVAL '3' MINUTE
> REFRESH_MODE = CONTINUOUS
> AS SELECT `tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds`, 
> SUM(`tmp`.`payment_amount_cents`) AS `payed_buy_fee_sum`, SUM(1) AS `pv`
> FROM (SELECT `datagenSource`.`user_id`, `datagenSource`.`shop_id`, 
> DATE_FORMAT(`datagenSource`.`order_created_at`, 'yyyy-MM-dd') AS `ds`, 
> `datagenSource`.`payment_amount_cents`
> FROM `default_catalog`.`default_database`.`datagenSource` AS `datagenSource`) 
> AS `tmp`
> GROUP BY `tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds` {code}
> 4. create materialized table with freshness clause
> {code:java}
> CREATE MATERIALIZED TABLE users_shops_explicit
> PARTITIONED BY (ds)
> WITH(
>    'connector' = 'blackhole'
> )
> FRESHNESS = INTERVAL '10' SECOND
> REFRESH_MODE = CONTINUOUS
> AS SELECT
>   user_id,
>   shop_id,
>   ds,
>   SUM (payment_amount_cents) AS payed_buy_fee_sum,
>   SUM (1) AS pv
>  FROM (
>     SELECT user_id, shop_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS 
> ds, payment_amount_cents FROM datagenSource
>  ) AS tmp
>  GROUP BY user_id, shop_id, ds; {code}
>  
> 5. run
> {noformat}
> SHOW CREATE MATERIALIZED TABLE users_shops_explicit;{noformat}
> should yield
> {code:java}
> CREATE MATERIALIZED TABLE 
> `default_catalog`.`default_database`.`users_shops_explicit`
> PARTITIONED BY (`ds`)
> WITH (
>   'connector' = 'blackhole'
> )
> FRESHNESS = INTERVAL '10' SECOND
> REFRESH_MODE = CONTINUOUS
> AS SELECT `tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds`, 
> SUM(`tmp`.`payment_amount_cents`) AS `payed_buy_fee_sum`, SUM(1) AS `pv`
> FROM (SELECT `datagenSource`.`user_id`, `datagenSource`.`shop_id`, 
> DATE_FORMAT(`datagenSource`.`order_created_at`, 'yyyy-MM-dd') AS `ds`, 
> `datagenSource`.`payment_amount_cents`
> FROM `default_catalog`.`default_database`.`datagenSource` AS `datagenSource`) 
> AS `tmp`
> GROUP BY `tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds`{code}
> 6. create materialized table without FRESHNESS and REFRESH_MODE
> {code:java}
> CREATE MATERIALIZED TABLE users_shops_continuous
> WITH(
>    'connector' = 'blackhole'
> )
> AS SELECT
>   user_id,
>   shop_id,
>   SUM (payment_amount_cents) AS payed_buy_fee_sum,
>   SUM (1) AS pv
>  FROM datagenSource
>  GROUP BY user_id, shop_id; {code}
> 7. run
> {code:java}
> SHOW CREATE MATERIALIZED TABLE users_shops_continuous{code}
> should include the FRESHNESS and REFRESH_MODE
> {code:java}
> CREATE MATERIALIZED TABLE 
> `default_catalog`.`default_database`.`users_shops_continuous`
> WITH (
>   'connector' = 'blackhole'
> )
> FRESHNESS = INTERVAL '3' MINUTE
> REFRESH_MODE = CONTINUOUS
> AS SELECT `datagenSource`.`user_id`, `datagenSource`.`shop_id`, 
> SUM(`datagenSource`.`payment_amount_cents`) AS `payed_buy_fee_sum`, SUM(1) AS 
> `pv`
> FROM `default_catalog`.`default_database`.`datagenSource` AS `datagenSource`
> GROUP BY `datagenSource`.`user_id`, `datagenSource`.`shop_id`{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to