1/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18package org.apache.beam.sdk.io.kafka;
19
20import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
21import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
22
23import com.google.auto.service.AutoService;
24import com.google.auto.value.AutoValue;
25import io.confluent.kafka.serializers.KafkaAvroDeserializer;
26import java.io.InputStream;
27import java.io.OutputStream;
28import java.lang.reflect.Method;
29import java.util.ArrayList;
30import java.util.Collections;
31import java.util.HashMap;
32import java.util.List;
33import java.util.Map;
34import java.util.Optional;
35import java.util.Set;
36import javax.annotation.Nullable;
37import org.apache.beam.sdk.annotations.Experimental;
38import org.apache.beam.sdk.annotations.Experimental.Kind;
39import org.apache.beam.sdk.coders.AtomicCoder;
40import org.apache.beam.sdk.coders.AvroCoder;
41import org.apache.beam.sdk.coders.ByteArrayCoder;
42import org.apache.beam.sdk.coders.Coder;
43import org.apache.beam.sdk.coders.CoderRegistry;
44import org.apache.beam.sdk.coders.KvCoder;
45import org.apache.beam.sdk.coders.VarIntCoder;
46import org.apache.beam.sdk.coders.VarLongCoder;
47import org.apache.beam.sdk.expansion.ExternalTransformRegistrar;
48import org.apache.beam.sdk.io.Read.Unbounded;
49import org.apache.beam.sdk.io.UnboundedSource;
50import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
51import org.apache.beam.sdk.options.PipelineOptions;
52import org.apache.beam.sdk.options.ValueProvider;
53import org.apache.beam.sdk.transforms.DoFn;
54import org.apache.beam.sdk.transforms.ExternalTransformBuilder;
55import org.apache.beam.sdk.transforms.MapElements;
56import org.apache.beam.sdk.transforms.PTransform;
57import org.apache.beam.sdk.transforms.ParDo;
58import org.apache.beam.sdk.transforms.SerializableFunction;
59import org.apache.beam.sdk.transforms.SimpleFunction;
60import org.apache.beam.sdk.transforms.display.DisplayData;
61import org.apache.beam.sdk.values.KV;
62import org.apache.beam.sdk.values.PBegin;
63import org.apache.beam.sdk.values.PCollection;
64import org.apache.beam.sdk.values.PDone;
65import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
66import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
67import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
68import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
69import org.apache.kafka.clients.consumer.Consumer;
70import org.apache.kafka.clients.consumer.ConsumerConfig;
71import org.apache.kafka.clients.consumer.KafkaConsumer;
72import org.apache.kafka.clients.producer.KafkaProducer;
73import org.apache.kafka.clients.producer.Producer;
74import org.apache.kafka.clients.producer.ProducerConfig;
75import org.apache.kafka.clients.producer.ProducerRecord;
76import org.apache.kafka.common.TopicPartition;
77import org.apache.kafka.common.serialization.ByteArrayDeserializer;
78import org.apache.kafka.common.serialization.Deserializer;
79import org.apache.kafka.common.serialization.Serializer;
80import org.apache.kafka.common.serialization.StringSerializer;
81import org.apache.kafka.common.utils.AppInfoParser;
83import org.joda.time.Duration;
84import org.joda.time.Instant;
85import org.slf4j.Logger;
86import org.slf4j.LoggerFactory;
87
88/**
89 * An unbounded source and a sink for <a href="http://kafka.apache.org/">Kafka</a> topics.
90 *
91 * <h3>Reading from Kafka topics</h3>
92 *
93 * <p>KafkaIO source returns unbounded collection of Kafka records as {@code
94 * PCollection<KafkaRecord<K, V>>}. A {@link KafkaRecord} includes basic metadata like
95 * topic-partition and offset, along with key and value associated with a Kafka record.
96 *
97 * <p>Although most applications consume a single topic, the source can be configured to consume
98 * multiple topics or even a specific set of {@link TopicPartition}s.
99 *
100 * <p>To configure a Kafka source, you must specify at the minimum Kafka <tt>bootstrapServers</tt>,
101 * one or more topics to consume, and key and value deserializers. For example:
102 *
103 * <pre>{@code
104 * pipeline
105 * .apply(KafkaIO.<Long, String>read()
106 * .withBootstrapServers("broker_1:9092,broker_2:9092")
107 * .withTopic("my_topic") // use withTopics(List<String>) to read from multiple topics.
108 * .withKeyDeserializer(LongDeserializer.class)
109 * .withValueDeserializer(StringDeserializer.class)
110 *
111 *     // The above four settings are required. Returns PCollection<KafkaRecord<Long, String>>
112 *
113 * // Rest of the settings are optional :
114 *
115 * // you can further customize KafkaConsumer used to read the records by adding more
116 * // settings for ConsumerConfig. e.g :
117 * .withConsumerConfigUpdates(ImmutableMap.of("group.id", "my_beam_app_1"))
118 *
119 * // set event times and watermark based on 'LogAppendTime'. To provide a custom
120 * // policy see withTimestampPolicyFactory(). withProcessingTime() is the default.
121 * // Use withCreateTime() with topics that have 'CreateTime' timestamps.
122 * .withLogAppendTime()
123 *
124 * // restrict reader to committed messages on Kafka (see method documentation).
125 * .withReadCommitted()
126 *
127 * // offset consumed by the pipeline can be committed back.
128 * .commitOffsetsInFinalize()
129 *
130 *     // finally, if you don't need Kafka metadata, you can drop it.
131 * .withoutMetadata() // PCollection<KV<Long, String>>
132 * )
133 * .apply(Values.<String>create()) // PCollection<String>
134 * ...
135 * }</pre>
136 *
137 * <p>Kafka provides deserializers for common types in {@link
138 * org.apache.kafka.common.serialization}. In addition to deserializers, Beam runners need {@link
139 * Coder} to materialize key and value objects if necessary. In most cases, you don't need to
140 * specify {@link Coder} for key and value in the resulting collection because the coders are
141 * inferred from deserializer types. However, in cases when coder inference fails, they can be
142 * specified explicitly along with deserializers using {@link
143 * Read#withKeyDeserializerAndCoder(Class, Coder)} and {@link
144 * Read#withValueDeserializerAndCoder(Class, Coder)}. Note that Kafka messages are interpreted using
145 * key and value <i>deserializers</i>.
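 *
 * <p>For example, a minimal sketch of providing coders explicitly when inference fails (the
 * deserializer and coder classes below are illustrative choices, not requirements):
 *
 * <pre>{@code
 * pipeline
 *   .apply(KafkaIO.<Long, String>read()
 *     .withBootstrapServers("broker_1:9092,broker_2:9092")
 *     .withTopic("my_topic")
 *     .withKeyDeserializerAndCoder(LongDeserializer.class, BigEndianLongCoder.of())
 *     .withValueDeserializerAndCoder(StringDeserializer.class, StringUtf8Coder.of())
 *   )
 *   ...
 * }</pre>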
146 *
147 * <h3>Partition Assignment and Checkpointing</h3>
148 *
149 * The Kafka partitions are evenly distributed among splits (workers).
150 *
151 * <p>Checkpointing is fully supported and each split can resume from its previous checkpoint (to the
152 * extent supported by the runner). See {@link KafkaUnboundedSource#split(int, PipelineOptions)} for
153 * more details on splits and checkpoint support.
154 *
155 * <p>When the pipeline starts for the first time, or without any checkpoint, the source starts
156 * consuming from the <em>latest</em> offsets. You can override this behavior to consume from the
157 * beginning by setting appropriate properties in {@link ConsumerConfig}, through {@link
158 * Read#withConsumerConfigUpdates(Map)}. You can also enable offset auto-commit in Kafka to resume
159 * from the last committed offsets.
160 *
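 * <p>For example, a sketch of consuming from the earliest available offsets when there is no
 * checkpoint or committed offset (the group id below is only an illustrative name):
 *
 * <pre>{@code
 * KafkaIO.<Long, String>read()
 *   .withBootstrapServers("broker_1:9092,broker_2:9092")
 *   .withTopic("my_topic")
 *   .withKeyDeserializer(LongDeserializer.class)
 *   .withValueDeserializer(StringDeserializer.class)
 *   .withConsumerConfigUpdates(ImmutableMap.of(
 *       ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest",
 *       ConsumerConfig.GROUP_ID_CONFIG, "my_beam_app_1"))
 *   ...
 * }</pre>
 *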
161 * <p>In summary, KafkaIO.read follows the sequence below to set the initial offset:<br>
162 * 1. {@link KafkaCheckpointMark} provided by runner;<br>
163 * 2. Consumer offset stored in Kafka when {@code ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG = true};
164 * <br>
165 * 3. Start from <em>latest</em> offset by default;
166 *
167 * <p>Seeking to the initial offset is a blocking operation in the Kafka API, which can block forever
168 * with certain versions of the Kafka client library. This is resolved by <a
169 * href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-266%3A+Fix+consumer+indefinite+blocking+behavior">KIP-266</a>,
170 * which provides the `default.api.timeout.ms` consumer config setting to control such timeouts.
171 * KafkaIO.read implements the timeout itself so as not to block forever in case an older Kafka client is
172 * used. It recognizes the `default.api.timeout.ms` setting and will honor the timeout value if it
173 * is passed in the consumer config.
174 *
175 * <h3>Use Avro schema with Confluent Schema Registry</h3>
176 *
177 * <p>If you want to deserialize the keys and/or values based on a schema available in Confluent
178 * Schema Registry, KafkaIO can fetch this schema from a specified Schema Registry URL and use it
179 * for deserialization. A {@link Coder} will be inferred automatically based on the respective
180 * {@link Deserializer}.
181 *
182 * <p>For an Avro schema it will return a {@link PCollection} of {@link KafkaRecord}s where key
183 * and/or value will be typed as {@link org.apache.avro.generic.GenericRecord}. In this case, users
184 * don't need to specify key and/or value deserializers and coders since they will be set to {@link
185 * KafkaAvroDeserializer} and {@link AvroCoder} by default, respectively.
186 *
187 * <p>For example, in the topic below, values are serialized with an Avro schema stored in Schema Registry
188 * and keys are typed as {@link Long}:
189 *
190 * <pre>{@code
191 * PCollection<KafkaRecord<Long, GenericRecord>> input = pipeline
192 * .apply(KafkaIO.<Long, GenericRecord>read()
193 * .withBootstrapServers("broker_1:9092,broker_2:9092")
194 * .withTopic("my_topic")
195 * .withKeyDeserializer(LongDeserializer.class)
196 * // Use Confluent Schema Registry, specify schema registry URL and value subject
197 * .withValueDeserializer(
198 * ConfluentSchemaRegistryDeserializerProvider.of("http://localhost:8081", "my_topic-value"))
199 * ...
200 * }</pre>
201 *
202 * <h3>Writing to Kafka</h3>
203 *
204 * <p>KafkaIO sink supports writing key-value pairs to a Kafka topic. Users can also write just the
205 * values or native Kafka producer records using {@link
206 * org.apache.kafka.clients.producer.ProducerRecord}. To configure a Kafka sink, you must specify at
207 * the minimum Kafka <tt>bootstrapServers</tt>, the topic to write to, and key and value
208 * serializers. For example:
209 *
210 * <pre>{@code
211 * PCollection<KV<Long, String>> kvColl = ...;
212 * kvColl.apply(KafkaIO.<Long, String>write()
213 * .withBootstrapServers("broker_1:9092,broker_2:9092")
214 * .withTopic("results")
215 *
216 * .withKeySerializer(LongSerializer.class)
217 * .withValueSerializer(StringSerializer.class)
218 *
219 * // You can further customize KafkaProducer used to write the records by adding more
220 * // settings for ProducerConfig. e.g, to enable compression :
221 * .withProducerConfigUpdates(ImmutableMap.of("compression.type", "gzip"))
222 *
223 *    // You can set the publish timestamp for the Kafka records.
224 * .withInputTimestamp() // element timestamp is used while publishing to Kafka
225 * // or you can also set a custom timestamp with a function.
226 * .withPublishTimestampFunction((elem, elemTs) -> ...)
227 *
228 * // Optionally enable exactly-once sink (on supported runners). See JavaDoc for withEOS().
229 * .withEOS(20, "eos-sink-group-id");
230 * );
231 * }</pre>
232 *
233 * <p>Often you might want to write just values without any keys to Kafka. Use {@code values()} to
234 * write records with a default empty (null) key:
235 *
236 * <pre>{@code
237 * PCollection<String> strings = ...;
238 * strings.apply(KafkaIO.<Void, String>write()
239 * .withBootstrapServers("broker_1:9092,broker_2:9092")
240 * .withTopic("results")
241 * .withValueSerializer(StringSerializer.class) // just need serializer for value
242 * .values()
243 * );
244 * }</pre>
245 *
246 * <p>Also, if you want to write Kafka {@link ProducerRecord}s, then you should use {@link
247 * KafkaIO#writeRecords()}:
248 *
249 * <pre>{@code
250 * PCollection<ProducerRecord<Long, String>> records = ...;
251 * records.apply(KafkaIO.<Long, String>writeRecords()
252 * .withBootstrapServers("broker_1:9092,broker_2:9092")
253 * .withTopic("results")
254 * .withKeySerializer(LongSerializer.class)
255 * .withValueSerializer(StringSerializer.class)
256 * );
257 * }</pre>
258 *
259 * <h3>Advanced Kafka Configuration</h3>
260 *
261 * KafkaIO allows setting most of the properties in {@link ConsumerConfig} for source or in {@link
262 * ProducerConfig} for sink. E.g. if you would like to enable offset <em>auto commit</em> (for
263 * external monitoring or other purposes), you can set <tt>"group.id"</tt>,
264 * <tt>"enable.auto.commit"</tt>, etc.
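 *
 * <p>For example, a sketch of enabling offset auto commit for external monitoring (the group id
 * below is only an illustrative name):
 *
 * <pre>{@code
 * KafkaIO.<Long, String>read()
 *   ...
 *   .withConsumerConfigUpdates(ImmutableMap.of(
 *       ConsumerConfig.GROUP_ID_CONFIG, "monitoring_group",
 *       ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true))
 *   ...
 * }</pre>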
265 *
266 * <h3>Event Timestamps and Watermark</h3>
267 *
268 * By default, record timestamp (event time) is set to processing time in KafkaIO reader and source
269 * watermark is current wall time. If a topic has Kafka server-side ingestion timestamp enabled
270 * ('LogAppendTime'), it can be enabled with {@link Read#withLogAppendTime()}. A custom timestamp
271 * policy can be provided by implementing {@link TimestampPolicyFactory}. See {@link
272 * Read#withTimestampPolicyFactory(TimestampPolicyFactory)} for more information.
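 *
 * <p>For example, a sketch of using the built-in create-time policy with an assumed maximum
 * out-of-order delay of one minute:
 *
 * <pre>{@code
 * KafkaIO.<Long, String>read()
 *   ...
 *   .withCreateTime(Duration.standardMinutes(1))
 *   ...
 * }</pre>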
273 *
274 * <h3>Supported Kafka Client Versions</h3>
275 *
276 * KafkaIO relies on <i>kafka-clients</i> for all its interactions with the Kafka cluster.
277 * <i>kafka-clients</i> versions 0.10.1 and newer are supported at runtime. The older versions 0.9.x
278 * - 0.10.0.0 are also supported, but are deprecated and likely to be removed in the near future. Please
279 * ensure that the version included with the application is compatible with the version of your
280 * Kafka cluster. Kafka client usually fails to initialize with a clear error message in case of
281 * incompatibility.
282 */
283@Experimental(Kind.SOURCE_SINK)
284public class KafkaIO {
285
286 /**
287 * A specific instance of uninitialized {@link #read()} where keys and values are byte arrays. See
288 * {@link #read()}.
289 */
290 public static Read<byte[], byte[]> readBytes() {
291 return KafkaIO.<byte[], byte[]>read()
292 .withKeyDeserializer(ByteArrayDeserializer.class)
293 .withValueDeserializer(ByteArrayDeserializer.class);
294 }
295
296 /**
297 * Creates an uninitialized {@link Read} {@link PTransform}. Before use, basic Kafka configuration
298 * should be set with {@link Read#withBootstrapServers(String)} and {@link Read#withTopics(List)}.
299 * Other optional settings include key and value {@link Deserializer}s, custom timestamp and
300 * watermark functions.
301 */
302 public static <K, V> Read<K, V> read() {
303 return new AutoValue_KafkaIO_Read.Builder<K, V>()
304 .setTopics(new ArrayList<>())
305 .setTopicPartitions(new ArrayList<>())
306 .setConsumerFactoryFn(Read.KAFKA_CONSUMER_FACTORY_FN)
307 .setConsumerConfig(Read.DEFAULT_CONSUMER_PROPERTIES)
308 .setMaxNumRecords(Long.MAX_VALUE)
309 .setCommitOffsetsInFinalizeEnabled(false)
310 .setTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime())
311 .build();
312 }
313
314 /**
315 * Creates an uninitialized {@link Write} {@link PTransform}. Before use, Kafka configuration
316 * should be set with {@link Write#withBootstrapServers(String)} and {@link Write#withTopic} along
317 * with {@link Serializer}s for the (optional) key and values.
318 */
319 public static <K, V> Write<K, V> write() {
320 return new AutoValue_KafkaIO_Write.Builder<K, V>()
321 .setWriteRecordsTransform(
322 new AutoValue_KafkaIO_WriteRecords.Builder<K, V>()
323 .setProducerConfig(WriteRecords.DEFAULT_PRODUCER_PROPERTIES)
324 .setEOS(false)
325 .setNumShards(0)
326 .setConsumerFactoryFn(Read.KAFKA_CONSUMER_FACTORY_FN)
327 .build())
328 .build();
329 }
330
331 /**
332 * Creates an uninitialized {@link WriteRecords} {@link PTransform}. Before use, Kafka
333 * configuration should be set with {@link WriteRecords#withBootstrapServers(String)} and {@link
334 * WriteRecords#withTopic} along with {@link Serializer}s for the (optional) key and values.
335 */
336 public static <K, V> WriteRecords<K, V> writeRecords() {
337 return new AutoValue_KafkaIO_WriteRecords.Builder<K, V>()
338 .setProducerConfig(WriteRecords.DEFAULT_PRODUCER_PROPERTIES)
339 .setEOS(false)
340 .setNumShards(0)
341 .setConsumerFactoryFn(Read.KAFKA_CONSUMER_FACTORY_FN)
342 .build();
343 }
344
345 ///////////////////////// Read Support \\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
346
347 /**
348 * A {@link PTransform} to read from Kafka topics. See {@link KafkaIO} for more information on
349 * usage and configuration.
350 */
351 @AutoValue
352 public abstract static class Read<K, V>
353 extends PTransform<PBegin, PCollection<KafkaRecord<K, V>>> {
354 abstract Map<String, Object> getConsumerConfig();
355
356 abstract List<String> getTopics();
357
358 abstract List<TopicPartition> getTopicPartitions();
359
360 @Nullable
361 abstract Coder<K> getKeyCoder();
362
363 @Nullable
364 abstract Coder<V> getValueCoder();
365
366 abstract SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
367 getConsumerFactoryFn();
368
369 @Nullable
370 abstract SerializableFunction<KafkaRecord<K, V>, Instant> getWatermarkFn();
371
372 abstract long getMaxNumRecords();
373
374 @Nullable
375 abstract Duration getMaxReadTime();
376
377 @Nullable
378 abstract Instant getStartReadTime();
379
380 abstract boolean isCommitOffsetsInFinalizeEnabled();
381
382 abstract TimestampPolicyFactory<K, V> getTimestampPolicyFactory();
383
384 @Nullable
385 abstract Map<String, Object> getOffsetConsumerConfig();
386
387 @Nullable
388 abstract DeserializerProvider getKeyDeserializerProvider();
389
390 @Nullable
391 abstract DeserializerProvider getValueDeserializerProvider();
392
393 abstract Builder<K, V> toBuilder();
394
395 @Experimental(Kind.PORTABILITY)
396 @AutoValue.Builder
397 abstract static class Builder<K, V>
398 implements ExternalTransformBuilder<External.Configuration, PBegin, PCollection<KV<K, V>>> {
399 abstract Builder<K, V> setConsumerConfig(Map<String, Object> config);
400
401 abstract Builder<K, V> setTopics(List<String> topics);
402
403 abstract Builder<K, V> setTopicPartitions(List<TopicPartition> topicPartitions);
404
405 abstract Builder<K, V> setKeyCoder(Coder<K> keyCoder);
406
407 abstract Builder<K, V> setValueCoder(Coder<V> valueCoder);
408
409 abstract Builder<K, V> setConsumerFactoryFn(
410 SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn);
411
412 abstract Builder<K, V> setWatermarkFn(SerializableFunction<KafkaRecord<K, V>, Instant> fn);
413
414 abstract Builder<K, V> setMaxNumRecords(long maxNumRecords);
415
416 abstract Builder<K, V> setMaxReadTime(Duration maxReadTime);
417
418 abstract Builder<K, V> setStartReadTime(Instant startReadTime);
419
420 abstract Builder<K, V> setCommitOffsetsInFinalizeEnabled(boolean commitOffsetInFinalize);
421
422 abstract Builder<K, V> setTimestampPolicyFactory(
423 TimestampPolicyFactory<K, V> timestampPolicyFactory);
424
425 abstract Builder<K, V> setOffsetConsumerConfig(Map<String, Object> offsetConsumerConfig);
426
427 abstract Builder<K, V> setKeyDeserializerProvider(DeserializerProvider deserializerProvider);
428
429 abstract Builder<K, V> setValueDeserializerProvider(
430 DeserializerProvider deserializerProvider);
431
432 abstract Read<K, V> build();
433
434 @Override
435 public PTransform<PBegin, PCollection<KV<K, V>>> buildExternal(
436 External.Configuration config) {
437 ImmutableList.Builder<String> listBuilder = ImmutableList.builder();
438 for (String topic : config.topics) {
439 listBuilder.add(topic);
440 }
441 setTopics(listBuilder.build());
442
443 Class keyDeserializer = resolveClass(config.keyDeserializer);
444 setKeyDeserializerProvider(LocalDeserializerProvider.of(keyDeserializer));
445 setKeyCoder(resolveCoder(keyDeserializer));
446
447 Class valueDeserializer = resolveClass(config.valueDeserializer);
448 setValueDeserializerProvider(LocalDeserializerProvider.of(valueDeserializer));
449 setValueCoder(resolveCoder(valueDeserializer));
450
451 Map<String, Object> consumerConfig = new HashMap<>();
452 for (KV<String, String> kv : config.consumerConfig) {
453 consumerConfig.put(kv.getKey(), kv.getValue());
454 }
455 // Key and Value Deserializers always have to be in the config.
456 consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getName());
457 consumerConfig.put(
458 ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getName());
459 setConsumerConfig(consumerConfig);
460
461 // Set required defaults
462 setTopicPartitions(Collections.emptyList());
463 setConsumerFactoryFn(Read.KAFKA_CONSUMER_FACTORY_FN);
464 if (config.maxReadTime != null) {
465 setMaxReadTime(Duration.millis(config.maxReadTime));
466 }
467 setMaxNumRecords(config.maxNumRecords == null ? Long.MAX_VALUE : config.maxNumRecords);
468 setCommitOffsetsInFinalizeEnabled(false);
469 setTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime());
470 // We do not include Metadata until we can encode KafkaRecords cross-language
471 return build().withoutMetadata();
472 }
473
474 private static Coder resolveCoder(Class deserializer) {
475 for (Method method : deserializer.getDeclaredMethods()) {
476 if (method.getName().equals("deserialize")) {
477 Class<?> returnType = method.getReturnType();
478 if (returnType.equals(Object.class)) {
479 continue;
480 }
481 if (returnType.equals(byte[].class)) {
482 return ByteArrayCoder.of();
483 } else if (returnType.equals(Integer.class)) {
484 return VarIntCoder.of();
485 } else if (returnType.equals(Long.class)) {
486 return VarLongCoder.of();
487 } else {
488 throw new RuntimeException("Couldn't infer Coder from " + deserializer);
489 }
490 }
491 }
492 throw new RuntimeException("Couldn't resolve coder for Deserializer: " + deserializer);
493 }
494 }
495
496 /**
497 * Exposes {@link KafkaIO.TypedWithoutMetadata} as an external transform for cross-language
498 * usage.
499 */
500 @Experimental(Kind.PORTABILITY)
501 @AutoService(ExternalTransformRegistrar.class)
502 public static class External implements ExternalTransformRegistrar {
503
504 public static final String URN = "beam:external:java:kafka:read:v1";
505
506 @Override
507 public Map<String, Class<? extends ExternalTransformBuilder>> knownBuilders() {
508 return ImmutableMap.of(URN, AutoValue_KafkaIO_Read.Builder.class);
509 }
510
511 /** Parameters class to expose the Read transform to an external SDK. */
512 public static class Configuration {
513
514 // All byte arrays are UTF-8 encoded strings
515 private Iterable<KV<String, String>> consumerConfig;
516 private Iterable<String> topics;
517 private String keyDeserializer;
518 private String valueDeserializer;
519 private Long maxNumRecords;
520 private Long maxReadTime;
521
522 public void setConsumerConfig(Iterable<KV<String, String>> consumerConfig) {
523 this.consumerConfig = consumerConfig;
524 }
525
526 public void setMaxNumRecords(Long maxNumRecords) {
527 this.maxNumRecords = maxNumRecords;
528 }
529
530 public void setMaxReadTime(Long maxReadTime) {
531 this.maxReadTime = maxReadTime;
532 }
533
534 public void setTopics(Iterable<String> topics) {
535 this.topics = topics;
536 }
537
538 public void setKeyDeserializer(String keyDeserializer) {
539 this.keyDeserializer = keyDeserializer;
540 }
541
542 public void setValueDeserializer(String valueDeserializer) {
543 this.valueDeserializer = valueDeserializer;
544 }
545 }
546 }
547
548 /** Sets the bootstrap servers for the Kafka consumer. */
549 public Read<K, V> withBootstrapServers(String bootstrapServers) {
550 return withConsumerConfigUpdates(
551 ImmutableMap.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers));
552 }
553
554 /**
555 * Sets the topic to read from.
556 *
557 * <p>See {@link KafkaUnboundedSource#split(int, PipelineOptions)} for description of how the
558 * partitions are distributed among the splits.
559 */
560 public Read<K, V> withTopic(String topic) {
561 return withTopics(ImmutableList.of(topic));
562 }
563
564 /**
565 * Sets a list of topics to read from. All the partitions from each of the topics are read.
566 *
567 * <p>See {@link KafkaUnboundedSource#split(int, PipelineOptions)} for description of how the
568 * partitions are distributed among the splits.
569 */
570 public Read<K, V> withTopics(List<String> topics) {
571 checkState(
572 getTopicPartitions().isEmpty(), "Only topics or topicPartitions can be set, not both");
573 return toBuilder().setTopics(ImmutableList.copyOf(topics)).build();
574 }
575
576 /**
577 * Sets a list of partitions to read from. This allows reading only a subset of partitions for
578 * one or more topics when (if ever) needed.
579 *
580 * <p>See {@link KafkaUnboundedSource#split(int, PipelineOptions)} for description of how the
581 * partitions are distributed among the splits.
582 */
583 public Read<K, V> withTopicPartitions(List<TopicPartition> topicPartitions) {
584 checkState(getTopics().isEmpty(), "Only topics or topicPartitions can be set, not both");
585 return toBuilder().setTopicPartitions(ImmutableList.copyOf(topicPartitions)).build();
586 }
587
588 /**
589 * Sets a Kafka {@link Deserializer} to interpret key bytes read from Kafka.
590 *
591 * <p>In addition, Beam also needs a {@link Coder} to serialize and deserialize key objects at
592 * runtime. KafkaIO tries to infer a coder for the key based on the {@link Deserializer} class,
593 * however, if that fails, you can use {@link #withKeyDeserializerAndCoder(Class, Coder)} to
594 * provide the key coder explicitly.
595 */
596 public Read<K, V> withKeyDeserializer(Class<? extends Deserializer<K>> keyDeserializer) {
597 return withKeyDeserializer(LocalDeserializerProvider.of(keyDeserializer));
598 }
599
600 /**
601 * Sets a Kafka {@link Deserializer} for interpreting key bytes read from Kafka along with a
602 * {@link Coder} for helping the Beam runner materialize key objects at runtime if necessary.
603 *
604 * <p>Use this method only if your pipeline doesn't work with plain {@link
605 * #withKeyDeserializer(Class)}.
606 */
607 public Read<K, V> withKeyDeserializerAndCoder(
608 Class<? extends Deserializer<K>> keyDeserializer, Coder<K> keyCoder) {
609 return withKeyDeserializer(keyDeserializer).toBuilder().setKeyCoder(keyCoder).build();
610 }
611
612 public Read<K, V> withKeyDeserializer(DeserializerProvider<K> deserializerProvider) {
613 return toBuilder().setKeyDeserializerProvider(deserializerProvider).build();
614 }
615
616 /**
617 * Sets a Kafka {@link Deserializer} to interpret value bytes read from Kafka.
618 *
619 * <p>In addition, Beam also needs a {@link Coder} to serialize and deserialize value objects at
620 * runtime. KafkaIO tries to infer a coder for the value based on the {@link Deserializer}
621 * class; however, if that fails, you can use {@link #withValueDeserializerAndCoder(Class,
622 * Coder)} to provide the value coder explicitly.
623 */
624 public Read<K, V> withValueDeserializer(Class<? extends Deserializer<V>> valueDeserializer) {
625 return withValueDeserializer(LocalDeserializerProvider.of(valueDeserializer));
626 }
627
628 /**
629 * Sets a Kafka {@link Deserializer} for interpreting value bytes read from Kafka along with a
630 * {@link Coder} for helping the Beam runner materialize value objects at runtime if necessary.
631 *
632 * <p>Use this method only if your pipeline doesn't work with plain {@link
633 * #withValueDeserializer(Class)}.
634 */
635 public Read<K, V> withValueDeserializerAndCoder(
636 Class<? extends Deserializer<V>> valueDeserializer, Coder<V> valueCoder) {
637 return withValueDeserializer(valueDeserializer).toBuilder().setValueCoder(valueCoder).build();
638 }
639
640 public Read<K, V> withValueDeserializer(DeserializerProvider<V> deserializerProvider) {
641 return toBuilder().setValueDeserializerProvider(deserializerProvider).build();
642 }
643
644 /**
645 * A factory to create Kafka {@link Consumer} from consumer configuration. This is useful for
646 * supporting another version of Kafka consumer. Default is {@link KafkaConsumer}.
647 */
648 public Read<K, V> withConsumerFactoryFn(
649 SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn) {
650 return toBuilder().setConsumerFactoryFn(consumerFactoryFn).build();
651 }
652
653 /**
654 * Update consumer configuration with new properties.
655 *
656 * @deprecated as of version 2.13. Use {@link #withConsumerConfigUpdates(Map)} instead
657 */
658 @Deprecated
659 public Read<K, V> updateConsumerProperties(Map<String, Object> configUpdates) {
660 Map<String, Object> config =
661 updateKafkaProperties(getConsumerConfig(), IGNORED_CONSUMER_PROPERTIES, configUpdates);
662 return toBuilder().setConsumerConfig(config).build();
663 }
664
665 /**
666 * Similar to {@link org.apache.beam.sdk.io.Read.Unbounded#withMaxNumRecords(long)}. Mainly used
667 * for tests and demo applications.
668 */
669 public Read<K, V> withMaxNumRecords(long maxNumRecords) {
670 return toBuilder().setMaxNumRecords(maxNumRecords).build();
671 }
672
673 /**
674 * Uses the given timestamp to set the start offset. It is supported only by Kafka Client 0.10.1.0 onwards
675 * and message format versions after 0.10.0.
676 *
677 * <p>Note that this takes priority over the start offset configuration {@code
678 * ConsumerConfig.AUTO_OFFSET_RESET_CONFIG} and any auto committed offsets.
679 *
680 * <p>This results in hard failures in either of the following two cases: 1. If one or more
681 * partitions do not contain any messages with a timestamp larger than or equal to the desired
682 * timestamp. 2. If the message format version in a partition is before 0.10.0, i.e. the
683 * messages do not have timestamps.
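 *
 * <p>A minimal sketch (the timestamp below is an arbitrary illustrative value):
 *
 * <pre>{@code
 * KafkaIO.<Long, String>read()
 *   ...
 *   .withStartReadTime(Instant.parse("2020-07-01T00:00:00Z"))
 *   ...
 * }</pre>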
684 */
685 public Read<K, V> withStartReadTime(Instant startReadTime) {
686 return toBuilder().setStartReadTime(startReadTime).build();
687 }
688
689 /**
690 * Similar to {@link org.apache.beam.sdk.io.Read.Unbounded#withMaxReadTime(Duration)}. Mainly
691 * used for tests and demo applications.
692 */
693 public Read<K, V> withMaxReadTime(Duration maxReadTime) {
694 return toBuilder().setMaxReadTime(maxReadTime).build();
695 }
696
697 /**
698 * Sets {@link TimestampPolicy} to {@link TimestampPolicyFactory.LogAppendTimePolicy}. The
699 * policy assigns Kafka's log append time (server side ingestion time) to each record. The
700 * watermark for each Kafka partition is the timestamp of the last record read. If a partition
701 * is idle, the watermark advances to a couple of seconds behind wall time. Every record consumed
702 * from Kafka is expected to have its timestamp type set to 'LOG_APPEND_TIME'.
703 *
704 * <p>In Kafka, log append time needs to be enabled for each topic, and all the subsequent
705 * records will have their timestamp set to log append time. If a record does not have its
706 * timestamp type set to 'LOG_APPEND_TIME' for any reason, its timestamp is set to the previous
707 * record's timestamp or the latest watermark, whichever is larger.
708 *
709 * <p>The watermark for the entire source is the oldest of each partition's watermark. If one of
710 * the readers falls behind possibly due to uneven distribution of records among Kafka
711 * partitions, it ends up holding the watermark for the entire source.
712 */
713 public Read<K, V> withLogAppendTime() {
714 return withTimestampPolicyFactory(TimestampPolicyFactory.withLogAppendTime());
715 }
716
717 /**
718 * Sets {@link TimestampPolicy} to {@link TimestampPolicyFactory.ProcessingTimePolicy}. This is
719 * the default timestamp policy. It assigns processing time to each record. Specifically, this
720 * is the timestamp when the record becomes 'current' in the reader. The watermark always
721 * advances to the current time. If server-side time (log append time) is enabled in Kafka, {@link
722 * #withLogAppendTime()} is recommended over this.
723 */
724 public Read<K, V> withProcessingTime() {
725 return withTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime());
726 }
727
728 /**
729 * Sets the timestamps policy based on {@link KafkaTimestampType#CREATE_TIME} timestamp of the
730 * records. It is an error if a record's timestamp type is not {@link
731 * KafkaTimestampType#CREATE_TIME}. The timestamps within a partition are expected to be roughly
732 * monotonically increasing with a cap on out of order delays (e.g. 'max delay' of 1 minute).
733 * The watermark at any time is '({@code Min(now(), Max(event timestamp so far)) - max delay})'.
734 * However, the watermark is never set in the future and is capped to 'now - max delay'. In addition,
735 * the watermark advances to 'now - max delay' when a partition is idle.
736 *
737 * @param maxDelay For any record in the Kafka partition, the timestamp of any subsequent record
738 * is expected to be after {@code current record timestamp - maxDelay}.
739 */
740 public Read<K, V> withCreateTime(Duration maxDelay) {
741 return withTimestampPolicyFactory(TimestampPolicyFactory.withCreateTime(maxDelay));
742 }
743
744 /**
745 * Provide custom {@link TimestampPolicyFactory} to set event times and watermark for each
746 * partition. {@link TimestampPolicyFactory#createTimestampPolicy(TopicPartition, Optional)} is
747 * invoked for each partition when the reader starts.
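 *
 * <p>A minimal sketch of a custom policy built on {@link CustomTimestampPolicyWithLimitedDelay},
 * assuming records carry usable timestamps and tolerating up to 30 seconds of out-of-order
 * delay (both assumptions are illustrative):
 *
 * <pre>{@code
 * .withTimestampPolicyFactory(
 *     (tp, previousWatermark) ->
 *         new CustomTimestampPolicyWithLimitedDelay<>(
 *             record -> new Instant(record.getTimestamp()),
 *             Duration.standardSeconds(30),
 *             previousWatermark))
 * }</pre>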
748 *
749 * @see #withLogAppendTime()
750 * @see #withCreateTime(Duration)
751 * @see #withProcessingTime()
752 */
753 public Read<K, V> withTimestampPolicyFactory(
754 TimestampPolicyFactory<K, V> timestampPolicyFactory) {
755 return toBuilder().setTimestampPolicyFactory(timestampPolicyFactory).build();
756 }
757
758 /**
759 * A function to assign a timestamp to a record. Default is processing timestamp.
760 *
761 * @deprecated as of version 2.4. Use {@link
762 * #withTimestampPolicyFactory(TimestampPolicyFactory)} instead.
763 */
764 @Deprecated
765 public Read<K, V> withTimestampFn2(
766 SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn) {
767 checkArgument(timestampFn != null, "timestampFn can not be null");
768 return toBuilder()
769 .setTimestampPolicyFactory(TimestampPolicyFactory.withTimestampFn(timestampFn))
770 .build();
771 }
772
773 /**
774 * A function to calculate watermark after a record. Default is last record timestamp.
775 *
776 * @see #withTimestampFn(SerializableFunction)
777 * @deprecated as of version 2.4. Use {@link
778 * #withTimestampPolicyFactory(TimestampPolicyFactory)} instead.
779 */
780 @Deprecated
781 public Read<K, V> withWatermarkFn2(
782 SerializableFunction<KafkaRecord<K, V>, Instant> watermarkFn) {
783 checkArgument(watermarkFn != null, "watermarkFn can not be null");
784 return toBuilder().setWatermarkFn(watermarkFn).build();
785 }
786
787 /**
788 * A function to assign a timestamp to a record. Default is processing timestamp.
789 *
790 * @deprecated as of version 2.4. Use {@link
791 * #withTimestampPolicyFactory(TimestampPolicyFactory)} instead.
792 */
793 @Deprecated
794 public Read<K, V> withTimestampFn(SerializableFunction<KV<K, V>, Instant> timestampFn) {
795 checkArgument(timestampFn != null, "timestampFn can not be null");
796 return withTimestampFn2(unwrapKafkaAndThen(timestampFn));
797 }
798
799 /**
800 * A function to calculate watermark after a record. Default is last record timestamp.
801 *
802 * @see #withTimestampFn(SerializableFunction)
803 * @deprecated as of version 2.4. Use {@link
804 * #withTimestampPolicyFactory(TimestampPolicyFactory)} instead.
805 */
806 @Deprecated
807 public Read<K, V> withWatermarkFn(SerializableFunction<KV<K, V>, Instant> watermarkFn) {
808 checkArgument(watermarkFn != null, "watermarkFn can not be null");
809 return withWatermarkFn2(unwrapKafkaAndThen(watermarkFn));
810 }
811
812 /**
813 * Sets "isolation.level" to "read_committed" in the Kafka consumer configuration. This ensures
814 * that the consumer does not read uncommitted messages. Kafka version 0.11 introduced
815 * transactional writes. Applications requiring end-to-end exactly-once semantics should only
816 * read committed messages. See JavaDoc for {@link KafkaConsumer} for more description.
817 */
818 public Read<K, V> withReadCommitted() {
819 return withConsumerConfigUpdates(ImmutableMap.of("isolation.level", "read_committed"));
820 }
821
822 /**
823 * Finalized offsets are committed to Kafka. See {@link CheckpointMark#finalizeCheckpoint()}. It
824 * helps with minimizing gaps or duplicate processing of records when restarting a pipeline
825 * from scratch, but it does not provide hard processing guarantees. There could be a short
826 * delay to commit after {@link CheckpointMark#finalizeCheckpoint()} is invoked, as the reader might
827 * be blocked on reading from Kafka. Note that it is independent of the 'AUTO_COMMIT' Kafka consumer
828 * configuration. Usually either this or AUTO_COMMIT in the Kafka consumer is enabled, but not both.
829 */
830 public Read<K, V> commitOffsetsInFinalize() {
831 return toBuilder().setCommitOffsetsInFinalizeEnabled(true).build();
832 }
833
834 /**
835 * Sets additional configuration for the backend offset consumer. It may be required for a
836 * secured Kafka cluster, especially when you see a WARN log message similar to 'exception while
837 * fetching latest offset for partition {}. will be retried'.
838 *
839 * <p>In {@link KafkaIO#read()}, there are actually two consumers running in the backend:<br>
840 * 1. the main consumer, which reads data from Kafka;<br>
841 * 2. the secondary offset consumer, which is used to estimate the backlog by fetching the latest
842 * offsets;<br>
843 *
844 * <p>By default, the offset consumer inherits the configuration from the main consumer, with an
845 * auto-generated {@link ConsumerConfig#GROUP_ID_CONFIG}. This may not work in a secured Kafka
846 * cluster that requires additional configuration.
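 *
 * <p>A minimal sketch for a SASL_SSL-secured cluster (the property values below are illustrative
 * placeholders, not working credentials):
 *
 * <pre>{@code
 * .withOffsetConsumerConfigOverrides(ImmutableMap.of(
 *     "security.protocol", "SASL_SSL",
 *     "sasl.mechanism", "PLAIN",
 *     "sasl.jaas.config", "<jaas config>"))
 * }</pre>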
847 */
848 public Read<K, V> withOffsetConsumerConfigOverrides(Map<String, Object> offsetConsumerConfig) {
849 return toBuilder().setOffsetConsumerConfig(offsetConsumerConfig).build();
850 }
851
852 /**
853 * Updates configuration for the backend main consumer. Note that the default consumer properties
854 * will not be completely overridden; this method only updates the values for the keys provided.
855 *
856 * <p>In {@link KafkaIO#read()}, there are actually two consumers running in the backend:<br>
857 * 1. the main consumer, which reads data from Kafka;<br>
858 * 2. the secondary offset consumer, which is used to estimate the backlog by fetching the latest
859 * offsets;<br>
860 *
861 * <p>By default, the main consumer uses the configuration from {@link
862 * #DEFAULT_CONSUMER_PROPERTIES}.
863 */
864 public Read<K, V> withConsumerConfigUpdates(Map<String, Object> configUpdates) {
865 Map<String, Object> config =
866 updateKafkaProperties(getConsumerConfig(), IGNORED_CONSUMER_PROPERTIES, configUpdates);
867 return toBuilder().setConsumerConfig(config).build();
868 }
869
870 /** Returns a {@link PTransform} for a PCollection of {@link KV}s, dropping the Kafka metadata. */
871 public PTransform<PBegin, PCollection<KV<K, V>>> withoutMetadata() {
872 return new TypedWithoutMetadata<>(this);
873 }
874
875 @Override
876 public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
877 checkArgument(
878 getConsumerConfig().get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG) != null,
879 "withBootstrapServers() is required");
880 checkArgument(
881 getTopics().size() > 0 || getTopicPartitions().size() > 0,
882 "Either withTopic(), withTopics() or withTopicPartitions() is required");
883 checkArgument(getKeyDeserializerProvider() != null, "withKeyDeserializer() is required");
884 checkArgument(getValueDeserializerProvider() != null, "withValueDeserializer() is required");
885
886 ConsumerSpEL consumerSpEL = new ConsumerSpEL();
887 if (!consumerSpEL.hasOffsetsForTimes()) {
888 LOG.warn(
889 "Kafka client version {} is too old. Versions before 0.10.1.0 are deprecated and "
890 + "may not be supported in next release of Apache Beam. "
891 + "Please upgrade your Kafka client version.",
892 AppInfoParser.getVersion());
893 }
894 if (getStartReadTime() != null) {
895 checkArgument(
896 consumerSpEL.hasOffsetsForTimes(),
897 "Consumer.offsetsForTimes is only supported by Kafka Client 0.10.1.0 onwards, "
898 + "current version of Kafka Client is "
899 + AppInfoParser.getVersion()
900 + ". If you are building with maven, set \"kafka.clients.version\" "
901 + "maven property to 0.10.1.0 or newer.");
902 }
903 if (isCommitOffsetsInFinalizeEnabled()) {
904 checkArgument(
905 getConsumerConfig().get(ConsumerConfig.GROUP_ID_CONFIG) != null,
906 "commitOffsetsInFinalize() is enabled, but group.id in Kafka consumer config "
907 + "is not set. Offset management requires group.id.");
908 if (Boolean.TRUE.equals(
909 getConsumerConfig().get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG))) {
910 LOG.warn(
911 "'{}' in consumer config is enabled even though commitOffsetsInFinalize() "
912 + "is set. You need only one of them.",
913 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
914 }
915 }
916
917 // Infer key/value coders if not specified explicitly
918 CoderRegistry coderRegistry = input.getPipeline().getCoderRegistry();
919
920 Coder<K> keyCoder = getKeyCoder(coderRegistry);
921 Coder<V> valueCoder = getValueCoder(coderRegistry);
922
923 // Handles unbounded source to bounded conversion if maxNumRecords or maxReadTime is set.
924 Unbounded<KafkaRecord<K, V>> unbounded =
925 org.apache.beam.sdk.io.Read.from(
926 toBuilder().setKeyCoder(keyCoder).setValueCoder(valueCoder).build().makeSource());
927
928 PTransform<PBegin, PCollection<KafkaRecord<K, V>>> transform = unbounded;
929
930 if (getMaxNumRecords() < Long.MAX_VALUE || getMaxReadTime() != null) {
931 transform =
932 unbounded.withMaxReadTime(getMaxReadTime()).withMaxNumRecords(getMaxNumRecords());
933 }
934
935 return input.getPipeline().apply(transform);
936 }
937
938 private Coder<K> getKeyCoder(CoderRegistry coderRegistry) {
939 return (getKeyCoder() != null)
940 ? getKeyCoder()
941 : getKeyDeserializerProvider().getCoder(coderRegistry);
942 }
943
944 private Coder<V> getValueCoder(CoderRegistry coderRegistry) {
945 return (getValueCoder() != null)
946 ? getValueCoder()
947 : getValueDeserializerProvider().getCoder(coderRegistry);
948 }
949
950 /**
951 * Creates an {@link UnboundedSource UnboundedSource<KafkaRecord<K, V>, ?>} with the
952 * configuration in {@link Read}. The primary use case is unit tests; it should not be used in an
953 * application.
954 */
955 @VisibleForTesting
956 UnboundedSource<KafkaRecord<K, V>, KafkaCheckpointMark> makeSource() {
957 return new KafkaUnboundedSource<>(this, -1);
958 }
959
960 // utility method to convert KafkaRecord<K, V> to user KV<K, V> before applying user functions
961 private static <KeyT, ValueT, OutT>
962 SerializableFunction<KafkaRecord<KeyT, ValueT>, OutT> unwrapKafkaAndThen(
963 final SerializableFunction<KV<KeyT, ValueT>, OutT> fn) {
964 return record -> fn.apply(record.getKV());
965 }
966 ///////////////////////////////////////////////////////////////////////////////////////
967
968 /** A set of properties that are not required or don't make sense for our consumer. */
969 private static final Map<String, String> IGNORED_CONSUMER_PROPERTIES =
970 ImmutableMap.of(
971 ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "Set keyDeserializer instead",
972 ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "Set valueDeserializer instead"
973 // "group.id", "enable.auto.commit", "auto.commit.interval.ms" :
974             // let's allow these; applications can have a better resume point for restarts.
975 );
976
977 // set config defaults
978 private static final Map<String, Object> DEFAULT_CONSUMER_PROPERTIES =
979 ImmutableMap.of(
980 ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
981 ByteArrayDeserializer.class.getName(),
982 ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
983 ByteArrayDeserializer.class.getName(),
984
985 // Use large receive buffer. Once KAFKA-3135 is fixed, this _may_ not be required.
986 // With the default value of 32K, it takes multiple seconds between successful polls.
987 // All the consumer work is done inside poll(), with smaller send buffer size, it
988 // takes many polls before a 1MB chunk from the server is fully read. In my testing
989 // about half of the time select() inside kafka consumer waited for 20-30ms, though
990 // the server had lots of data in tcp send buffers on its side. Compared to default,
991 // this setting increased throughput by many fold (3-4x).
992 ConsumerConfig.RECEIVE_BUFFER_CONFIG,
993 512 * 1024,
994
995 // default to latest offset when we are not resuming.
996 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
997 "latest",
998 // disable auto commit of offsets. we don't require group_id. could be enabled by user.
999 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
1000 false);
1001
1002 // default Kafka 0.9 Consumer supplier.
1003 private static final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
1004 KAFKA_CONSUMER_FACTORY_FN = KafkaConsumer::new;
1005
1006 @SuppressWarnings("unchecked")
1007 @Override
1008 public void populateDisplayData(DisplayData.Builder builder) {
1009 super.populateDisplayData(builder);
1010 List<String> topics = getTopics();
1011 List<TopicPartition> topicPartitions = getTopicPartitions();
1012 if (topics.size() > 0) {
1013 builder.add(DisplayData.item("topics", Joiner.on(",").join(topics)).withLabel("Topic/s"));
1014 } else if (topicPartitions.size() > 0) {
1015 builder.add(
1016 DisplayData.item("topicPartitions", Joiner.on(",").join(topicPartitions))
1017 .withLabel("Topic Partition/s"));
1018 }
1019 Set<String> ignoredConsumerPropertiesKeys = IGNORED_CONSUMER_PROPERTIES.keySet();
1020 for (Map.Entry<String, Object> conf : getConsumerConfig().entrySet()) {
1021 String key = conf.getKey();
1022 if (!ignoredConsumerPropertiesKeys.contains(key)) {
1023 Object value =
1024 DisplayData.inferType(conf.getValue()) != null
1025 ? conf.getValue()
1026 : String.valueOf(conf.getValue());
1027 builder.add(DisplayData.item(key, ValueProvider.StaticValueProvider.of(value)));
1028 }
1029 }
1030 }
1031 }
1032
1033 /**
1034 * A {@link PTransform} to read from Kafka topics. Similar to {@link KafkaIO.Read}, but removes
1035 * Kafka metadata and returns a {@link PCollection} of {@link KV}. See {@link KafkaIO} for more
1036 * information on usage and configuration of reader.
1037 */
1038 public static class TypedWithoutMetadata<K, V> extends PTransform<PBegin, PCollection<KV<K, V>>> {
1039 private final Read<K, V> read;
1040
1041 TypedWithoutMetadata(Read<K, V> read) {
1042 super("KafkaIO.Read");
1043 this.read = read;
1044 }
1045
1046 @Override
1047 public PCollection<KV<K, V>> expand(PBegin begin) {
1048 return begin
1049 .apply(read)
1050 .apply(
1051 "Remove Kafka Metadata",
1052 ParDo.of(
1053 new DoFn<KafkaRecord<K, V>, KV<K, V>>() {
1054 @ProcessElement
1055 public void processElement(ProcessContext ctx) {
1056 ctx.output(ctx.element().getKV());
1057 }
1058 }));
1059 }
1060
1061 @Override
1062 public void populateDisplayData(DisplayData.Builder builder) {
1063 super.populateDisplayData(builder);
1064 read.populateDisplayData(builder);
1065 }
1066 }
1067
1068 ////////////////////////////////////////////////////////////////////////////////////////////////
1069
1070 private static final Logger LOG = LoggerFactory.getLogger(KafkaIO.class);
1071
1072 /**
1073 * Returns a new config map which is a merge of the current config and updates. Verifies that the updates do
1074 * not include ignored properties.
1075 */
1076 private static Map<String, Object> updateKafkaProperties(
1077 Map<String, Object> currentConfig,
1078 Map<String, String> ignoredProperties,
1079 Map<String, Object> updates) {
1080
1081 for (String key : updates.keySet()) {
1082 checkArgument(
1083 !ignoredProperties.containsKey(key),
1084 "No need to configure '%s'. %s",
1085 key,
1086 ignoredProperties.get(key));
1087 }
1088
1089 Map<String, Object> config = new HashMap<>(currentConfig);
1090 config.putAll(updates);
1091
1092 return config;
1093 }
1094
1095 /** Static class, prevent instantiation. */
1096 private KafkaIO() {}
1097
1098 //////////////////////// Sink Support \\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
1099
1100 /**
1101 * A {@link PTransform} to write to a Kafka topic with {@link ProducerRecord}s. See {@link KafkaIO} for
1102 * more information on usage and configuration.
1103 */
1104 @AutoValue
1105 public abstract static class WriteRecords<K, V>
1106 extends PTransform<PCollection<ProducerRecord<K, V>>, PDone> {
1107 // TODO (Version 3.0): Create a single generic {@code Write<T>} transform which will be
1108 // parameterized depending on the type of the input collection (KV, ProducerRecords, etc). In that case,
1109 // we shouldn't have to duplicate the same API for similar transforms like {@link Write} and
1110 // {@link WriteRecords}. See example at {@link PubsubIO.Write}.
1111
1112 @Nullable
1113 abstract String getTopic();
1114
1115 abstract Map<String, Object> getProducerConfig();
1116
1117 @Nullable
1118 abstract SerializableFunction<Map<String, Object>, Producer<K, V>> getProducerFactoryFn();
1119
1120 @Nullable
1121 abstract Class<? extends Serializer<K>> getKeySerializer();
1122
1123 @Nullable
1124 abstract Class<? extends Serializer<V>> getValueSerializer();
1125
1126 @Nullable
1127 abstract KafkaPublishTimestampFunction<ProducerRecord<K, V>> getPublishTimestampFunction();
1128
1129 // Configuration for EOS sink
1130 abstract boolean isEOS();
1131
1132 @Nullable
1133 abstract String getSinkGroupId();
1134
1135 abstract int getNumShards();
1136
1137 @Nullable
1138 abstract SerializableFunction<Map<String, Object>, ? extends Consumer<?, ?>>
1139 getConsumerFactoryFn();
1140
1141 abstract Builder<K, V> toBuilder();
1142
1143 @AutoValue.Builder
1144 abstract static class Builder<K, V> {
1145 abstract Builder<K, V> setTopic(String topic);
1146
1147 abstract Builder<K, V> setProducerConfig(Map<String, Object> producerConfig);
1148
1149 abstract Builder<K, V> setProducerFactoryFn(
1150 SerializableFunction<Map<String, Object>, Producer<K, V>> fn);
1151
1152 abstract Builder<K, V> setKeySerializer(Class<? extends Serializer<K>> serializer);
1153
1154 abstract Builder<K, V> setValueSerializer(Class<? extends Serializer<V>> serializer);
1155
1156 abstract Builder<K, V> setPublishTimestampFunction(
1157 KafkaPublishTimestampFunction<ProducerRecord<K, V>> timestampFunction);
1158
1159 abstract Builder<K, V> setEOS(boolean eosEnabled);
1160
1161 abstract Builder<K, V> setSinkGroupId(String sinkGroupId);
1162
1163 abstract Builder<K, V> setNumShards(int numShards);
1164
1165 abstract Builder<K, V> setConsumerFactoryFn(
1166 SerializableFunction<Map<String, Object>, ? extends Consumer<?, ?>> fn);
1167
1168 abstract WriteRecords<K, V> build();
1169 }
1170
1171 /**
1172 * Returns a new {@link WriteRecords} transform with the Kafka producer pointing to {@code
1173 * bootstrapServers}.
1174 */
1175 public WriteRecords<K, V> withBootstrapServers(String bootstrapServers) {
1176 return withProducerConfigUpdates(
1177 ImmutableMap.of(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers));
1178 }
1179
1180 /**
1181 * Sets the default Kafka topic to write to. Use {@code ProducerRecord}s to set the topic name per
1182 * published record.
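 *
 * <p>A sketch of setting the topic per record while keeping a default topic as a fallback
 * (topic names below are illustrative):
 *
 * <pre>{@code
 * PCollection<ProducerRecord<Long, String>> records = ...; // e.g. new ProducerRecord<>("results_eu", key, value)
 * records.apply(KafkaIO.<Long, String>writeRecords()
 *     .withBootstrapServers("broker_1:9092,broker_2:9092")
 *     .withTopic("results") // default topic
 *     .withKeySerializer(LongSerializer.class)
 *     .withValueSerializer(StringSerializer.class));
 * }</pre>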
1183 */
1184 public WriteRecords<K, V> withTopic(String topic) {
1185 return toBuilder().setTopic(topic).build();
1186 }
1187
1188 /**
1189 * Sets a {@link Serializer} for serializing key (if any) to bytes.
1190 *
1191 * <p>A key is optional while writing to Kafka. Note that when a key is set, its hash is used to
1192 * determine the partition in Kafka (see {@link ProducerRecord} for more details).
1193 */
1194 public WriteRecords<K, V> withKeySerializer(Class<? extends Serializer<K>> keySerializer) {
1195 return toBuilder().setKeySerializer(keySerializer).build();
1196 }
1197
1198 /** Sets a {@link Serializer} for serializing value to bytes. */
1199 public WriteRecords<K, V> withValueSerializer(Class<? extends Serializer<V>> valueSerializer) {
1200 return toBuilder().setValueSerializer(valueSerializer).build();
1201 }
1202
1203 /**
1204 * Adds the given producer properties, overriding old values of properties with the same key.
1205 *
1206 * @deprecated as of version 2.13. Use {@link #withProducerConfigUpdates(Map)} instead.
1207 */
1208 @Deprecated
1209 public WriteRecords<K, V> updateProducerProperties(Map<String, Object> configUpdates) {
1210 Map<String, Object> config =
1211 updateKafkaProperties(getProducerConfig(), IGNORED_PRODUCER_PROPERTIES, configUpdates);
1212 return toBuilder().setProducerConfig(config).build();
1213 }
1214
1215 /**
1216 * Updates configuration for the producer. Note that the default producer properties will not be
1217 * completely overridden; this method only updates the values for the keys provided.
1218 *
1219 * <p>By default, the producer uses the configuration from {@link #DEFAULT_PRODUCER_PROPERTIES}.
1220 */
1221 public WriteRecords<K, V> withProducerConfigUpdates(Map<String, Object> configUpdates) {
1222 Map<String, Object> config =
1223 updateKafkaProperties(getProducerConfig(), IGNORED_PRODUCER_PROPERTIES, configUpdates);
1224 return toBuilder().setProducerConfig(config).build();
1225 }
1226
1227 /**
1228 * Sets a custom function to create the Kafka producer. Primarily used for tests. Default is {@link
1229 * KafkaProducer}.
1230 */
1231 public WriteRecords<K, V> withProducerFactoryFn(
1232 SerializableFunction<Map<String, Object>, Producer<K, V>> producerFactoryFn) {
1233 return toBuilder().setProducerFactoryFn(producerFactoryFn).build();
1234 }
1235
1236 /**
1237 * The timestamp for each record being published is set to the timestamp of the element in the
1238 * pipeline. This is equivalent to {@code withPublishTimestampFunction((e, ts) -> ts)}. <br>
1239 * NOTE: Kafka's retention policies are based on message timestamps. If the pipeline is
1240 * processing messages from the past, they might be deleted immediately by Kafka after being
1241 * published if the timestamps are older than Kafka cluster's {@code log.retention.hours}.
1242 */
1243 public WriteRecords<K, V> withInputTimestamp() {
1244 return withPublishTimestampFunction(KafkaPublishTimestampFunction.withElementTimestamp());
1245 }
1246
1247 /**
1248 * A function to provide timestamp for records being published. <br>
1249 * NOTE: Kafka's retention policies are based on message timestamps. If the pipeline is
1250 * processing messages from the past, they might be deleted immediately by Kafka after being
1251 * published if the timestamps are older than Kafka cluster's {@code log.retention.hours}.
1252 *
1253 * @deprecated use {@code ProducerRecords} to set publish timestamp.
1254 */
1255 @Deprecated
1256 public WriteRecords<K, V> withPublishTimestampFunction(
1257 KafkaPublishTimestampFunction<ProducerRecord<K, V>> timestampFunction) {
1258 return toBuilder().setPublishTimestampFunction(timestampFunction).build();
1259 }
1260
1261 /**
1262 * Provides exactly-once semantics while writing to Kafka, which enables applications with
1263 * end-to-end exactly-once guarantees on top of exactly-once semantics <i>within</i> Beam
1264 * pipelines. It ensures that records written to the sink are committed on Kafka exactly once, even
1265 * in the case of retries during pipeline execution, such as when some processing is retried.
1266 * Retries typically occur when workers restart (as in failure recovery), or when the work is
1267 * redistributed (as in an autoscaling event).
1268 *
1269 * <p>Beam runners typically provide exactly-once semantics for results of a pipeline, but not
1270 * for side effects from user code in a transform. If a transform such as a Kafka sink writes to an
1271 * external system, those writes might occur more than once. When EOS is enabled here, the sink
1272 * transform ties checkpointing semantics in compatible Beam runners and transactions in Kafka
1273 * (version 0.11+) to ensure a record is written only once. As the implementation relies on
1274 * the runner's checkpoint semantics, not all runners are compatible. The sink throws an
1275 * exception during initialization if the runner is not explicitly allowed. The Flink runner is one
1276 * of the runners whose checkpoint semantics are not compatible with the current implementation
1277 * (we hope to provide a solution in the near future). The Dataflow and Spark runners are
1278 * compatible.
1279 *
1280 * <p>Note on performance: the exactly-once sink involves two shuffles of the records. In
1281 * addition to the cost of shuffling the records among workers, the records go through two
1282 * serialization-deserialization cycles. Depending on the volume and cost of serialization, the
1283 * CPU cost might be noticeable. The CPU cost can be reduced by writing byte arrays (i.e.
1284 * serializing them to bytes before writing to the Kafka sink).
1285 *
1286 * @param numShards Sets sink parallelism. The state metadata stored on Kafka is stored across
1287 * this many virtual partitions using {@code sinkGroupId}. A good rule of thumb is to set
1288 * this to be around the number of partitions in the Kafka topic.
1289 * @param sinkGroupId The <i>group id</i> used to store a small amount of state as metadata on
1290 * Kafka. It is similar to the <i>consumer group id</i> used with a {@link KafkaConsumer}. Each
1291 * job should use a unique group id so that restarts/updates of the job preserve the state to
1292 * ensure exactly-once semantics. The state is committed atomically with sink transactions
1293 * on Kafka. See {@link KafkaProducer#sendOffsetsToTransaction(Map, String)} for more
1294 * information. The sink performs multiple sanity checks during initialization to catch
1295 * common mistakes so that it does not end up using state that does not <i>seem</i> to be
1296 * written by the same job.
1297 */
1298 public WriteRecords<K, V> withEOS(int numShards, String sinkGroupId) {
1299 KafkaExactlyOnceSink.ensureEOSSupport();
1300 checkArgument(numShards >= 1, "numShards should be >= 1");
1301 checkArgument(sinkGroupId != null, "sinkGroupId is required for exactly-once sink");
1302 return toBuilder().setEOS(true).setNumShards(numShards).setSinkGroupId(sinkGroupId).build();
1303 }
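    // Example (illustrative sketch): enabling the exactly-once sink. The shard count, group id,
    // topic and broker addresses are placeholders; withTopic() is required when EOS is enabled
    // (see expand()).
    //
    //   KafkaIO.<Long, String>writeRecords()
    //       .withBootstrapServers("broker-1:9092")
    //       .withTopic("results")
    //       .withKeySerializer(LongSerializer.class)
    //       .withValueSerializer(StringSerializer.class)
    //       .withEOS(20, "eos-sink-group-id");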
1304
1305 /**
1306 * When exactly-once semantics are enabled (see {@link #withEOS(int, String)}), the sink needs
1307 * to fetch previously stored state from the Kafka topic. Fetching the metadata requires a consumer.
1308 * Similar to {@link Read#withConsumerFactoryFn(SerializableFunction)}, a factory function can
1309 * be supplied if required in a specific case. The default is {@link KafkaConsumer}.
1310 */
1311 public WriteRecords<K, V> withConsumerFactoryFn(
1312 SerializableFunction<Map<String, Object>, ? extends Consumer<?, ?>> consumerFactoryFn) {
1313 return toBuilder().setConsumerFactoryFn(consumerFactoryFn).build();
1314 }
1315
1316 @Override
1317 public PDone expand(PCollection<ProducerRecord<K, V>> input) {
1318 checkArgument(
1319 getProducerConfig().get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) != null,
1320 "withBootstrapServers() is required");
1321
1322 checkArgument(getKeySerializer() != null, "withKeySerializer() is required");
1323 checkArgument(getValueSerializer() != null, "withValueSerializer() is required");
1324
1325 if (isEOS()) {
1326 checkArgument(getTopic() != null, "withTopic() is required when isEOS() is true");
1327 KafkaExactlyOnceSink.ensureEOSSupport();
1328
1329 // TODO: Verify that the group_id does not have existing state stored on Kafka unless
1330 // this is an upgrade. This avoids issues with the simple mistake of reusing group_id
1331 // across multiple runs or across multiple jobs. This is checked when the sink
1332 // transform initializes while processing the output. It might be better to
1333 // check here to catch the common mistake earlier.
1334
1335 input.apply(new KafkaExactlyOnceSink<>(this));
1336 } else {
1337 input.apply(ParDo.of(new KafkaWriter<>(this)));
1338 }
1339 return PDone.in(input.getPipeline());
1340 }
1341
1342 @Override
1343 public void validate(PipelineOptions options) {
1344 if (isEOS()) {
1345 String runner = options.getRunner().getName();
1346 if ("org.apache.beam.runners.direct.DirectRunner".equals(runner)
1347 || runner.startsWith("org.apache.beam.runners.dataflow.")
1348 || runner.startsWith("org.apache.beam.runners.spark.")
1349 || runner.startsWith("org.apache.beam.runners.flink.")) {
1350 return;
1351 }
1352 throw new UnsupportedOperationException(
1353 runner
1354 + " is not a runner known to be compatible with Kafka exactly-once sink. "
1355 + "This implementation of exactly-once sink relies on specific checkpoint guarantees. "
1356 + "Only the runners with known to have compatible checkpoint semantics are allowed.");
1357 }
1358 }
1359
1360 // set config defaults
1361 private static final Map<String, Object> DEFAULT_PRODUCER_PROPERTIES =
1362 ImmutableMap.of(ProducerConfig.RETRIES_CONFIG, 3);
1363
1364 /** A set of properties that are not required or don't make sense for our producer. */
1365 private static final Map<String, String> IGNORED_PRODUCER_PROPERTIES =
1366 ImmutableMap.of(
1367 ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "Use withKeySerializer instead",
1368 ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "Use withValueSerializer instead");
1369
1370 @Override
1371 public void populateDisplayData(DisplayData.Builder builder) {
1372 super.populateDisplayData(builder);
1373 builder.addIfNotNull(DisplayData.item("topic", getTopic()).withLabel("Topic"));
1374 Set<String> ignoredProducerPropertiesKeys = IGNORED_PRODUCER_PROPERTIES.keySet();
1375 for (Map.Entry<String, Object> conf : getProducerConfig().entrySet()) {
1376 String key = conf.getKey();
1377 if (!ignoredProducerPropertiesKeys.contains(key)) {
1378 Object value =
1379 DisplayData.inferType(conf.getValue()) != null
1380 ? conf.getValue()
1381 : String.valueOf(conf.getValue());
1382 builder.add(DisplayData.item(key, ValueProvider.StaticValueProvider.of(value)));
1383 }
1384 }
1385 }
1386 }
1387
1388 /**
1389 * A {@link PTransform} to write to a Kafka topic with KVs. See {@link KafkaIO} for more
1390 * information on usage and configuration.
1391 */
1392 @AutoValue
1393 public abstract static class Write<K, V> extends PTransform<PCollection<KV<K, V>>, PDone> {
1394 // TODO (Version 3.0): Create only one generic {@code Write<T>} transform, which will be
1395 // parameterized depending on the type of the input collection (KV, ProducerRecords, etc).
1396 // In that case, we wouldn't have to duplicate the same API for similar transforms like
1397 // {@link Write} and {@link WriteRecords}. See the example at {@link PubsubIO.Write}.
1398
1399 @Nullable
1400 abstract String getTopic();
1401
1402 abstract WriteRecords<K, V> getWriteRecordsTransform();
1403
1404 abstract Builder<K, V> toBuilder();
1405
1406 @Experimental(Kind.PORTABILITY)
1407 @AutoValue.Builder
1408 abstract static class Builder<K, V>
1409 implements ExternalTransformBuilder<External.Configuration, PCollection<KV<K, V>>, PDone> {
1410 abstract Builder<K, V> setTopic(String topic);
1411
1412 abstract Builder<K, V> setWriteRecordsTransform(WriteRecords<K, V> transform);
1413
1414 abstract Write<K, V> build();
1415
1416 @Override
1417 public PTransform<PCollection<KV<K, V>>, PDone> buildExternal(
1418 External.Configuration configuration) {
1419 setTopic(configuration.topic);
1420
1421 Map<String, Object> producerConfig = new HashMap<>();
1422 for (KV<String, String> kv : configuration.producerConfig) {
1423 producerConfig.put(kv.getKey(), kv.getValue());
1424 }
1425 Class keySerializer = resolveClass(configuration.keySerializer);
1426 Class valSerializer = resolveClass(configuration.valueSerializer);
1427
1428 WriteRecords<K, V> writeRecords =
1429 KafkaIO.<K, V>writeRecords()
1430 .withProducerConfigUpdates(producerConfig)
1431 .withKeySerializer(keySerializer)
1432 .withValueSerializer(valSerializer)
1433 .withTopic(configuration.topic);
1434 setWriteRecordsTransform(writeRecords);
1435
1436 return build();
1437 }
1438 }
1439
1440 /** Exposes {@link KafkaIO.Write} as an external transform for cross-language usage. */
1441 @Experimental(Kind.PORTABILITY)
1442 @AutoService(ExternalTransformRegistrar.class)
1443 public static class External implements ExternalTransformRegistrar {
1444
1445 public static final String URN = "beam:external:java:kafka:write:v1";
1446
1447 @Override
1448 public Map<String, Class<? extends ExternalTransformBuilder>> knownBuilders() {
1449 return ImmutableMap.of(URN, AutoValue_KafkaIO_Write.Builder.class);
1450 }
1451
1452 /** Parameters class to expose the Write transform to an external SDK. */
1453 public static class Configuration {
1454
1455 // All byte arrays are UTF-8 encoded strings
1456 private Iterable<KV<String, String>> producerConfig;
1457 private String topic;
1458 private String keySerializer;
1459 private String valueSerializer;
1460
1461 public void setProducerConfig(Iterable<KV<String, String>> producerConfig) {
1462 this.producerConfig = producerConfig;
1463 }
1464
1465 public void setTopic(String topic) {
1466 this.topic = topic;
1467 }
1468
1469 public void setKeySerializer(String keySerializer) {
1470 this.keySerializer = keySerializer;
1471 }
1472
1473 public void setValueSerializer(String valueSerializer) {
1474 this.valueSerializer = valueSerializer;
1475 }
1476 }
1477 }
1478
1479 /** Used mostly to reduce the boilerplate of wrapping {@link WriteRecords} methods. */
1480 private Write<K, V> withWriteRecordsTransform(WriteRecords<K, V> transform) {
1481 return toBuilder().setWriteRecordsTransform(transform).build();
1482 }
1483
1484 /**
1485 * Wrapper method over {@link WriteRecords#withBootstrapServers(String)}, used to keep
1486 * compatibility with the old API based on the KV type of element.
1487 */
1488 public Write<K, V> withBootstrapServers(String bootstrapServers) {
1489 return withWriteRecordsTransform(
1490 getWriteRecordsTransform().withBootstrapServers(bootstrapServers));
1491 }
1492
1493 /**
1494 * Wrapper method over {@link WriteRecords#withTopic(String)}, used to keep compatibility
1495 * with the old API based on the KV type of element.
1496 */
1497 public Write<K, V> withTopic(String topic) {
1498 return toBuilder()
1499 .setTopic(topic)
1500 .setWriteRecordsTransform(getWriteRecordsTransform().withTopic(topic))
1501 .build();
1502 }
1503
1504 /**
1505 * Wrapper method over {@link WriteRecords#withKeySerializer(Class)}, used to keep
1506 * compatibility with the old API based on the KV type of element.
1507 */
1508 public Write<K, V> withKeySerializer(Class<? extends Serializer<K>> keySerializer) {
1509 return withWriteRecordsTransform(getWriteRecordsTransform().withKeySerializer(keySerializer));
1510 }
1511
1512 /**
1513 * Wrapper method over {@link WriteRecords#withValueSerializer(Class)}, used to keep
1514 * compatibility with the old API based on the KV type of element.
1515 */
1516 public Write<K, V> withValueSerializer(Class<? extends Serializer<V>> valueSerializer) {
1517 return withWriteRecordsTransform(
1518 getWriteRecordsTransform().withValueSerializer(valueSerializer));
1519 }
1520
1521 /**
1522 * Wrapper method over {@link WriteRecords#withProducerFactoryFn(SerializableFunction)}, used to
1523 * keep compatibility with the old API based on the KV type of element.
1524 */
1525 public Write<K, V> withProducerFactoryFn(
1526 SerializableFunction<Map<String, Object>, Producer<K, V>> producerFactoryFn) {
1527 return withWriteRecordsTransform(
1528 getWriteRecordsTransform().withProducerFactoryFn(producerFactoryFn));
1529 }
1530
1531 /**
1532 * Wrapper method over {@link WriteRecords#withInputTimestamp()}, used to keep compatibility
1533 * with the old API based on the KV type of element.
1534 */
1535 public Write<K, V> withInputTimestamp() {
1536 return withWriteRecordsTransform(getWriteRecordsTransform().withInputTimestamp());
1537 }
1538
1539 /**
1540 * Wrapper method over {@link
1541 * WriteRecords#withPublishTimestampFunction(KafkaPublishTimestampFunction)}, used to keep
1542 * compatibility with the old API based on the KV type of element.
1543 *
1544 * @deprecated use {@link WriteRecords} and {@code ProducerRecords} to set publish timestamp.
1545 */
1546 @Deprecated
1547 @SuppressWarnings({"unchecked", "rawtypes"})
1548 public Write<K, V> withPublishTimestampFunction(
1549 KafkaPublishTimestampFunction<KV<K, V>> timestampFunction) {
1550 return withWriteRecordsTransform(
1551 getWriteRecordsTransform()
1552 .withPublishTimestampFunction(new PublishTimestampFunctionKV(timestampFunction)));
1553 }
1554
1555 /**
1556 * Wrapper method over {@link WriteRecords#withEOS(int, String)}, used to keep compatibility
1557 * with the old API based on the KV type of element.
1558 */
1559 public Write<K, V> withEOS(int numShards, String sinkGroupId) {
1560 return withWriteRecordsTransform(getWriteRecordsTransform().withEOS(numShards, sinkGroupId));
1561 }
1562
1563 /**
1564 * Wrapper method over {@link WriteRecords#withConsumerFactoryFn(SerializableFunction)}, used to
1565 * keep compatibility with the old API based on the KV type of element.
1566 */
1567 public Write<K, V> withConsumerFactoryFn(
1568 SerializableFunction<Map<String, Object>, ? extends Consumer<?, ?>> consumerFactoryFn) {
1569 return withWriteRecordsTransform(
1570 getWriteRecordsTransform().withConsumerFactoryFn(consumerFactoryFn));
1571 }
1572
1573 /**
1574 * Adds the given producer properties, overriding old values of properties with the same key.
1575 *
1576 * @deprecated as of version 2.13. Use {@link #withProducerConfigUpdates(Map)} instead.
1577 */
1578 @Deprecated
1579 public Write<K, V> updateProducerProperties(Map<String, Object> configUpdates) {
1580 return withWriteRecordsTransform(
1581 getWriteRecordsTransform().updateProducerProperties(configUpdates));
1582 }
1583
1584 /**
1585 * Updates the configuration for the producer. Note that the default producer properties will not
1586 * be completely overridden; this method only overrides the values of the keys provided here.
1587 *
1588 * <p>By default, the producer uses the configuration from {@link
1589 * WriteRecords#DEFAULT_PRODUCER_PROPERTIES}.
1590 */
1591 public Write<K, V> withProducerConfigUpdates(Map<String, Object> configUpdates) {
1592 return withWriteRecordsTransform(
1593 getWriteRecordsTransform().withProducerConfigUpdates(configUpdates));
1594 }
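    // Example (illustrative sketch): typical use of the KV-based Write transform, which wraps
    // WriteRecords internally. Topic and broker addresses are placeholders.
    //
    //   PCollection<KV<Long, String>> kvs = ...;
    //   kvs.apply(
    //       KafkaIO.<Long, String>write()
    //           .withBootstrapServers("broker-1:9092,broker-2:9092")
    //           .withTopic("results")
    //           .withKeySerializer(LongSerializer.class)
    //           .withValueSerializer(StringSerializer.class));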
1595
1596 @Override
1597 public PDone expand(PCollection<KV<K, V>> input) {
1598 checkArgument(getTopic() != null, "withTopic() is required");
1599
1600 KvCoder<K, V> kvCoder = (KvCoder<K, V>) input.getCoder();
1601 return input
1602 .apply(
1603 "Kafka ProducerRecord",
1604 MapElements.via(
1605 new SimpleFunction<KV<K, V>, ProducerRecord<K, V>>() {
1606 @Override
1607 public ProducerRecord<K, V> apply(KV<K, V> element) {
1608 return new ProducerRecord<>(getTopic(), element.getKey(), element.getValue());
1609 }
1610 }))
1611 .setCoder(ProducerRecordCoder.of(kvCoder.getKeyCoder(), kvCoder.getValueCoder()))
1612 .apply(getWriteRecordsTransform());
1613 }
1614
1615 @Override
1616 public void validate(PipelineOptions options) {
1617 getWriteRecordsTransform().validate(options);
1618 }
1619
1620 @Override
1621 public void populateDisplayData(DisplayData.Builder builder) {
1622 super.populateDisplayData(builder);
1623 getWriteRecordsTransform().populateDisplayData(builder);
1624 }
1625
1626 /**
1627 * Writes just the values to Kafka. This is useful for writing collections of values rather
1628 * than {@link KV}s.
1629 */
1630 @SuppressWarnings({"unchecked", "rawtypes"})
1631 public PTransform<PCollection<V>, PDone> values() {
1632 return new KafkaValueWrite<K, V>(this.withKeySerializer((Class) StringSerializer.class));
1633 }
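    // Example (illustrative sketch): writing a collection of values only. The key is always null
    // (see KafkaValueWrite below) and a String key serializer is substituted by values().
    //
    //   PCollection<String> strings = ...;
    //   strings.apply(
    //       KafkaIO.<Void, String>write()
    //           .withBootstrapServers("broker-1:9092")
    //           .withTopic("results")
    //           .withValueSerializer(StringSerializer.class)
    //           .values());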
1634
1635 /**
1636 * Wrapper class which allows using {@code KafkaPublishTimestampFunction<KV<K, V>>} with {@link
1637 * WriteRecords#withPublishTimestampFunction(KafkaPublishTimestampFunction)}.
1638 */
1639 private static class PublishTimestampFunctionKV<K, V>
1640 implements KafkaPublishTimestampFunction<ProducerRecord<K, V>> {
1641
1642 private KafkaPublishTimestampFunction<KV<K, V>> fn;
1643
1644 public PublishTimestampFunctionKV(KafkaPublishTimestampFunction<KV<K, V>> fn) {
1645 this.fn = fn;
1646 }
1647
1648 @Override
1649 public Instant getTimestamp(ProducerRecord<K, V> e, Instant ts) {
1650 return fn.getTimestamp(KV.of(e.key(), e.value()), ts);
1651 }
1652 }
1653 }
1654
1655 /**
1656 * Same as {@code Write<K, V>} without a key. Null is used for the key, as is the convention in
1657 * Kafka when no key is specified. The majority of Kafka writers don't specify a key.
1658 */
1659 private static class KafkaValueWrite<K, V> extends PTransform<PCollection<V>, PDone> {
1660 private final Write<K, V> kvWriteTransform;
1661
1662 private KafkaValueWrite(Write<K, V> kvWriteTransform) {
1663 this.kvWriteTransform = kvWriteTransform;
1664 }
1665
1666 @Override
1667 public PDone expand(PCollection<V> input) {
1668 return input
1669 .apply(
1670 "Kafka values with default key",
1671 MapElements.via(
1672 new SimpleFunction<V, KV<K, V>>() {
1673 @Override
1674 public KV<K, V> apply(V element) {
1675 return KV.of(null, element);
1676 }
1677 }))
1678 .setCoder(KvCoder.of(new NullOnlyCoder<>(), input.getCoder()))
1679 .apply(kvWriteTransform);
1680 }
1681
1682 @Override
1683 public void populateDisplayData(DisplayData.Builder builder) {
1684 super.populateDisplayData(builder);
1685 kvWriteTransform.populateDisplayData(builder);
1686 }
1687
1688 @Experimental(Kind.PORTABILITY)
1689 public static class Builder<K, V>
1690 implements ExternalTransformBuilder<KafkaValueWrite.External.Configuration, PCollection<V>, PDone> {
1691
1692 @Override
1693 public PTransform<PCollection<V>, PDone> buildExternal(
1694 KafkaValueWrite.External.Configuration configuration) {
1695
1696 Map<String, Object> producerConfig = new HashMap<>();
1697 for (KV<String, String> kv : configuration.producerConfig) {
1698 producerConfig.put(kv.getKey(), kv.getValue());
1699 }
1700
1701 Class valSerializer = resolveClass(configuration.valueSerializer);
1702
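        // Note: value-only writes always use a null key (see KafkaValueWrite#expand), so the
        // resolved value serializer class is also passed as the key serializer below.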
1703 Write<K, V> writeTransform =
1704 KafkaIO.<K, V>write()
1705 .withProducerConfigUpdates(producerConfig)
1706 .withKeySerializer(valSerializer)
1707 .withValueSerializer(valSerializer)
1708 .withTopic(configuration.topic);
1709
1710 return new KafkaIO.KafkaValueWrite<K, V>(writeTransform);
1711 }
1712 }
1713
1714 /** Exposes value-only Kafka writes ({@link KafkaIO.Write#values()}) as an external transform for cross-language usage. */
1715 @Experimental(Kind.PORTABILITY)
1716 @AutoService(ExternalTransformRegistrar.class)
1717 public static class External implements ExternalTransformRegistrar {
1718
1719 public static final String URN = "beam:external:java:kafka:write_values:v1";
1720
1721 @Override
1722 public Map<String, Class<? extends ExternalTransformBuilder>> knownBuilders() {
1723 return ImmutableMap.of(URN, KafkaValueWrite.Builder.class);
1724 }
1725
1726 /** Parameters class to expose the Write transform to an external SDK. */
1727 public static class Configuration {
1728
1729 // All byte arrays are UTF-8 encoded strings
1730 private Iterable<KV<String, String>> producerConfig;
1731 private String topic;
1732 private String valueSerializer;
1733
1734 public void setProducerConfig(Iterable<KV<String, String>> producerConfig) {
1735 this.producerConfig = producerConfig;
1736 }
1737
1738 public void setTopic(String topic) {
1739 this.topic = topic;
1740 }
1741
1742 public void setValueSerializer(String valueSerializer) {
1743 this.valueSerializer = valueSerializer;
1744 }
1745 }
1746 }
1747 }
1748
1750 private static class NullOnlyCoder<T> extends AtomicCoder<T> {
1751 @Override
1752 public void encode(T value, OutputStream outStream) {
1753 checkArgument(value == null, "Can only encode nulls");
1754 // Encode as no bytes.
1755 }
1756
1757 @Override
1758 public T decode(InputStream inStream) {
1759 return null;
1760 }
1761 }
1762
1763 private static Class resolveClass(String className) {
1764 try {
1765 return Class.forName(className);
1766 } catch (ClassNotFoundException e) {
1767 throw new RuntimeException("Could not find class: " + className, e);
1768 }
1769 }
1770}