· 7 years ago · Mar 17, 2019, 11:14 PM
1import io.reactivex.Flowable;
2import io.reactivex.FlowableTransformer;
3import io.reactivex.disposables.Disposable;
4import io.reactivex.schedulers.Schedulers;
5import org.reactivestreams.Publisher;
6import org.reactivestreams.Subscriber;
7import org.reactivestreams.Subscription;
8
9import java.io.IOException;
10import java.util.ArrayList;
11import java.util.List;
12import java.util.concurrent.TimeUnit;
13import java.util.concurrent.atomic.AtomicInteger;
14import java.util.concurrent.atomic.AtomicLong;
15import java.util.concurrent.atomic.AtomicReference;
16
17/**
18 * Buffer elements by time period or count with backpressure supported
19 *
20 * @author snoop.fy at gmail.com
21 */
22public class BufferTransformer<T> implements FlowableTransformer<T, List<T>> {
23
24 private final Integer timespan;
25
26 private final Integer count;
27
28 private final TimeUnit timeUnit;
29
30 public BufferTransformer(int timespan, TimeUnit timeUnit, int count) {
31 this.timespan = timespan;
32 this.timeUnit = timeUnit;
33 this.count = count;
34 }
35
36 @Override
37 public Publisher<List<T>> apply(Flowable<T> upstream) {
38 return s -> upstream.subscribe(new BufferSubscriber<>(s));
39 }
40
41 class BufferSubscriber<T> implements Subscriber<T> {
42
43 private final AtomicInteger wip = new AtomicInteger(0);
44
45 private final Subscriber<? super List<T>> actual;
46
47 private volatile Subscription subscription;
48
49 private final AtomicLong timeNanos = new AtomicLong();
50
51 private final AtomicReference<List<T>> buffer = new AtomicReference<>();
52
53 private final AtomicReference<Disposable> timeoutWork = new AtomicReference<>();
54
55
56 BufferSubscriber(Subscriber<? super List<T>> actual) {
57 this.actual = actual;
58 }
59
60 @Override
61 public void onSubscribe(Subscription s) {
62 this.subscription = s;
63 resetWindow();
64 s.request(count);
65 wip.addAndGet(count);
66 }
67
68 @Override
69 public void onNext(T t) {
70 if (wip.decrementAndGet() == 0) {
71 this.subscription.request(count);
72 wip.addAndGet(count);
73 }
74 buffer.get().add(t);
75 synchronized (actual) {
76 if (buffer.get().size() == count) {
77 actual.onNext(buffer.get());
78 resetWindow();
79 } else if (System.nanoTime() - timeNanos.get() >= timeUnit.toNanos(timespan)) {
80 actual.onNext(buffer.get());
81 resetWindow();
82 }
83 }
84 }
85
86 @Override
87 public void onError(Throwable t) {
88 synchronized (this.actual) {
89 this.actual.onError(t);
90 }
91 }
92
93 @Override
94 public void onComplete() {
95 synchronized (this.actual) {
96 this.actual.onComplete();
97 }
98 }
99
100 private void resetWindow() {
101 buffer.set(new ArrayList<>());
102 timeNanos.set(System.nanoTime());
103
104 timeoutWork.getAndUpdate(s -> {
105 if (s != null && !s.isDisposed()) {
106 s.dispose();
107 }
108 return Schedulers.computation().scheduleDirect(() -> {
109 synchronized (actual) {
110 actual.onNext(buffer.get());
111 resetWindow();
112 }
113 }, timespan, timeUnit);
114 });
115 }
116 }
117
118 public static void main(String[] args) throws IOException {
119 Flowable.interval(1, TimeUnit.MILLISECONDS)
120 .compose(new BufferTransformer<Long>(1, TimeUnit.MILLISECONDS, 8))
121 .subscribe(System.out::println);
122 System.in.read();
123 }
124}