// Pasted snapshot: Dec 01, 2020, 03:32 PM
package com.amazonaws.services.kinesisanalytics;

import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple8;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.util.Collector;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.sql.Types;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
30/**
31 * A basic Kinesis Data Analytics for Java application with Kinesis data
32 * streams as source and sink.
33 */
34public class BasicStreamingJob {
35
36 private static final Logger log = LoggerFactory.getLogger(BasicStreamingJob.class);
37
38 private static final String region = "us-east-1";
39 private static final String inputStreamName1 = "AUTH_X_TYPE";
40 private static final String inputStreamName2 = "CLR_X_TYPE";
41 private static final String outputStreamName = "TRANSACTION";
42
43 private static DataStream<String> createSource1FromStaticConfig(StreamExecutionEnvironment env) {
44 Properties inputProperties = new Properties();
45 inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
46 inputProperties.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, aws_access_key_id);
47 inputProperties.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, aws_secret_access_key);
48 inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
49
50 return env.addSource(new FlinkKinesisConsumer<>(inputStreamName1, new SimpleStringSchema(), inputProperties));
51 }
52
53 private static DataStream<String> createSource2FromStaticConfig(StreamExecutionEnvironment env) {
54 Properties inputProperties = new Properties();
55 inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
56 inputProperties.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, aws_access_key_id);
57 inputProperties.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, aws_secret_access_key);
58 inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
59
60 return env.addSource(new FlinkKinesisConsumer<>(inputStreamName2, new SimpleStringSchema(), inputProperties));
61 }
62
63 private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env) throws IOException {
64 Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
65 return env.addSource(new FlinkKinesisConsumer<>(inputStreamName1, new SimpleStringSchema(),
66 applicationProperties.get("ConsumerConfigProperties")));
67 }
68
69 private static FlinkKinesisProducer<String> createSinkFromStaticConfig() {
70 Properties outputProperties = new Properties();
71 outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
72 outputProperties.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, aws_access_key_id);
73 outputProperties.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, aws_secret_access_key);
74 outputProperties.setProperty("AggregationEnabled", "false");
75
76 FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(), outputProperties);
77 sink.setDefaultStream(outputStreamName);
78 sink.setDefaultPartition("0");
79 return sink;
80 }
81
82 private static FlinkKinesisProducer<String> createSinkFromApplicationProperties() throws IOException {
83 Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
84 FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(),
85 applicationProperties.get("ProducerConfigProperties"));
86
87 sink.setDefaultStream(outputStreamName);
88 sink.setDefaultPartition("0");
89 return sink;
90 }
91
92 public static void main(String[] args) throws Exception {
93 // set up the streaming execution environment
94 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
95 env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
96
97 /* if you would like to use runtime configuration properties, uncomment the lines below
98 * DataStream<String> input = createSourceFromApplicationProperties(env);
99 */
100 DataStream<String> input1 = createSource1FromStaticConfig(env);
101 DataStream<String> input2 = createSource2FromStaticConfig(env);
102
103// DataStream<String> auth = input.map((value) -> {
104// String str[] = value.split(";");
105// HashMap<String,String> map = new HashMap<String, String>();
106// log.info("Got records: " + str.length);
107// for(int i=1;i<str.length;i++){
108// map.put(String.valueOf(i), str[i]);
109// log.info("Got subvalue: " + str[i]);
110// }
111// log.info("Got value: " + value + ", transformed to: " + map.toString());
112// return map.toString();
113// });
114
115// input.flatMap(new Tokenizer())
116// .keyBy(0)
117// .timeWindow(Time.days(30))
118// .sum(2)
119// .map((value) -> {
120// String result = value.f0.toString() + "," + value.f2.toString() + "\n";
121// log.info("Got result: " + result);
122// return result;
123// })
124// .addSink(createSinkFromStaticConfig());
125
126// DataStream<Tuple8<Integer, Integer, Double, Integer, String, String, String, String>> auth = input1.flatMap(new Tokenizer());
127// DataStream<Tuple8<Integer, Integer, Double, Integer, String, String, String, String>> post = input2.flatMap(new Tokenizer());
128
129 input1.join(input2)
130 .where(new KeySelector<String, Integer>() {
131
132 private static final long serialVersionUID = 1L;
133
134 @Override
135 public Integer getKey(String value) {
136// Tuple8<Integer, Integer, Double, Integer, String, String, String, String> out;
137 String str[] = value.split(";");
138// out.setFields(Integer.parseInt(str[0]), Integer.parseInt(str[1]), Double.parseDouble(str[2]), Integer.parseInt(str[3]), str[4], str[5], str[6], str[7]);
139 log.info("Where key: " + str[0]);
140 return Integer.parseInt(str[0]);
141 }
142 })
143 .equalTo(new KeySelector<String, Integer>() {
144
145 private static final long serialVersionUID = 1L;
146
147 @Override
148 public Integer getKey(String value) {
149// Tuple8<Integer, Integer, Double, Integer, String, String, String, String> out;
150 String str[] = value.split(";");
151// out.setFields(Integer.parseInt(str[0]), Integer.parseInt(str[1]), Double.parseDouble(str[2]), Integer.parseInt(str[3]), str[4], str[5], str[6], str[7]);
152 log.info("Equal key: " + str[0]);
153 return Integer.parseInt(str[0]);
154 }
155 })
156 .window(TumblingEventTimeWindows.of(Time.minutes(5)))
157 .apply(new JoinFunction<String, String, String>(){
158
159 private static final long serialVersionUID = 1L;
160
161 @Override
162 public String join(String first, String second) {
163 String result = first + "," + second;
164 log.info("Got result: " + result);
165 return result;
166 }})
167 .addSink(createSinkFromStaticConfig());
168
169
170
171// auth.keyBy(0)
172// .timeWindow(Time.seconds(60))
173// .sum(2)
174// .map((value) -> {
175// return value.toString();
176// })
177// ;
178
179 /* if you would like to use runtime configuration properties, uncomment the lines below
180 * input.addSink(createSinkFromApplicationProperties())
181 */
182// auth.addSink(createSinkFromStaticConfig());
183
184 env.execute("Flink Streaming Java API Skeleton");
185 }
186
187 public static final class Tokenizer
188 implements FlatMapFunction<String, Tuple8<Integer, Integer, Double, Integer, String, String, String, String>> {
189
190 @Override
191 public void flatMap(String value, Collector<Tuple8<Integer, Integer, Double, Integer, String, String, String, String>> out) {
192 String str[] = value.split(";");
193 out.collect(new Tuple8<>(Integer.parseInt(str[0]), Integer.parseInt(str[1]), Double.parseDouble(str[2]), Integer.parseInt(str[3]), str[4], str[5], str[6], str[7]));
194 }
195 }
196
197 public static class TupleKeySelector implements KeySelector<String, Integer> {
198 @Override
199 public Integer getKey(String value) {
200// Tuple8<Integer, Integer, Double, Integer, String, String, String, String> out;
201 String str[] = value.split(";");
202// out.setFields(Integer.parseInt(str[0]), Integer.parseInt(str[1]), Double.parseDouble(str[2]), Integer.parseInt(str[3]), str[4], str[5], str[6], str[7]);
203
204 return Integer.parseInt(str[0]);
205 }
206 }
207}