-- Kafka source of user-action events, decoded from JSON.
-- Reading starts at the latest offset of the topic ('scan.startup.mode').
-- NOTE(review): `timestamp` is declared as a plain BIGINT, presumably an epoch
-- value (unit not visible here - confirm with the producer). It is NOT a Flink
-- time attribute, so it cannot drive TUMBLE/HOP windows as declared.
CREATE TEMPORARY TABLE user_action_source (
    `timestamp` BIGINT,
    `user_id`   BIGINT,
    `item_id`   BIGINT,
    `price`     DOUBLE
) WITH (
    'connector' = 'kafka',
    'topic' = '<your_topic>',
    'properties.bootstrap.servers' = 'your_kafka_server:9092',
    'properties.group.id' = '<your_consumer_group>',
    'format' = 'json',
    'scan.startup.mode' = 'latest-offset'
);
-- Redis-backed table that maps an item id to its category.
-- The column name `catagory` (sic) is kept as-is because the sink table and
-- the INSERT statement in this script reference it by that spelling.
CREATE TEMPORARY TABLE item_detail_dim (
    id       STRING,
    catagory STRING,
    -- Flink does not own the data, so the key is declared but not validated.
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'redis',
    'host' = '<your_redis_host>',
    'port' = '<your_redis_port>',
    'password' = '<your_redis_password>',
    'dbNum' = '<your_db_num>'
);
-- RDS MySQL sink holding GMV per (minute, category).
-- The composite primary key lets the connector write results as upserts, so a
-- later value for the same (time_minute, catagory) replaces the earlier one.
-- NOTE(review): gmv is DOUBLE to match the source `price` column; consider
-- DECIMAL end-to-end if exact monetary totals are required.
CREATE TEMPORARY TABLE gmv_output (
    time_minute STRING,
    catagory    STRING,
    gmv         DOUBLE,
    -- Flink only supports NOT ENFORCED primary keys; omitting it fails validation.
    PRIMARY KEY (time_minute, catagory) NOT ENFORCED
) WITH (
    -- Quoted keys and 'connector' (instead of the legacy, unquoted type='rds')
    -- keep this DDL in the same syntax as the other tables in this script.
    'connector' = 'rds',
    'url' = '<your_jdbc_mysql_url_with_database>',
    'tableName' = '<your_table>',
    'userName' = '<your_mysql_database_username>',
    'password' = '<your_mysql_database_password>'
);
-- Per-minute GMV by item category.
--
-- Each user action is enriched with its category through a processing-time
-- lookup join against item_detail_dim, then prices are summed per
-- (minute, category) and written to gmv_output.
--
-- Why this is a plain GROUP BY rather than TUMBLE(...):
--   `timestamp` is a BIGINT in user_action_source, not a time attribute, so it
--   cannot be used as a window column. The minute bucket is derived from the
--   epoch value instead. The result is an updating stream; the sink's primary
--   key (time_minute, catagory) absorbs the updates as upserts.
--   NOTE(review): state for old minutes is only released if a state TTL
--   (e.g. table.exec.state.ttl) is configured for the job - confirm.
--
-- NOTE(review): assumes `timestamp` is epoch MILLISECONDS; FROM_UNIXTIME takes
-- seconds, hence the "/ 1000". Drop the division if the producer already sends
-- seconds. The bucket string is rendered in the session time zone.
INSERT INTO gmv_output
SELECT
    FROM_UNIXTIME(s.`timestamp` / 1000, 'yyyy-MM-dd HH:mm') AS time_minute,
    d.catagory,
    -- price comes from the event stream; the dimension table has no price column.
    SUM(s.price) AS gmv
FROM user_action_source AS s
INNER JOIN item_detail_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d
    -- item_id is BIGINT while the lookup key is STRING: cast the stream side so
    -- the dimension key itself stays a bare column for the lookup.
    ON CAST(s.item_id AS STRING) = d.id
GROUP BY
    FROM_UNIXTIME(s.`timestamp` / 1000, 'yyyy-MM-dd HH:mm'),
    d.catagory;