// Originally posted Nov 18, 2017, 05:30 PM ("8 years ago" at the time this copy was captured).
1package com.testing;
2
3import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
4import org.apache.beam.runners.direct.DirectRunner;
5import org.apache.beam.sdk.Pipeline;
6import org.apache.beam.sdk.coders.VoidCoder;
7import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
8import org.apache.beam.sdk.io.kinesis.KinesisIO;
9import org.apache.beam.sdk.io.kinesis.KinesisRecord;
10import org.apache.beam.sdk.options.PipelineOptions;
11import org.apache.beam.sdk.options.PipelineOptionsFactory;
12import org.apache.beam.sdk.transforms.*;
13import org.apache.beam.sdk.transforms.windowing.*;
14import org.apache.beam.sdk.values.KV;
15import org.apache.beam.sdk.values.PCollection;
16import org.apache.beam.sdk.values.PDone;
17import org.joda.time.Instant;
18import org.junit.Test;
19
20import java.io.Serializable;
21import java.nio.charset.CharacterCodingException;
22import java.nio.charset.StandardCharsets;
23
24import static com.amazonaws.regions.Regions.EU_WEST_1;
25import static org.joda.time.Duration.standardSeconds;
26
27public class TestMultipleGroupByTriggerFiring implements Serializable {
28
29 /**
30 * To insert data into pubsub
31 * <p>
32 * gcloud beta pubsub topics publish <TOPIC> WATERMARK
33 * gcloud beta pubsub topics publish <TOPIC> A
34 */
35
36 @Test
37 public void testMultipleGroupByTriggerFiringUsingPubSub() throws Exception {
38 PipelineOptions pipelineOptions = PipelineOptionsFactory.create().as(PipelineOptions.class);
39 pipelineOptions.setRunner(DirectRunner.class);
40
41 Pipeline pipeline = Pipeline.create(pipelineOptions);
42 PCollection<String> dataCollection = pipeline
43 .apply(PubsubIO.readStrings().fromTopic("<TOPIC>"))
44 .apply(Window.<String>into(FixedWindows.of(standardSeconds(30)))
45// I was testing using either triggers
46 .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
47// .triggering(AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()))
48 .withAllowedLateness(standardSeconds(5), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
49 .accumulatingFiredPanes())
50 .apply(ParDo.of(new DoFn<String, String>() {
51
52 @ProcessElement
53 public void process(ProcessContext context, IntervalWindow window) {
54 //a check that ignores "WATERMARK" messages which the only purpose is to advance watermark
55 if (!"WATERMARK".equals(context.element())) {
56 System.out.println(Instant.now() + " Received Element " + context.element() + " " + window);
57 context.output(context.element());
58 }
59 }
60 }));
61
62 dataCollection
63 .apply(Count.perElement())
64 .apply(ParDo.of(new DoFn<KV<String, Long>, KV<String, Long>>() {
65 @ProcessElement
66 public void process(ProcessContext context, IntervalWindow window) {
67 System.out.println(Instant.now() + " " + "After count " + context.element() + " " + window);
68 context.output(context.element());
69 }
70 }))
71// if I apply any of these trigger I will get expected firings
72// .apply(Window.<KV<String, Long>>configure().triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))))
73// .apply(Window.<KV<String, Long>>configure().triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane())))
74 .apply(GroupByKey.create())
75 .apply(ParDo.of(new DoFn<KV<String, Iterable<Long>>, KV<String, Iterable<Long>>>() {
76 @ProcessElement
77 public void process(ProcessContext context, IntervalWindow window) {
78 System.out.println(Instant.now() + " " + "Final group by " + context.element() + " " + window);
79 context.output(context.element());
80 }
81 })).apply(new DummySink<>());
82
83
84 pipeline.run().waitUntilFinish();
85
86 }
87
88 @Test
89 public void testMultipleGroupByTriggerFiringUsingKinesis() throws Exception {
90 PipelineOptions pipelineOptions = PipelineOptionsFactory.create().as(PipelineOptions.class);
91 pipelineOptions.setRunner(DirectRunner.class);
92
93 Pipeline pipeline = Pipeline.create(pipelineOptions);
94 PCollection<String> dataCollection = pipeline.apply(KinesisIO.read()
95 .from("<STREAM>", InitialPositionInStream.LATEST)
96 .withClientProvider("<ACCESS_KEY>", "<SECRET_KEY>", EU_WEST_1))
97 .apply(ParDo.of(new DoFn<KinesisRecord, String>() {
98 @ProcessElement
99 public void process(DoFn<KinesisRecord, String>.ProcessContext context, BoundedWindow window) throws CharacterCodingException {
100 KinesisRecord element = context.element();
101 String tdAgentRecord = String.valueOf(StandardCharsets.UTF_8.newDecoder().decode(element.getData()));
102 context.output(tdAgentRecord);
103 }
104 }))
105 .apply(Window.<String>into(FixedWindows.of(standardSeconds(30)))
106// I was testing using either triggers
107 .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
108// .triggering(AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()))
109 .withAllowedLateness(standardSeconds(5), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
110 .accumulatingFiredPanes())
111 .apply(ParDo.of(new DoFn<String, String>() {
112
113 @ProcessElement
114 public void process(ProcessContext context, IntervalWindow window) {
115 //a check that ignores "WATERMARK" messages which the only purpose is to advance watermark
116 if (!"WATERMARK".equals(context.element())) {
117 System.out.println(Instant.now() + " Received Element " + context.element() + " " + window);
118 context.output(context.element());
119 }
120 }
121 }));
122
123 dataCollection
124 .apply(Count.perElement())
125 .apply(ParDo.of(new DoFn<KV<String, Long>, KV<String, Long>>() {
126 @ProcessElement
127 public void process(ProcessContext context, IntervalWindow window) {
128 System.out.println(Instant.now() + " " + "After count " + context.element() + " " + window);
129 context.output(context.element());
130 }
131 }))
132// if I apply any of these trigger I will get expected firings
133// .apply(Window.<KV<String, Long>>configure().triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))))
134// .apply(Window.<KV<String, Long>>configure().triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane())))
135 .apply(GroupByKey.create())
136 .apply(ParDo.of(new DoFn<KV<String, Iterable<Long>>, KV<String, Iterable<Long>>>() {
137 @ProcessElement
138 public void process(ProcessContext context, IntervalWindow window) {
139 System.out.println(Instant.now() + " " + "Final group by " + context.element() + " " + window);
140 context.output(context.element());
141 }
142 })).apply(new DummySink<>());
143
144
145 pipeline.run().waitUntilFinish();
146
147 }
148
149
150 class DummySink<T> extends PTransform<PCollection<T>, PDone> {
151
152 @Override
153 public PDone expand(PCollection<T> input) {
154 input.apply(ParDo.of(new DoFn<T, Void>() {
155 @ProcessElement
156 public void process(ProcessContext context) {
157 }
158 })).setCoder(VoidCoder.of());
159 return PDone.in(input.getPipeline());
160 }
161 }
162}