-- Pasted Jun 18, 2020, 12:04 PM
-- Database holding the scoring pipeline objects.
-- IF NOT EXISTS keeps the script re-runnable: without it a second run aborts on
-- this statement with a "database already exists" error.
CREATE DATABASE IF NOT EXISTS logistic;
2
-- Storage table for scoring metrics: a timestamp, a hash, an order id and a score per row.
-- Rows are partitioned by calendar day of created_at and sorted/indexed by created_at.
CREATE TABLE IF NOT EXISTS logistic.filter_scoring_metrics
(
    created_at DateTime('Europe/Moscow'),
    hash       String,
    order_id   String,
    score      UInt64
)
ENGINE = MergeTree()
PARTITION BY toYYYYMMDD(created_at)
ORDER BY created_at
PRIMARY KEY created_at;
14
-- Kafka-engine table that reads the 'logistic.orders' topic from broker 'kafka:9092'
-- as consumer group 'default'; each consumed row exposes one String column, message.
-- NOTE(review): with kafka_format = 'JSONEachRow' the column is filled from a
-- top-level "message" key of each JSON object -- confirm producers wrap the
-- payload that way; if the topic carries the bare JSON document, a raw-string
-- format is needed instead.
-- NOTE(review): kafka_num_consumers = 30 presumably should not exceed the topic's
-- partition count -- verify against the topic configuration.
CREATE TABLE IF NOT EXISTS logistic.filter_scoring_queue
(
    message String
)
ENGINE = Kafka
SETTINGS
    kafka_broker_list   = 'kafka:9092',
    kafka_topic_list    = 'logistic.orders',
    kafka_group_name    = 'default',
    kafka_format        = 'JSONEachRow',
    kafka_row_delimiter = '\n',
    kafka_num_consumers = 30;
26
-- Materialized view that moves rows from logistic.filter_scoring_queue into
-- logistic.filter_scoring_metrics. Every field is extracted from the JSON text
-- found under the "payload" key of message.
-- Result columns are matched to the target table BY NAME, so each alias must equal
-- a column of filter_scoring_metrics; a target column with no matching alias is
-- filled with its default value.
CREATE MATERIALIZED VIEW IF NOT EXISTS logistic.filter_scoring_consumer
TO logistic.filter_scoring_metrics
AS
SELECT
    -- payload.createdAt, parsed leniently; an unparseable value raises an error.
    parseDateTimeBestEffort(
        visitParamExtractString(visitParamExtractRaw(message, 'payload'), 'createdAt')
    ) AS created_at,
    visitParamExtractString(visitParamExtractRaw(message, 'payload'), 'hash') AS hash,
    visitParamExtractString(visitParamExtractRaw(message, 'payload'), 'orderCode') AS order_id,
    -- payload.name.name. NOTE(review): filter_scoring_metrics declares no column
    -- for this value, so it is presumably not persisted -- either add a matching
    -- column to the target table or remove this expression.
    visitParamExtractString(
        visitParamExtractRaw(visitParamExtractRaw(message, 'payload'), 'name'),
        'name'
    ) AS metric_name,
    -- payload.value.value. Aliased as score so it lands in the target's score
    -- column; the former alias "value" matched no target column, leaving score
    -- at its default of 0.
    visitParamExtractUInt(
        visitParamExtractRaw(visitParamExtractRaw(message, 'payload'), 'value'),
        'value'
    ) AS score
FROM logistic.filter_scoring_queue;