1/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18package org.apache.beam.sdk.io.kafka;
19
20import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
21import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
22
23import com.google.auto.service.AutoService;
24import com.google.auto.value.AutoValue;
25import io.confluent.kafka.serializers.KafkaAvroDeserializer;
26import java.io.InputStream;
27import java.io.OutputStream;
28import java.lang.reflect.Method;
29import java.util.ArrayList;
30import java.util.Collections;
31import java.util.HashMap;
32import java.util.List;
33import java.util.Map;
34import java.util.Optional;
35import java.util.Set;
36import javax.annotation.Nullable;
37import org.apache.beam.sdk.annotations.Experimental;
38import org.apache.beam.sdk.annotations.Experimental.Kind;
39import org.apache.beam.sdk.coders.AtomicCoder;
40import org.apache.beam.sdk.coders.AvroCoder;
41import org.apache.beam.sdk.coders.ByteArrayCoder;
42import org.apache.beam.sdk.coders.Coder;
43import org.apache.beam.sdk.coders.CoderRegistry;
44import org.apache.beam.sdk.coders.KvCoder;
45import org.apache.beam.sdk.coders.VarIntCoder;
46import org.apache.beam.sdk.coders.VarLongCoder;
47import org.apache.beam.sdk.expansion.ExternalTransformRegistrar;
48import org.apache.beam.sdk.io.Read.Unbounded;
49import org.apache.beam.sdk.io.UnboundedSource;
50import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
51import org.apache.beam.sdk.options.PipelineOptions;
52import org.apache.beam.sdk.options.ValueProvider;
53import org.apache.beam.sdk.transforms.DoFn;
54import org.apache.beam.sdk.transforms.ExternalTransformBuilder;
55import org.apache.beam.sdk.transforms.MapElements;
56import org.apache.beam.sdk.transforms.PTransform;
57import org.apache.beam.sdk.transforms.ParDo;
58import org.apache.beam.sdk.transforms.SerializableFunction;
59import org.apache.beam.sdk.transforms.SimpleFunction;
60import org.apache.beam.sdk.transforms.display.DisplayData;
61import org.apache.beam.sdk.values.KV;
62import org.apache.beam.sdk.values.PBegin;
63import org.apache.beam.sdk.values.PCollection;
64import org.apache.beam.sdk.values.PDone;
65import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
66import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
67import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
68import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
69import org.apache.kafka.clients.consumer.Consumer;
70import org.apache.kafka.clients.consumer.ConsumerConfig;
71import org.apache.kafka.clients.consumer.KafkaConsumer;
72import org.apache.kafka.clients.producer.KafkaProducer;
73import org.apache.kafka.clients.producer.Producer;
74import org.apache.kafka.clients.producer.ProducerConfig;
75import org.apache.kafka.clients.producer.ProducerRecord;
76import org.apache.kafka.common.TopicPartition;
77import org.apache.kafka.common.serialization.ByteArrayDeserializer;
78import org.apache.kafka.common.serialization.Deserializer;
79import org.apache.kafka.common.serialization.Serializer;
80import org.apache.kafka.common.serialization.StringSerializer;
81import org.apache.kafka.common.utils.AppInfoParser;
83import org.joda.time.Duration;
84import org.joda.time.Instant;
85import org.slf4j.Logger;
86import org.slf4j.LoggerFactory;
87
88/**
89 * An unbounded source and a sink for <a href="http://kafka.apache.org/">Kafka</a> topics.
90 *
91 * <h3>Reading from Kafka topics</h3>
92 *
93 * <p>KafkaIO source returns unbounded collection of Kafka records as {@code
94 * PCollection<KafkaRecord<K, V>>}. A {@link KafkaRecord} includes basic metadata like
95 * topic-partition and offset, along with key and value associated with a Kafka record.
96 *
97 * <p>Although most applications consume a single topic, the source can be configured to consume
98 * multiple topics or even a specific set of {@link TopicPartition}s.
99 *
100 * <p>To configure a Kafka source, you must specify at the minimum Kafka <tt>bootstrapServers</tt>,
101 * one or more topics to consume, and key and value deserializers. For example:
102 *
103 * <pre>{@code
104 * pipeline
105 * .apply(KafkaIO.<Long, String>read()
106 * .withBootstrapServers("broker_1:9092,broker_2:9092")
107 * .withTopic("my_topic") // use withTopics(List<String>) to read from multiple topics.
108 * .withKeyDeserializer(LongDeserializer.class)
109 * .withValueDeserializer(StringDeserializer.class)
110 *
111 *       // The above four settings are required. This returns PCollection<KafkaRecord<Long, String>>
112 *
113 * // Rest of the settings are optional :
114 *
115 * // you can further customize KafkaConsumer used to read the records by adding more
116 * // settings for ConsumerConfig. e.g :
117 * .withConsumerConfigUpdates(ImmutableMap.of("group.id", "my_beam_app_1"))
118 *
119 * // set event times and watermark based on 'LogAppendTime'. To provide a custom
120 * // policy see withTimestampPolicyFactory(). withProcessingTime() is the default.
121 * // Use withCreateTime() with topics that have 'CreateTime' timestamps.
122 * .withLogAppendTime()
123 *
124 * // restrict reader to committed messages on Kafka (see method documentation).
125 * .withReadCommitted()
126 *
127 * // offset consumed by the pipeline can be committed back.
128 * .commitOffsetsInFinalize()
129 *
130 *       // finally, if you don't need Kafka metadata, you can drop it.
131 * .withoutMetadata() // PCollection<KV<Long, String>>
132 * )
133 * .apply(Values.<String>create()) // PCollection<String>
134 * ...
135 * }</pre>
136 *
137 * <p>Kafka provides deserializers for common types in {@link
138 * org.apache.kafka.common.serialization}. In addition to deserializers, Beam runners need {@link
139 * Coder} to materialize key and value objects if necessary. In most cases, you don't need to
140 * specify {@link Coder} for key and value in the resulting collection because the coders are
141 * inferred from deserializer types. However, in cases when coder inference fails, they can be
142 * specified explicitly along with deserializers using {@link
143 * Read#withKeyDeserializerAndCoder(Class, Coder)} and {@link
144 * Read#withValueDeserializerAndCoder(Class, Coder)}. Note that Kafka messages are interpreted using
145 * key and value <i>deserializers</i>.
146 *
147 * <h3>Partition Assignment and Checkpointing</h3>
148 *
149 * The Kafka partitions are evenly distributed among splits (workers).
150 *
151 * <p>Checkpointing is fully supported and each split can resume from previous checkpoint (to the
152 * extent supported by runner). See {@link KafkaUnboundedSource#split(int, PipelineOptions)} for
153 * more details on splits and checkpoint support.
154 *
155 * <p>When the pipeline starts for the first time, or without any checkpoint, the source starts
156 * consuming from the <em>latest</em> offsets. You can override this behavior to consume from the
157 * beginning by setting appropriate properties in {@link ConsumerConfig}, through {@link
158 * Read#withConsumerConfigUpdates(Map)}. You can also enable offset auto_commit in Kafka to resume
159 * from the last committed offsets.
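 *
 * <p>For example, a minimal sketch of starting from the earliest available offsets when there is
 * no checkpoint and no committed offset (the broker list and topic are illustrative):
 *
 * <pre>{@code
 * KafkaIO.<Long, String>read()
 *   .withBootstrapServers("broker_1:9092,broker_2:9092")
 *   .withTopic("my_topic")
 *   .withKeyDeserializer(LongDeserializer.class)
 *   .withValueDeserializer(StringDeserializer.class)
 *   .withConsumerConfigUpdates(ImmutableMap.of(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"))
 *   ...
 * }</pre>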
160 *
161 * <p>In summary, KafkaIO.read follows the sequence below to set the initial offset:<br>
162 * 1. {@link KafkaCheckpointMark} provided by runner;<br>
163 * 2. Consumer offset stored in Kafka when {@code ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG = true};
164 * <br>
165 * 3. Start from <em>latest</em> offset by default;
166 *
167 * <p>Seeking to the initial offset is a blocking operation in the Kafka API, which can block
168 * forever for certain versions of the Kafka client library. This is resolved by <a
169 * href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-266%3A+Fix+consumer+indefinite+blocking+behavior">KIP-266</a>,
170 * which provides the `default.api.timeout.ms` consumer config setting to control such timeouts.
171 * KafkaIO.read implements the timeout itself, so that it does not block forever when an older
172 * Kafka client is used. It recognizes the `default.api.timeout.ms` setting and honors the
173 * timeout value if it is passed in the consumer config.
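 *
 * <p>For example, a sketch of raising that timeout through the consumer config (the 60000 ms
 * value is illustrative; tune it for your cluster):
 *
 * <pre>{@code
 * KafkaIO.<Long, String>read()
 *   ...
 *   .withConsumerConfigUpdates(ImmutableMap.of("default.api.timeout.ms", 60000))
 * }</pre>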
174 *
175 * <h3>Use Avro schema with Confluent Schema Registry</h3>
176 *
177 * <p>If you want to deserialize the keys and/or values based on a schema available in Confluent
178 * Schema Registry, KafkaIO can fetch this schema from a specified Schema Registry URL and use it
179 * for deserialization. A {@link Coder} will be inferred automatically based on the respective
180 * {@link Deserializer}.
181 *
182 * <p>For an Avro schema it will return a {@link PCollection} of {@link KafkaRecord}s where key
183 * and/or value will be typed as {@link org.apache.avro.generic.GenericRecord}. In this case, users
184 * don't need to specify key and/or value deserializers and coders since they will be set to {@link
185 * KafkaAvroDeserializer} and {@link AvroCoder} by default accordingly.
186 *
187 * <p>In the example below, topic values are serialized with an Avro schema stored in the Schema
188 * Registry, and keys are typed as {@link Long}:
189 *
190 * <pre>{@code
191 * PCollection<KafkaRecord<Long, GenericRecord>> input = pipeline
192 * .apply(KafkaIO.<Long, GenericRecord>read()
193 * .withBootstrapServers("broker_1:9092,broker_2:9092")
194 * .withTopic("my_topic")
195 * .withKeyDeserializer(LongDeserializer.class)
196 * // Use Confluent Schema Registry, specify schema registry URL and value subject
197 * .withValueDeserializer(
198 * ConfluentSchemaRegistryDeserializerProvider.of("http://localhost:8081", "my_topic-value"))
199 * ...
200 * }</pre>
201 *
202 * <h3>Writing to Kafka</h3>
203 *
204 * <p>KafkaIO sink supports writing key-value pairs to a Kafka topic. Users can also write just the
205 * values or native Kafka producer records using {@link
206 * org.apache.kafka.clients.producer.ProducerRecord}. To configure a Kafka sink, you must specify at
207 * the minimum Kafka <tt>bootstrapServers</tt>, the topic to write to, and key and value
208 * serializers. For example:
209 *
210 * <pre>{@code
211 * PCollection<KV<Long, String>> kvColl = ...;
212 * kvColl.apply(KafkaIO.<Long, String>write()
213 * .withBootstrapServers("broker_1:9092,broker_2:9092")
214 * .withTopic("results")
215 *
216 * .withKeySerializer(LongSerializer.class)
217 * .withValueSerializer(StringSerializer.class)
218 *
219 * // You can further customize KafkaProducer used to write the records by adding more
220 * // settings for ProducerConfig. e.g, to enable compression :
221 * .withProducerConfigUpdates(ImmutableMap.of("compression.type", "gzip"))
222 *
223 *      // You can set the publish timestamp for the Kafka records.
224 * .withInputTimestamp() // element timestamp is used while publishing to Kafka
225 * // or you can also set a custom timestamp with a function.
226 * .withPublishTimestampFunction((elem, elemTs) -> ...)
227 *
228 * // Optionally enable exactly-once sink (on supported runners). See JavaDoc for withEOS().
229 * .withEOS(20, "eos-sink-group-id");
230 * );
231 * }</pre>
232 *
233 * <p>Often you might want to write just values without any keys to Kafka. Use {@code values()} to
234 * write records with a default empty (null) key:
235 *
236 * <pre>{@code
237 * PCollection<String> strings = ...;
238 * strings.apply(KafkaIO.<Void, String>write()
239 * .withBootstrapServers("broker_1:9092,broker_2:9092")
240 * .withTopic("results")
241 * .withValueSerializer(StringSerializer.class) // just need serializer for value
242 * .values()
243 * );
244 * }</pre>
245 *
246 * <p>Also, if you want to write Kafka {@link ProducerRecord}s, then you should use {@link
247 * KafkaIO#writeRecords()}:
248 *
249 * <pre>{@code
250 * PCollection<ProducerRecord<Long, String>> records = ...;
251 * records.apply(KafkaIO.<Long, String>writeRecords()
252 * .withBootstrapServers("broker_1:9092,broker_2:9092")
253 * .withTopic("results")
254 * .withKeySerializer(LongSerializer.class)
255 * .withValueSerializer(StringSerializer.class)
256 * );
257 * }</pre>
258 *
259 * <h3>Advanced Kafka Configuration</h3>
260 *
261 * KafkaIO allows setting most of the properties in {@link ConsumerConfig} for source or in {@link
262 * ProducerConfig} for sink. E.g. if you would like to enable offset <em>auto commit</em> (for
263 * external monitoring or other purposes), you can set <tt>"group.id"</tt>,
264 * <tt>"enable.auto.commit"</tt>, etc.
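 *
 * <p>For example, a sketch of enabling offset auto-commit for external monitoring (the group id
 * here is only a placeholder):
 *
 * <pre>{@code
 * KafkaIO.<Long, String>read()
 *   ...
 *   .withConsumerConfigUpdates(ImmutableMap.of(
 *       ConsumerConfig.GROUP_ID_CONFIG, "my_monitoring_group",
 *       ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true))
 * }</pre>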
265 *
266 * <h3>Event Timestamps and Watermark</h3>
267 *
268 * By default, the record timestamp (event time) is set to processing time in the KafkaIO reader and
269 * the source watermark is the current wall time. If a topic has Kafka server-side ingestion timestamp
270 * enabled ('LogAppendTime'), it can be enabled with {@link Read#withLogAppendTime()}. A custom timestamp
271 * policy can be provided by implementing {@link TimestampPolicyFactory}. See {@link
272 * Read#withTimestampPolicyFactory(TimestampPolicyFactory)} for more information.
273 *
274 * <h3>Supported Kafka Client Versions</h3>
275 *
276 * KafkaIO relies on <i>kafka-clients</i> for all its interactions with the Kafka cluster.
277 * <i>kafka-clients</i> versions 0.10.1 and newer are supported at runtime. The older versions 0.9.x
278 * - 0.10.0.0 are also supported, but are deprecated and likely to be removed in the near future. Please
279 * ensure that the version included with the application is compatible with the version of your
280 * Kafka cluster. Kafka client usually fails to initialize with a clear error message in case of
281 * incompatibility.
282 */
283@Experimental(Kind.SOURCE_SINK)
284public class KafkaIO {
285
286 /**
287   * A specific instance of an uninitialized {@link #read()} where keys and values are byte
288   * arrays. See {@link #read()}.
289 */
290 public static Read<byte[], byte[]> readBytes() {
291 return KafkaIO.<byte[], byte[]>read()
292 .withKeyDeserializer(ByteArrayDeserializer.class)
293 .withValueDeserializer(ByteArrayDeserializer.class);
294 }
295
296 /**
297 * Creates an uninitialized {@link Read} {@link PTransform}. Before use, basic Kafka configuration
298   * should be set with {@link Read#withBootstrapServers(String)} and {@link Read#withTopics(List)}.
299 * Other optional settings include key and value {@link Deserializer}s, custom timestamp and
300 * watermark functions.
301 */
302 public static <K, V> Read<K, V> read() {
303 return new AutoValue_KafkaIO_Read.Builder<K, V>()
304 .setTopics(new ArrayList<>())
305 .setTopicPartitions(new ArrayList<>())
306 .setConsumerFactoryFn(Read.KAFKA_CONSUMER_FACTORY_FN)
307 .setConsumerConfig(Read.DEFAULT_CONSUMER_PROPERTIES)
308 .setMaxNumRecords(Long.MAX_VALUE)
309 .setCommitOffsetsInFinalizeEnabled(false)
310 .setTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime())
311 .build();
312 }
313
314 /**
315 * Creates an uninitialized {@link Write} {@link PTransform}. Before use, Kafka configuration
316 * should be set with {@link Write#withBootstrapServers(String)} and {@link Write#withTopic} along
317   * with {@link Serializer}s for the (optional) key and the values.
318 */
319 public static <K, V> Write<K, V> write() {
320 return new AutoValue_KafkaIO_Write.Builder<K, V>()
321 .setWriteRecordsTransform(
322 new AutoValue_KafkaIO_WriteRecords.Builder<K, V>()
323 .setProducerConfig(WriteRecords.DEFAULT_PRODUCER_PROPERTIES)
324 .setEOS(false)
325 .setNumShards(0)
326 .setConsumerFactoryFn(Read.KAFKA_CONSUMER_FACTORY_FN)
327 .build())
328 .build();
329 }
330
331 /**
332 * Creates an uninitialized {@link WriteRecords} {@link PTransform}. Before use, Kafka
333 * configuration should be set with {@link WriteRecords#withBootstrapServers(String)} and {@link
334   * WriteRecords#withTopic} along with {@link Serializer}s for the (optional) key and the values.
335 */
336 public static <K, V> WriteRecords<K, V> writeRecords() {
337 return new AutoValue_KafkaIO_WriteRecords.Builder<K, V>()
338 .setProducerConfig(WriteRecords.DEFAULT_PRODUCER_PROPERTIES)
339 .setEOS(false)
340 .setNumShards(0)
341 .setConsumerFactoryFn(Read.KAFKA_CONSUMER_FACTORY_FN)
342 .build();
343 }
344
345 ///////////////////////// Read Support \\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
346
347 /**
348 * A {@link PTransform} to read from Kafka topics. See {@link KafkaIO} for more information on
349 * usage and configuration.
350 */
351 @AutoValue
352 public abstract static class Read<K, V>
353 extends PTransform<PBegin, PCollection<KafkaRecord<K, V>>> {
354 abstract Map<String, Object> getConsumerConfig();
355
356 abstract List<String> getTopics();
357
358 abstract List<TopicPartition> getTopicPartitions();
359
360 @Nullable
361 abstract Coder<K> getKeyCoder();
362
363 @Nullable
364 abstract Coder<V> getValueCoder();
365
366 abstract SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
367 getConsumerFactoryFn();
368
369 @Nullable
370 abstract SerializableFunction<KafkaRecord<K, V>, Instant> getWatermarkFn();
371
372 abstract long getMaxNumRecords();
373
374 @Nullable
375 abstract Duration getMaxReadTime();
376
377 @Nullable
378 abstract Instant getStartReadTime();
379
380 abstract boolean isCommitOffsetsInFinalizeEnabled();
381
382 abstract TimestampPolicyFactory<K, V> getTimestampPolicyFactory();
383
384 @Nullable
385 abstract Map<String, Object> getOffsetConsumerConfig();
386
387 @Nullable
388 abstract DeserializerProvider getKeyDeserializerProvider();
389
390 @Nullable
391 abstract DeserializerProvider getValueDeserializerProvider();
392
393 abstract Builder<K, V> toBuilder();
394
395 @Experimental(Kind.PORTABILITY)
396 @AutoValue.Builder
397 abstract static class Builder<K, V>
398 implements ExternalTransformBuilder<External.Configuration, PBegin, PCollection<KV<K, V>>> {
399 abstract Builder<K, V> setConsumerConfig(Map<String, Object> config);
400
401 abstract Builder<K, V> setTopics(List<String> topics);
402
403 abstract Builder<K, V> setTopicPartitions(List<TopicPartition> topicPartitions);
404
405 abstract Builder<K, V> setKeyCoder(Coder<K> keyCoder);
406
407 abstract Builder<K, V> setValueCoder(Coder<V> valueCoder);
408
409 abstract Builder<K, V> setConsumerFactoryFn(
410 SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn);
411
412 abstract Builder<K, V> setWatermarkFn(SerializableFunction<KafkaRecord<K, V>, Instant> fn);
413
414 abstract Builder<K, V> setMaxNumRecords(long maxNumRecords);
415
416 abstract Builder<K, V> setMaxReadTime(Duration maxReadTime);
417
418 abstract Builder<K, V> setStartReadTime(Instant startReadTime);
419
420 abstract Builder<K, V> setCommitOffsetsInFinalizeEnabled(boolean commitOffsetInFinalize);
421
422 abstract Builder<K, V> setTimestampPolicyFactory(
423 TimestampPolicyFactory<K, V> timestampPolicyFactory);
424
425 abstract Builder<K, V> setOffsetConsumerConfig(Map<String, Object> offsetConsumerConfig);
426
427 abstract Builder<K, V> setKeyDeserializerProvider(DeserializerProvider deserializerProvider);
428
429 abstract Builder<K, V> setValueDeserializerProvider(
430 DeserializerProvider deserializerProvider);
431
432 abstract Read<K, V> build();
433
434 @Override
435 public PTransform<PBegin, PCollection<KV<K, V>>> buildExternal(
436 External.Configuration config) {
437 ImmutableList.Builder<String> listBuilder = ImmutableList.builder();
438 for (String topic : config.topics) {
439 listBuilder.add(topic);
440 }
441 setTopics(listBuilder.build());
442
443 Class keyDeserializer = resolveClass(config.keyDeserializer);
444 setKeyDeserializerProvider(LocalDeserializerProvider.of(keyDeserializer));
445 setKeyCoder(resolveCoder(keyDeserializer));
446
447 Class valueDeserializer = resolveClass(config.valueDeserializer);
448 setValueDeserializerProvider(LocalDeserializerProvider.of(valueDeserializer));
449 setValueCoder(resolveCoder(valueDeserializer));
450
451 Map<String, Object> consumerConfig = new HashMap<>();
452 for (KV<String, String> kv : config.consumerConfig) {
453 consumerConfig.put(kv.getKey(), kv.getValue());
454 }
455 // Key and Value Deserializers always have to be in the config.
456 consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getName());
457 consumerConfig.put(
458 ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getName());
459 setConsumerConfig(consumerConfig);
460
461 // Set required defaults
462 setTopicPartitions(Collections.emptyList());
463 setConsumerFactoryFn(Read.KAFKA_CONSUMER_FACTORY_FN);
464 setMaxNumRecords(config.maxNumRecords == null ? Long.MAX_VALUE : config.maxNumRecords);
465 setCommitOffsetsInFinalizeEnabled(false);
466 setTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime());
467 // We do not include Metadata until we can encode KafkaRecords cross-language
468 return build().withoutMetadata();
469 }
470
471 private static Coder resolveCoder(Class deserializer) {
472 for (Method method : deserializer.getDeclaredMethods()) {
473 if (method.getName().equals("deserialize")) {
474 Class<?> returnType = method.getReturnType();
475 if (returnType.equals(Object.class)) {
476 continue;
477 }
478 if (returnType.equals(byte[].class)) {
479 return ByteArrayCoder.of();
480 } else if (returnType.equals(Integer.class)) {
481 return VarIntCoder.of();
482 } else if (returnType.equals(Long.class)) {
483 return VarLongCoder.of();
484 } else {
485 throw new RuntimeException("Couldn't infer Coder from " + deserializer);
486 }
487 }
488 }
489 throw new RuntimeException("Couldn't resolve coder for Deserializer: " + deserializer);
490 }
491 }
492
493 /**
494 * Exposes {@link KafkaIO.TypedWithoutMetadata} as an external transform for cross-language
495 * usage.
496 */
497 @Experimental(Kind.PORTABILITY)
498 @AutoService(ExternalTransformRegistrar.class)
499 public static class External implements ExternalTransformRegistrar {
500
501 public static final String URN = "beam:external:java:kafka:read:v1";
502
503 @Override
504 public Map<String, Class<? extends ExternalTransformBuilder>> knownBuilders() {
505 return ImmutableMap.of(URN, AutoValue_KafkaIO_Read.Builder.class);
506 }
507
508 /** Parameters class to expose the Read transform to an external SDK. */
509 public static class Configuration {
510
511 // All byte arrays are UTF-8 encoded strings
512 private Iterable<KV<String, String>> consumerConfig;
513 private Iterable<String> topics;
514 private String keyDeserializer;
515 private String valueDeserializer;
516 private Long maxNumRecords;
517
518 public void setConsumerConfig(Iterable<KV<String, String>> consumerConfig) {
519 this.consumerConfig = consumerConfig;
520 }
521
522 public void setMaxNumRecords(Long maxNumRecords) {
523 this.maxNumRecords = maxNumRecords;
524 }
525
526 public void setTopics(Iterable<String> topics) {
527 this.topics = topics;
528 }
529
530 public void setKeyDeserializer(String keyDeserializer) {
531 this.keyDeserializer = keyDeserializer;
532 }
533
534 public void setValueDeserializer(String valueDeserializer) {
535 this.valueDeserializer = valueDeserializer;
536 }
537 }
538 }
539
540 /** Sets the bootstrap servers for the Kafka consumer. */
541 public Read<K, V> withBootstrapServers(String bootstrapServers) {
542 return withConsumerConfigUpdates(
543 ImmutableMap.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers));
544 }
545
546 /**
547 * Sets the topic to read from.
548 *
549 * <p>See {@link KafkaUnboundedSource#split(int, PipelineOptions)} for description of how the
550 * partitions are distributed among the splits.
551 */
552 public Read<K, V> withTopic(String topic) {
553 return withTopics(ImmutableList.of(topic));
554 }
555
556 /**
557 * Sets a list of topics to read from. All the partitions from each of the topics are read.
558 *
559 * <p>See {@link KafkaUnboundedSource#split(int, PipelineOptions)} for description of how the
560 * partitions are distributed among the splits.
561 */
562 public Read<K, V> withTopics(List<String> topics) {
563 checkState(
564 getTopicPartitions().isEmpty(), "Only topics or topicPartitions can be set, not both");
565 return toBuilder().setTopics(ImmutableList.copyOf(topics)).build();
566 }
567
568 /**
569 * Sets a list of partitions to read from. This allows reading only a subset of partitions for
570 * one or more topics when (if ever) needed.
571 *
572 * <p>See {@link KafkaUnboundedSource#split(int, PipelineOptions)} for description of how the
573 * partitions are distributed among the splits.
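 *
 * <p>For example, a sketch of reading only partitions 0 and 1 of a topic (the topic name is
 * illustrative):
 *
 * <pre>{@code
 * KafkaIO.<Long, String>read()
 *     ...
 *     .withTopicPartitions(ImmutableList.of(
 *         new TopicPartition("my_topic", 0),
 *         new TopicPartition("my_topic", 1)))
 * }</pre>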
574 */
575 public Read<K, V> withTopicPartitions(List<TopicPartition> topicPartitions) {
576 checkState(getTopics().isEmpty(), "Only topics or topicPartitions can be set, not both");
577 return toBuilder().setTopicPartitions(ImmutableList.copyOf(topicPartitions)).build();
578 }
579
580 /**
581 * Sets a Kafka {@link Deserializer} to interpret key bytes read from Kafka.
582 *
583 * <p>In addition, Beam also needs a {@link Coder} to serialize and deserialize key objects at
584 * runtime. KafkaIO tries to infer a coder for the key based on the {@link Deserializer} class,
585 * however in case that fails, you can use {@link #withKeyDeserializerAndCoder(Class, Coder)} to
586 * provide the key coder explicitly.
587 */
588 public Read<K, V> withKeyDeserializer(Class<? extends Deserializer<K>> keyDeserializer) {
589 return withKeyDeserializer(LocalDeserializerProvider.of(keyDeserializer));
590 }
591
592 /**
593 * Sets a Kafka {@link Deserializer} for interpreting key bytes read from Kafka along with a
594 * {@link Coder} for helping the Beam runner materialize key objects at runtime if necessary.
595 *
596 * <p>Use this method only if your pipeline doesn't work with plain {@link
597 * #withKeyDeserializer(Class)}.
598 */
599 public Read<K, V> withKeyDeserializerAndCoder(
600 Class<? extends Deserializer<K>> keyDeserializer, Coder<K> keyCoder) {
601 return withKeyDeserializer(keyDeserializer).toBuilder().setKeyCoder(keyCoder).build();
602 }
603
604 public Read<K, V> withKeyDeserializer(DeserializerProvider<K> deserializerProvider) {
605 return toBuilder().setKeyDeserializerProvider(deserializerProvider).build();
606 }
607
608 /**
609 * Sets a Kafka {@link Deserializer} to interpret value bytes read from Kafka.
610 *
611 * <p>In addition, Beam also needs a {@link Coder} to serialize and deserialize value objects at
612 * runtime. KafkaIO tries to infer a coder for the value based on the {@link Deserializer}
613 * class, however in case that fails, you can use {@link #withValueDeserializerAndCoder(Class,
614 * Coder)} to provide the value coder explicitly.
615 */
616 public Read<K, V> withValueDeserializer(Class<? extends Deserializer<V>> valueDeserializer) {
617 return withValueDeserializer(LocalDeserializerProvider.of(valueDeserializer));
618 }
619
620 /**
621 * Sets a Kafka {@link Deserializer} for interpreting value bytes read from Kafka along with a
622 * {@link Coder} for helping the Beam runner materialize value objects at runtime if necessary.
623 *
624 * <p>Use this method only if your pipeline doesn't work with plain {@link
625 * #withValueDeserializer(Class)}.
626 */
627 public Read<K, V> withValueDeserializerAndCoder(
628 Class<? extends Deserializer<V>> valueDeserializer, Coder<V> valueCoder) {
629 return withValueDeserializer(valueDeserializer).toBuilder().setValueCoder(valueCoder).build();
630 }
631
632 public Read<K, V> withValueDeserializer(DeserializerProvider<V> deserializerProvider) {
633 return toBuilder().setValueDeserializerProvider(deserializerProvider).build();
634 }
635
636 /**
637 * A factory to create Kafka {@link Consumer} from consumer configuration. This is useful for
638 * supporting another version of Kafka consumer. Default is {@link KafkaConsumer}.
639 */
640 public Read<K, V> withConsumerFactoryFn(
641 SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn) {
642 return toBuilder().setConsumerFactoryFn(consumerFactoryFn).build();
643 }
644
645 /**
646 * Update consumer configuration with new properties.
647 *
648 * @deprecated as of version 2.13. Use {@link #withConsumerConfigUpdates(Map)} instead
649 */
650 @Deprecated
651 public Read<K, V> updateConsumerProperties(Map<String, Object> configUpdates) {
652 Map<String, Object> config =
653 updateKafkaProperties(getConsumerConfig(), IGNORED_CONSUMER_PROPERTIES, configUpdates);
654 return toBuilder().setConsumerConfig(config).build();
655 }
656
657 /**
658 * Similar to {@link org.apache.beam.sdk.io.Read.Unbounded#withMaxNumRecords(long)}. Mainly used
659 * for tests and demo applications.
660 */
661 public Read<K, V> withMaxNumRecords(long maxNumRecords) {
662 return toBuilder().setMaxNumRecords(maxNumRecords).build();
663 }
664
665 /**
666     * Uses the given timestamp to set the start offset. This is only supported by Kafka client
667     * 0.10.1.0 onwards and requires message format version 0.10.0 or later.
668     *
669     * <p>Note that this takes priority over the start offset configuration {@code
670     * ConsumerConfig.AUTO_OFFSET_RESET_CONFIG} and any auto-committed offsets.
671     *
672     * <p>This results in hard failures in either of the following two cases: 1. If one or more
673     * partitions do not contain any messages with a timestamp larger than or equal to the desired
674     * timestamp. 2. If the message format version in a partition is before 0.10.0, i.e. the
675     * messages do not have timestamps.
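 *
 * <p>A minimal sketch (the timestamp value is illustrative):
 *
 * <pre>{@code
 * KafkaIO.<Long, String>read()
 *     ...
 *     .withStartReadTime(Instant.parse("2020-07-01T00:00:00Z"))
 * }</pre>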
676 */
677 public Read<K, V> withStartReadTime(Instant startReadTime) {
678 return toBuilder().setStartReadTime(startReadTime).build();
679 }
680
681 /**
682 * Similar to {@link org.apache.beam.sdk.io.Read.Unbounded#withMaxReadTime(Duration)}. Mainly
683 * used for tests and demo applications.
684 */
685 public Read<K, V> withMaxReadTime(Duration maxReadTime) {
686 return toBuilder().setMaxReadTime(maxReadTime).build();
687 }
688
689 /**
690 * Sets {@link TimestampPolicy} to {@link TimestampPolicyFactory.LogAppendTimePolicy}. The
691 * policy assigns Kafka's log append time (server side ingestion time) to each record. The
692 * watermark for each Kafka partition is the timestamp of the last record read. If a partition
693     * is idle, the watermark advances to a couple of seconds behind wall time. Every record consumed
694 * from Kafka is expected to have its timestamp type set to 'LOG_APPEND_TIME'.
695 *
696 * <p>In Kafka, log append time needs to be enabled for each topic, and all the subsequent
697     * records will have their timestamp set to log append time. If a record does not have its
698     * timestamp type set to 'LOG_APPEND_TIME' for any reason, its timestamp is set to the previous
699     * record timestamp or the latest watermark, whichever is larger.
700 *
701 * <p>The watermark for the entire source is the oldest of each partition's watermark. If one of
702     * the readers falls behind, possibly due to an uneven distribution of records among Kafka
703     * partitions, it ends up holding back the watermark for the entire source.
704 */
705 public Read<K, V> withLogAppendTime() {
706 return withTimestampPolicyFactory(TimestampPolicyFactory.withLogAppendTime());
707 }
708
709 /**
710 * Sets {@link TimestampPolicy} to {@link TimestampPolicyFactory.ProcessingTimePolicy}. This is
711 * the default timestamp policy. It assigns processing time to each record. Specifically, this
712     * is the timestamp when the record becomes 'current' in the reader. The watermark always
713 * advances to current time. If server side time (log append time) is enabled in Kafka, {@link
714 * #withLogAppendTime()} is recommended over this.
715 */
716 public Read<K, V> withProcessingTime() {
717 return withTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime());
718 }
719
720 /**
721 * Sets the timestamps policy based on {@link KafkaTimestampType#CREATE_TIME} timestamp of the
722 * records. It is an error if a record's timestamp type is not {@link
723 * KafkaTimestampType#CREATE_TIME}. The timestamps within a partition are expected to be roughly
724 * monotonically increasing with a cap on out of order delays (e.g. 'max delay' of 1 minute).
725 * The watermark at any time is '({@code Min(now(), Max(event timestamp so far)) - max delay})'.
726     * However, the watermark is never set in the future and is capped to 'now - max delay'. In
727     * addition, the watermark advances to 'now - max delay' when a partition is idle.
728 *
729 * @param maxDelay For any record in the Kafka partition, the timestamp of any subsequent record
730 * is expected to be after {@code current record timestamp - maxDelay}.
731 */
732 public Read<K, V> withCreateTime(Duration maxDelay) {
733 return withTimestampPolicyFactory(TimestampPolicyFactory.withCreateTime(maxDelay));
734 }
735
736 /**
737 * Provide custom {@link TimestampPolicyFactory} to set event times and watermark for each
738 * partition. {@link TimestampPolicyFactory#createTimestampPolicy(TopicPartition, Optional)} is
739 * invoked for each partition when the reader starts.
740 *
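 * <p>For example, a sketch of a factory that assigns each record's own Kafka timestamp and
 * tolerates up to one minute of out-of-order delay, using {@link
 * CustomTimestampPolicyWithLimitedDelay} (the one-minute bound is illustrative):
 *
 * <pre>{@code
 * .withTimestampPolicyFactory(
 *     (tp, previousWatermark) ->
 *         new CustomTimestampPolicyWithLimitedDelay<>(
 *             record -> new Instant(record.getTimestamp()),
 *             Duration.standardMinutes(1),
 *             previousWatermark))
 * }</pre>
 *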
741 * @see #withLogAppendTime()
742 * @see #withCreateTime(Duration)
743 * @see #withProcessingTime()
744 */
745 public Read<K, V> withTimestampPolicyFactory(
746 TimestampPolicyFactory<K, V> timestampPolicyFactory) {
747 return toBuilder().setTimestampPolicyFactory(timestampPolicyFactory).build();
748 }
749
750 /**
751 * A function to assign a timestamp to a record. Default is processing timestamp.
752 *
753 * @deprecated as of version 2.4. Use {@link
754 * #withTimestampPolicyFactory(TimestampPolicyFactory)} instead.
755 */
756 @Deprecated
757 public Read<K, V> withTimestampFn2(
758 SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn) {
759 checkArgument(timestampFn != null, "timestampFn can not be null");
760 return toBuilder()
761 .setTimestampPolicyFactory(TimestampPolicyFactory.withTimestampFn(timestampFn))
762 .build();
763 }
764
765 /**
766 * A function to calculate watermark after a record. Default is last record timestamp.
767 *
768 * @see #withTimestampFn(SerializableFunction)
769 * @deprecated as of version 2.4. Use {@link
770 * #withTimestampPolicyFactory(TimestampPolicyFactory)} instead.
771 */
772 @Deprecated
773 public Read<K, V> withWatermarkFn2(
774 SerializableFunction<KafkaRecord<K, V>, Instant> watermarkFn) {
775 checkArgument(watermarkFn != null, "watermarkFn can not be null");
776 return toBuilder().setWatermarkFn(watermarkFn).build();
777 }
778
779 /**
780 * A function to assign a timestamp to a record. Default is processing timestamp.
781 *
782 * @deprecated as of version 2.4. Use {@link
783 * #withTimestampPolicyFactory(TimestampPolicyFactory)} instead.
784 */
785 @Deprecated
786 public Read<K, V> withTimestampFn(SerializableFunction<KV<K, V>, Instant> timestampFn) {
787 checkArgument(timestampFn != null, "timestampFn can not be null");
788 return withTimestampFn2(unwrapKafkaAndThen(timestampFn));
789 }
790
791 /**
792 * A function to calculate watermark after a record. Default is last record timestamp.
793 *
794 * @see #withTimestampFn(SerializableFunction)
795 * @deprecated as of version 2.4. Use {@link
796 * #withTimestampPolicyFactory(TimestampPolicyFactory)} instead.
797 */
798 @Deprecated
799 public Read<K, V> withWatermarkFn(SerializableFunction<KV<K, V>, Instant> watermarkFn) {
800 checkArgument(watermarkFn != null, "watermarkFn can not be null");
801 return withWatermarkFn2(unwrapKafkaAndThen(watermarkFn));
802 }
803
804 /**
805     * Sets "isolation.level" to "read_committed" in the Kafka consumer configuration. This ensures
806 * that the consumer does not read uncommitted messages. Kafka version 0.11 introduced
807 * transactional writes. Applications requiring end-to-end exactly-once semantics should only
808 * read committed messages. See JavaDoc for {@link KafkaConsumer} for more description.
809 */
810 public Read<K, V> withReadCommitted() {
811 return withConsumerConfigUpdates(ImmutableMap.of("isolation.level", "read_committed"));
812 }
813
814 /**
815 * Finalized offsets are committed to Kafka. See {@link CheckpointMark#finalizeCheckpoint()}. It
816 * helps with minimizing gaps or duplicate processing of records while restarting a pipeline
817 * from scratch. But it does not provide hard processing guarantees. There could be a short
818     * delay to commit after {@link CheckpointMark#finalizeCheckpoint()} is invoked, as the reader might
819 * be blocked on reading from Kafka. Note that it is independent of 'AUTO_COMMIT' Kafka consumer
820 * configuration. Usually either this or AUTO_COMMIT in Kafka consumer is enabled, but not both.
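 *
 * <p>A sketch of typical usage; a group id is required for offset commits (the group id value is
 * illustrative):
 *
 * <pre>{@code
 * KafkaIO.<Long, String>read()
 *     ...
 *     .withConsumerConfigUpdates(ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, "my_beam_app_1"))
 *     .commitOffsetsInFinalize()
 * }</pre>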
821 */
822 public Read<K, V> commitOffsetsInFinalize() {
823 return toBuilder().setCommitOffsetsInFinalizeEnabled(true).build();
824 }
825
826 /**
827 * Set additional configuration for the backend offset consumer. It may be required for a
828     * secured Kafka cluster, especially when you see a WARN log message similar to 'exception while
829     * fetching latest offset for partition {}. will be retried'.
830     *
831     * <p>In {@link KafkaIO#read()}, there are actually two consumers running in the backend:<br>
832     * 1. the main consumer, which reads data from Kafka;<br>
833     * 2. the secondary offset consumer, which is used to estimate the backlog by fetching the latest
834     * offset;<br>
835     *
836     * <p>By default, the offset consumer inherits the configuration from the main consumer, with an
837     * auto-generated {@link ConsumerConfig#GROUP_ID_CONFIG}. This may not work in a secured Kafka
838     * cluster that requires additional configuration.
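 *
 * <p>A sketch of overriding settings for the offset consumer only (the security properties shown
 * are placeholders for whatever your cluster requires):
 *
 * <pre>{@code
 * KafkaIO.<Long, String>read()
 *     ...
 *     .withOffsetConsumerConfigOverrides(ImmutableMap.of(
 *         "security.protocol", "SASL_SSL",
 *         "sasl.mechanism", "PLAIN"))
 * }</pre>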
839 */
840 public Read<K, V> withOffsetConsumerConfigOverrides(Map<String, Object> offsetConsumerConfig) {
841 return toBuilder().setOffsetConsumerConfig(offsetConsumerConfig).build();
842 }
843
844 /**
845     * Updates the configuration for the backend main consumer. Note that the default consumer
846     * properties are not completely overridden; this method only updates the values of the given keys.
847     *
848     * <p>In {@link KafkaIO#read()}, there are actually two consumers running in the backend:<br>
849     * 1. the main consumer, which reads data from Kafka;<br>
850     * 2. the secondary offset consumer, which is used to estimate the backlog by fetching the latest
851     * offset;<br>
852     *
853     * <p>By default, the main consumer uses the configuration from {@link
854     * #DEFAULT_CONSUMER_PROPERTIES}.
855 */
856 public Read<K, V> withConsumerConfigUpdates(Map<String, Object> configUpdates) {
857 Map<String, Object> config =
858 updateKafkaProperties(getConsumerConfig(), IGNORED_CONSUMER_PROPERTIES, configUpdates);
859 return toBuilder().setConsumerConfig(config).build();
860 }
861
862    /** Returns a {@link PTransform} for a PCollection of {@link KV}, dropping Kafka metadata. */
863 public PTransform<PBegin, PCollection<KV<K, V>>> withoutMetadata() {
864 return new TypedWithoutMetadata<>(this);
865 }
866
867 @Override
868 public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
869 checkArgument(
870 getConsumerConfig().get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG) != null,
871 "withBootstrapServers() is required");
872 checkArgument(
873 getTopics().size() > 0 || getTopicPartitions().size() > 0,
874 "Either withTopic(), withTopics() or withTopicPartitions() is required");
875 checkArgument(getKeyDeserializerProvider() != null, "withKeyDeserializer() is required");
876 checkArgument(getValueDeserializerProvider() != null, "withValueDeserializer() is required");
877
878 ConsumerSpEL consumerSpEL = new ConsumerSpEL();
879 if (!consumerSpEL.hasOffsetsForTimes()) {
880 LOG.warn(
881 "Kafka client version {} is too old. Versions before 0.10.1.0 are deprecated and "
882 + "may not be supported in next release of Apache Beam. "
883 + "Please upgrade your Kafka client version.",
884 AppInfoParser.getVersion());
885 }
886 if (getStartReadTime() != null) {
887 checkArgument(
888 consumerSpEL.hasOffsetsForTimes(),
889 "Consumer.offsetsForTimes is only supported by Kafka Client 0.10.1.0 onwards, "
890 + "current version of Kafka Client is "
891 + AppInfoParser.getVersion()
892 + ". If you are building with maven, set \"kafka.clients.version\" "
893 + "maven property to 0.10.1.0 or newer.");
894 }
895 if (isCommitOffsetsInFinalizeEnabled()) {
896 checkArgument(
897 getConsumerConfig().get(ConsumerConfig.GROUP_ID_CONFIG) != null,
898 "commitOffsetsInFinalize() is enabled, but group.id in Kafka consumer config "
899 + "is not set. Offset management requires group.id.");
900 if (Boolean.TRUE.equals(
901 getConsumerConfig().get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG))) {
902 LOG.warn(
903 "'{}' in consumer config is enabled even though commitOffsetsInFinalize() "
904 + "is set. You need only one of them.",
905 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
906 }
907 }
908
909 // Infer key/value coders if not specified explicitly
910 CoderRegistry coderRegistry = input.getPipeline().getCoderRegistry();
911
912 Coder<K> keyCoder = getKeyCoder(coderRegistry);
913 Coder<V> valueCoder = getValueCoder(coderRegistry);
914
915 // Handles unbounded source to bounded conversion if maxNumRecords or maxReadTime is set.
916 Unbounded<KafkaRecord<K, V>> unbounded =
917 org.apache.beam.sdk.io.Read.from(
918 toBuilder().setKeyCoder(keyCoder).setValueCoder(valueCoder).build().makeSource());
919
920 PTransform<PBegin, PCollection<KafkaRecord<K, V>>> transform = unbounded;
921
922 if (getMaxNumRecords() < Long.MAX_VALUE || getMaxReadTime() != null) {
923 transform =
924 unbounded.withMaxReadTime(getMaxReadTime()).withMaxNumRecords(getMaxNumRecords());
925 }
926
927 return input.getPipeline().apply(transform);
928 }
929
930 private Coder<K> getKeyCoder(CoderRegistry coderRegistry) {
931 return (getKeyCoder() != null)
932 ? getKeyCoder()
933 : getKeyDeserializerProvider().getCoder(coderRegistry);
934 }
935
936 private Coder<V> getValueCoder(CoderRegistry coderRegistry) {
937 return (getValueCoder() != null)
938 ? getValueCoder()
939 : getValueDeserializerProvider().getCoder(coderRegistry);
940 }
941
942 /**
943 * Creates an {@link UnboundedSource UnboundedSource<KafkaRecord<K, V>, ?>} with the
944     * configuration in {@link Read}. The primary use case is unit tests; it should not be used in
945     * an application.
946 */
947 @VisibleForTesting
948 UnboundedSource<KafkaRecord<K, V>, KafkaCheckpointMark> makeSource() {
949 return new KafkaUnboundedSource<>(this, -1);
950 }
951
952 // utility method to convert KafkaRecord<K, V> to user KV<K, V> before applying user functions
953 private static <KeyT, ValueT, OutT>
954 SerializableFunction<KafkaRecord<KeyT, ValueT>, OutT> unwrapKafkaAndThen(
955 final SerializableFunction<KV<KeyT, ValueT>, OutT> fn) {
956 return record -> fn.apply(record.getKV());
957 }
958 ///////////////////////////////////////////////////////////////////////////////////////
959
960 /** A set of properties that are not required or don't make sense for our consumer. */
961 private static final Map<String, String> IGNORED_CONSUMER_PROPERTIES =
962 ImmutableMap.of(
963 ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "Set keyDeserializer instead",
964 ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "Set valueDeserializer instead"
965 // "group.id", "enable.auto.commit", "auto.commit.interval.ms" :
966 // lets allow these, applications can have better resume point for restarts.
967 );
968
969 // set config defaults
970 private static final Map<String, Object> DEFAULT_CONSUMER_PROPERTIES =
971 ImmutableMap.of(
972 ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
973 ByteArrayDeserializer.class.getName(),
974 ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
975 ByteArrayDeserializer.class.getName(),
976
977 // Use large receive buffer. Once KAFKA-3135 is fixed, this _may_ not be required.
978            // With the default value of 32K, it takes multiple seconds between successful polls.
979 // All the consumer work is done inside poll(), with smaller send buffer size, it
980 // takes many polls before a 1MB chunk from the server is fully read. In my testing
981 // about half of the time select() inside kafka consumer waited for 20-30ms, though
982 // the server had lots of data in tcp send buffers on its side. Compared to default,
983 // this setting increased throughput by many fold (3-4x).
984 ConsumerConfig.RECEIVE_BUFFER_CONFIG,
985 512 * 1024,
986
987 // default to latest offset when we are not resuming.
988 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
989 "latest",
990 // disable auto commit of offsets. we don't require group_id. could be enabled by user.
991 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
992 false);
993
994 // default Kafka 0.9 Consumer supplier.
995 private static final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
996 KAFKA_CONSUMER_FACTORY_FN = KafkaConsumer::new;
997
998 @SuppressWarnings("unchecked")
999 @Override
1000 public void populateDisplayData(DisplayData.Builder builder) {
1001 super.populateDisplayData(builder);
1002 List<String> topics = getTopics();
1003 List<TopicPartition> topicPartitions = getTopicPartitions();
1004 if (topics.size() > 0) {
1005 builder.add(DisplayData.item("topics", Joiner.on(",").join(topics)).withLabel("Topic/s"));
1006 } else if (topicPartitions.size() > 0) {
1007 builder.add(
1008 DisplayData.item("topicPartitions", Joiner.on(",").join(topicPartitions))
1009 .withLabel("Topic Partition/s"));
1010 }
1011 Set<String> ignoredConsumerPropertiesKeys = IGNORED_CONSUMER_PROPERTIES.keySet();
1012 for (Map.Entry<String, Object> conf : getConsumerConfig().entrySet()) {
1013 String key = conf.getKey();
1014 if (!ignoredConsumerPropertiesKeys.contains(key)) {
1015 Object value =
1016 DisplayData.inferType(conf.getValue()) != null
1017 ? conf.getValue()
1018 : String.valueOf(conf.getValue());
1019 builder.add(DisplayData.item(key, ValueProvider.StaticValueProvider.of(value)));
1020 }
1021 }
1022 }
1023 }
1024
1025 /**
1026 * A {@link PTransform} to read from Kafka topics. Similar to {@link KafkaIO.Read}, but removes
1027   * Kafka metadata and returns a {@link PCollection} of {@link KV}. See {@link KafkaIO} for more
1028 * information on usage and configuration of reader.
1029 */
1030 public static class TypedWithoutMetadata<K, V> extends PTransform<PBegin, PCollection<KV<K, V>>> {
1031 private final Read<K, V> read;
1032
1033 TypedWithoutMetadata(Read<K, V> read) {
1034 super("KafkaIO.Read");
1035 this.read = read;
1036 }
1037
1038 @Override
1039 public PCollection<KV<K, V>> expand(PBegin begin) {
1040 return begin
1041 .apply(read)
1042 .apply(
1043 "Remove Kafka Metadata",
1044 ParDo.of(
1045 new DoFn<KafkaRecord<K, V>, KV<K, V>>() {
1046 @ProcessElement
1047 public void processElement(ProcessContext ctx) {
1048 ctx.output(ctx.element().getKV());
1049 }
1050 }));
1051 }
1052
1053 @Override
1054 public void populateDisplayData(DisplayData.Builder builder) {
1055 super.populateDisplayData(builder);
1056 read.populateDisplayData(builder);
1057 }
1058 }
1059
1060 ////////////////////////////////////////////////////////////////////////////////////////////////
1061
1062 private static final Logger LOG = LoggerFactory.getLogger(KafkaIO.class);
1063
1064 /**
1065   * Returns a new config map which is a merge of the current config and the updates. Verifies that
1066   * the updates do not include ignored properties.
1067 */
1068 private static Map<String, Object> updateKafkaProperties(
1069 Map<String, Object> currentConfig,
1070 Map<String, String> ignoredProperties,
1071 Map<String, Object> updates) {
1072
1073 for (String key : updates.keySet()) {
1074 checkArgument(
1075 !ignoredProperties.containsKey(key),
1076 "No need to configure '%s'. %s",
1077 key,
1078 ignoredProperties.get(key));
1079 }
1080
1081 Map<String, Object> config = new HashMap<>(currentConfig);
1082 config.putAll(updates);
1083
1084 return config;
1085 }
1086
1087 /** Static class, prevent instantiation. */
1088 private KafkaIO() {}
1089
1090 //////////////////////// Sink Support \\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
1091
1092 /**
1093 * A {@link PTransform} to write to a Kafka topic with ProducerRecord's. See {@link KafkaIO} for
1094 * more information on usage and configuration.
1095 */
1096 @AutoValue
1097 public abstract static class WriteRecords<K, V>
1098 extends PTransform<PCollection<ProducerRecord<K, V>>, PDone> {
1099    // TODO (Version 3.0): Create only one generic {@code Write<T>} transform which will be
1100 // parameterized depending on type of input collection (KV, ProducerRecords, etc). In such case,
1101 // we shouldn't have to duplicate the same API for similar transforms like {@link Write} and
1102 // {@link WriteRecords}. See example at {@link PubsubIO.Write}.
1103
1104 @Nullable
1105 abstract String getTopic();
1106
1107 abstract Map<String, Object> getProducerConfig();
1108
1109 @Nullable
1110 abstract SerializableFunction<Map<String, Object>, Producer<K, V>> getProducerFactoryFn();
1111
1112 @Nullable
1113 abstract Class<? extends Serializer<K>> getKeySerializer();
1114
1115 @Nullable
1116 abstract Class<? extends Serializer<V>> getValueSerializer();
1117
1118 @Nullable
1119 abstract KafkaPublishTimestampFunction<ProducerRecord<K, V>> getPublishTimestampFunction();
1120
1121 // Configuration for EOS sink
1122 abstract boolean isEOS();
1123
1124 @Nullable
1125 abstract String getSinkGroupId();
1126
1127 abstract int getNumShards();
1128
1129 @Nullable
1130 abstract SerializableFunction<Map<String, Object>, ? extends Consumer<?, ?>>
1131 getConsumerFactoryFn();
1132
1133 abstract Builder<K, V> toBuilder();
1134
1135 @AutoValue.Builder
1136 abstract static class Builder<K, V> {
1137 abstract Builder<K, V> setTopic(String topic);
1138
1139 abstract Builder<K, V> setProducerConfig(Map<String, Object> producerConfig);
1140
1141 abstract Builder<K, V> setProducerFactoryFn(
1142 SerializableFunction<Map<String, Object>, Producer<K, V>> fn);
1143
1144 abstract Builder<K, V> setKeySerializer(Class<? extends Serializer<K>> serializer);
1145
1146 abstract Builder<K, V> setValueSerializer(Class<? extends Serializer<V>> serializer);
1147
1148 abstract Builder<K, V> setPublishTimestampFunction(
1149 KafkaPublishTimestampFunction<ProducerRecord<K, V>> timestampFunction);
1150
1151 abstract Builder<K, V> setEOS(boolean eosEnabled);
1152
1153 abstract Builder<K, V> setSinkGroupId(String sinkGroupId);
1154
1155 abstract Builder<K, V> setNumShards(int numShards);
1156
1157 abstract Builder<K, V> setConsumerFactoryFn(
1158 SerializableFunction<Map<String, Object>, ? extends Consumer<?, ?>> fn);
1159
1160 abstract WriteRecords<K, V> build();
1161 }
1162
1163 /**
1164     * Returns a new {@link WriteRecords} transform with the Kafka producer pointing to {@code
1165 * bootstrapServers}.
1166 */
1167 public WriteRecords<K, V> withBootstrapServers(String bootstrapServers) {
1168 return withProducerConfigUpdates(
1169          ImmutableMap.of(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers));
1170 }
1171
1172 /**
1173     * Sets the default Kafka topic to write to. Use {@code ProducerRecord}s to set the topic name
1174     * per published record.
1175 */
1176 public WriteRecords<K, V> withTopic(String topic) {
1177 return toBuilder().setTopic(topic).build();
1178 }
1179
1180 /**
1181 * Sets a {@link Serializer} for serializing key (if any) to bytes.
1182 *
1183     * <p>A key is optional while writing to Kafka. Note that when a key is set, its hash is used to
1184     * determine the partition in Kafka (see {@link ProducerRecord} for more details).
1185 */
1186 public WriteRecords<K, V> withKeySerializer(Class<? extends Serializer<K>> keySerializer) {
1187 return toBuilder().setKeySerializer(keySerializer).build();
1188 }
1189
1190 /** Sets a {@link Serializer} for serializing value to bytes. */
1191 public WriteRecords<K, V> withValueSerializer(Class<? extends Serializer<V>> valueSerializer) {
1192 return toBuilder().setValueSerializer(valueSerializer).build();
1193 }
1194
1195 /**
1196 * Adds the given producer properties, overriding old values of properties with the same key.
1197 *
1198 * @deprecated as of version 2.13. Use {@link #withProducerConfigUpdates(Map)} instead.
1199 */
1200 @Deprecated
1201 public WriteRecords<K, V> updateProducerProperties(Map<String, Object> configUpdates) {
1202 Map<String, Object> config =
1203 updateKafkaProperties(getProducerConfig(), IGNORED_PRODUCER_PROPERTIES, configUpdates);
1204 return toBuilder().setProducerConfig(config).build();
1205 }
1206
1207 /**
1208     * Updates the configuration for the producer. Note that the default producer properties are not
1209     * completely overridden; this method only updates the values of the given keys.
1210 *
1211 * <p>By default, the producer uses the configuration from {@link #DEFAULT_PRODUCER_PROPERTIES}.
1212 */
1213 public WriteRecords<K, V> withProducerConfigUpdates(Map<String, Object> configUpdates) {
1214 Map<String, Object> config =
1215 updateKafkaProperties(getProducerConfig(), IGNORED_PRODUCER_PROPERTIES, configUpdates);
1216 return toBuilder().setProducerConfig(config).build();
1217 }
1218
1219 /**
1220     * Sets a custom function to create a Kafka producer. Primarily used for tests. Default is
1221     * {@link KafkaProducer}.
1222 */
1223 public WriteRecords<K, V> withProducerFactoryFn(
1224 SerializableFunction<Map<String, Object>, Producer<K, V>> producerFactoryFn) {
1225 return toBuilder().setProducerFactoryFn(producerFactoryFn).build();
1226 }
1227
1228 /**
1229     * The timestamp for each record being published is set to the timestamp of the element in the
1230 * pipeline. This is equivalent to {@code withPublishTimestampFunction((e, ts) -> ts)}. <br>
1231 * NOTE: Kafka's retention policies are based on message timestamps. If the pipeline is
1232 * processing messages from the past, they might be deleted immediately by Kafka after being
1233 * published if the timestamps are older than Kafka cluster's {@code log.retention.hours}.
1234 */
1235 public WriteRecords<K, V> withInputTimestamp() {
1236 return withPublishTimestampFunction(KafkaPublishTimestampFunction.withElementTimestamp());
1237 }
1238
1239 /**
1240 * A function to provide timestamp for records being published. <br>
1241 * NOTE: Kafka's retention policies are based on message timestamps. If the pipeline is
1242 * processing messages from the past, they might be deleted immediately by Kafka after being
1243 * published if the timestamps are older than Kafka cluster's {@code log.retention.hours}.
1244 *
1245 * @deprecated use {@code ProducerRecords} to set publish timestamp.
1246 */
1247 @Deprecated
1248 public WriteRecords<K, V> withPublishTimestampFunction(
1249 KafkaPublishTimestampFunction<ProducerRecord<K, V>> timestampFunction) {
1250 return toBuilder().setPublishTimestampFunction(timestampFunction).build();
1251 }
1252
1253 /**
1254 * Provides exactly-once semantics while writing to Kafka, which enables applications with
1255 * end-to-end exactly-once guarantees on top of exactly-once semantics <i>within</i> Beam
1256     * pipelines. It ensures that records written to the sink are committed to Kafka exactly once,
1257     * even when some processing is retried during pipeline execution.
1258 * Retries typically occur when workers restart (as in failure recovery), or when the work is
1259 * redistributed (as in an autoscaling event).
1260 *
1261 * <p>Beam runners typically provide exactly-once semantics for results of a pipeline, but not
1262     * for side effects from user code in a transform. If a transform such as a Kafka sink writes to an
1263 * external system, those writes might occur more than once. When EOS is enabled here, the sink
1264 * transform ties checkpointing semantics in compatible Beam runners and transactions in Kafka
1265 * (version 0.11+) to ensure a record is written only once. As the implementation relies on
1266     * (version 0.11+) to ensure a record is written only once. As the implementation relies on
1267     * runners' checkpoint semantics, not all runners are compatible. The sink throws an
1268     * exception during initialization if the runner is not explicitly allowed. The Flink runner is one
1269     * of the runners whose checkpoint semantics are not compatible with the current implementation
1270     * (we hope to provide a solution in the near future). The Dataflow and Spark runners are
1271 *
1272 * <p>Note on performance: Exactly-once sink involves two shuffles of the records. In addition
1273 * to cost of shuffling the records among workers, the records go through 2
1274 * serialization-deserialization cycles. Depending on volume and cost of serialization, the CPU
1275     * cost might be noticeable. The CPU cost can be reduced by writing byte arrays (i.e.
1276     * serializing them to bytes before writing to the Kafka sink).
1277 *
1278 * @param numShards Sets sink parallelism. The state metadata stored on Kafka is stored across
1279 * this many virtual partitions using {@code sinkGroupId}. A good rule of thumb is to set
1280 * this to be around number of partitions in Kafka topic.
1281 * @param sinkGroupId The <i>group id</i> used to store small amount of state as metadata on
1282 * Kafka. It is similar to <i>consumer group id</i> used with a {@link KafkaConsumer}. Each
1283 * job should use a unique group id so that restarts/updates of job preserve the state to
1284 * ensure exactly-once semantics. The state is committed atomically with sink transactions
1285 * on Kafka. See {@link KafkaProducer#sendOffsetsToTransaction(Map, String)} for more
1286 * information. The sink performs multiple sanity checks during initialization to catch
1287 * common mistakes so that it does not end up using state that does not <i>seem</i> to be
1288 * written by the same job.
1289 */
1290 public WriteRecords<K, V> withEOS(int numShards, String sinkGroupId) {
1291 KafkaExactlyOnceSink.ensureEOSSupport();
1292 checkArgument(numShards >= 1, "numShards should be >= 1");
1293 checkArgument(sinkGroupId != null, "sinkGroupId is required for exactly-once sink");
1294 return toBuilder().setEOS(true).setNumShards(numShards).setSinkGroupId(sinkGroupId).build();
1295 }
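// A minimal sketch (editor's addition, hypothetical names): enabling the exactly-once sink.
// numShards is set close to the topic's partition count, and the sinkGroupId is kept stable
// across restarts/updates of the same job so the stored state is reused.
//
//   records  // PCollection<ProducerRecord<Long, String>>
//       .apply(KafkaIO.<Long, String>writeRecords()
//           .withBootstrapServers("broker_1:9092,broker_2:9092")
//           .withTopic("results")
//           .withKeySerializer(LongSerializer.class)
//           .withValueSerializer(StringSerializer.class)
//           .withEOS(20, "my-job-eos-sink-group"));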
1296
1297 /**
1298 * When exactly-once semantics are enabled (see {@link #withEOS(int, String)}), the sink needs
1299 * to fetch state previously stored with the Kafka topic. Fetching this metadata requires a
1300 * consumer. Similar to {@link Read#withConsumerFactoryFn(SerializableFunction)}, a factory
1301 * function can be supplied if required in a specific case. The default is {@link KafkaConsumer}.
1302 */
1303 public WriteRecords<K, V> withConsumerFactoryFn(
1304 SerializableFunction<Map<String, Object>, ? extends Consumer<?, ?>> consumerFactoryFn) {
1305 return toBuilder().setConsumerFactoryFn(consumerFactoryFn).build();
1306 }
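// A minimal sketch (editor's addition): supplying a consumer factory for the metadata fetch
// performed by the exactly-once sink, e.g. to construct the consumer differently in a test.
// The lambda here simply delegates to KafkaConsumer with the supplied config map.
//
//   KafkaIO.<Long, String>writeRecords()
//       // ... serializers, topic, withEOS(...) as above ...
//       .withConsumerFactoryFn(config -> new KafkaConsumer<>(config));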
1307
1308 @Override
1309 public PDone expand(PCollection<ProducerRecord<K, V>> input) {
1310 checkArgument(
1311 getProducerConfig().get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) != null,
1312 "withBootstrapServers() is required");
1313
1314 checkArgument(getKeySerializer() != null, "withKeySerializer() is required");
1315 checkArgument(getValueSerializer() != null, "withValueSerializer() is required");
1316
1317 if (isEOS()) {
1318 checkArgument(getTopic() != null, "withTopic() is required when isEOS() is true");
1319 KafkaExactlyOnceSink.ensureEOSSupport();
1320
1321 // TODO: Verify that the group_id does not have existing state stored on Kafka unless
1322 // this is an upgrade. This avoids issues with the simple mistake of reusing the group_id
1323 // across multiple runs or across multiple jobs. This is currently checked when the sink
1324 // transform initializes while processing the output. It might be better to
1325 // check here to catch this common mistake earlier.
1326
1327 input.apply(new KafkaExactlyOnceSink<>(this));
1328 } else {
1329 input.apply(ParDo.of(new KafkaWriter<>(this)));
1330 }
1331 return PDone.in(input.getPipeline());
1332 }
1333
1334 @Override
1335 public void validate(PipelineOptions options) {
1336 if (isEOS()) {
1337 String runner = options.getRunner().getName();
1338 if ("org.apache.beam.runners.direct.DirectRunner".equals(runner)
1339 || runner.startsWith("org.apache.beam.runners.dataflow.")
1340 || runner.startsWith("org.apache.beam.runners.spark.")
1341 || runner.startsWith("org.apache.beam.runners.flink.")) {
1342 return;
1343 }
1344 throw new UnsupportedOperationException(
1345 runner
1346 + " is not a runner known to be compatible with Kafka exactly-once sink. "
1347 + "This implementation of exactly-once sink relies on specific checkpoint guarantees. "
1348 + "Only the runners with known to have compatible checkpoint semantics are allowed.");
1349 }
1350 }
1351
1352 // set config defaults
1353 private static final Map<String, Object> DEFAULT_PRODUCER_PROPERTIES =
1354 ImmutableMap.of(ProducerConfig.RETRIES_CONFIG, 3);
1355
1356 /** A set of properties that are not required or don't make sense for our producer. */
1357 private static final Map<String, String> IGNORED_PRODUCER_PROPERTIES =
1358 ImmutableMap.of(
1359 ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "Use withKeySerializer instead",
1360 ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "Use withValueSerializer instead");
1361
1362 @Override
1363 public void populateDisplayData(DisplayData.Builder builder) {
1364 super.populateDisplayData(builder);
1365 builder.addIfNotNull(DisplayData.item("topic", getTopic()).withLabel("Topic"));
1366 Set<String> ignoredProducerPropertiesKeys = IGNORED_PRODUCER_PROPERTIES.keySet();
1367 for (Map.Entry<String, Object> conf : getProducerConfig().entrySet()) {
1368 String key = conf.getKey();
1369 if (!ignoredProducerPropertiesKeys.contains(key)) {
1370 Object value =
1371 DisplayData.inferType(conf.getValue()) != null
1372 ? conf.getValue()
1373 : String.valueOf(conf.getValue());
1374 builder.add(DisplayData.item(key, ValueProvider.StaticValueProvider.of(value)));
1375 }
1376 }
1377 }
1378 }
1379
1380 /**
1381 * A {@link PTransform} to write to a Kafka topic with KVs. See {@link KafkaIO} for more
1382 * information on usage and configuration.
1383 */
1384 @AutoValue
1385 public abstract static class Write<K, V> extends PTransform<PCollection<KV<K, V>>, PDone> {
1386 // TODO (Version 3.0): Create a single generic {@code Write<T>} transform which will be
1387 // parameterized depending on the type of the input collection (KV, ProducerRecords, etc). In
1388 // that case, we shouldn't have to duplicate the same API for similar transforms like
1389 // {@link Write} and {@link WriteRecords}. See example at {@link PubsubIO.Write}.
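// A minimal sketch (editor's addition, hypothetical broker/topic names): the KV-based API
// that this transform keeps for backwards compatibility.
//
//   kvs  // PCollection<KV<Long, String>>
//       .apply(KafkaIO.<Long, String>write()
//           .withBootstrapServers("broker_1:9092,broker_2:9092")
//           .withTopic("results")
//           .withKeySerializer(LongSerializer.class)
//           .withValueSerializer(StringSerializer.class));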
1390
1391 @Nullable
1392 abstract String getTopic();
1393
1394 abstract WriteRecords<K, V> getWriteRecordsTransform();
1395
1396 abstract Builder<K, V> toBuilder();
1397
1398 @Experimental(Kind.PORTABILITY)
1399 @AutoValue.Builder
1400 abstract static class Builder<K, V>
1401 implements ExternalTransformBuilder<External.Configuration, PCollection<KV<K, V>>, PDone> {
1402 abstract Builder<K, V> setTopic(String topic);
1403
1404 abstract Builder<K, V> setWriteRecordsTransform(WriteRecords<K, V> transform);
1405
1406 abstract Write<K, V> build();
1407
1408 @Override
1409 public PTransform<PCollection<KV<K, V>>, PDone> buildExternal(
1410 External.Configuration configuration) {
1411 setTopic(configuration.topic);
1412
1413 Map<String, Object> producerConfig = new HashMap<>();
1414 for (KV<String, String> kv : configuration.producerConfig) {
1415 producerConfig.put(kv.getKey(), kv.getValue());
1416 }
1417 Class keySerializer = resolveClass(configuration.keySerializer);
1418 Class valSerializer = resolveClass(configuration.valueSerializer);
1419
1420 WriteRecords<K, V> writeRecords =
1421 KafkaIO.<K, V>writeRecords()
1422 .withProducerConfigUpdates(producerConfig)
1423 .withKeySerializer(keySerializer)
1424 .withValueSerializer(valSerializer)
1425 .withTopic(configuration.topic);
1426 setWriteRecordsTransform(writeRecords);
1427
1428 return build();
1429 }
1430 }
1431
1432 /** Exposes {@link KafkaIO.Write} as an external transform for cross-language usage. */
1433 @Experimental(Kind.PORTABILITY)
1434 @AutoService(ExternalTransformRegistrar.class)
1435 public static class External implements ExternalTransformRegistrar {
1436
1437 public static final String URN = "beam:external:java:kafka:write:v1";
1438
1439 @Override
1440 public Map<String, Class<? extends ExternalTransformBuilder>> knownBuilders() {
1441 return ImmutableMap.of(URN, AutoValue_KafkaIO_Write.Builder.class);
1442 }
1443
1444 /** Parameters class to expose the Write transform to an external SDK. */
1445 public static class Configuration {
1446
1447 // All byte arrays are UTF-8 encoded strings
1448 private Iterable<KV<String, String>> producerConfig;
1449 private String topic;
1450 private String keySerializer;
1451 private String valueSerializer;
1452
1453 public void setProducerConfig(Iterable<KV<String, String>> producerConfig) {
1454 this.producerConfig = producerConfig;
1455 }
1456
1457 public void setTopic(String topic) {
1458 this.topic = topic;
1459 }
1460
1461 public void setKeySerializer(String keySerializer) {
1462 this.keySerializer = keySerializer;
1463 }
1464
1465 public void setValueSerializer(String valueSerializer) {
1466 this.valueSerializer = valueSerializer;
1467 }
1468 }
1469 }
1470
1471 /** Used mostly to reduce the boilerplate of wrapping {@link WriteRecords} methods. */
1472 private Write<K, V> withWriteRecordsTransform(WriteRecords<K, V> transform) {
1473 return toBuilder().setWriteRecordsTransform(transform).build();
1474 }
1475
1476 /**
1477 * Wrapper method over {@link WriteRecords#withBootstrapServers(String)}, used to keep
1478 * compatibility with the old API based on the KV type of element.
1479 */
1480 public Write<K, V> withBootstrapServers(String bootstrapServers) {
1481 return withWriteRecordsTransform(
1482 getWriteRecordsTransform().withBootstrapServers(bootstrapServers));
1483 }
1484
1485 /**
1486 * Wrapper method over {@link WriteRecords#withTopic(String)}, used to keep compatibility
1487 * with the old API based on the KV type of element.
1488 */
1489 public Write<K, V> withTopic(String topic) {
1490 return toBuilder()
1491 .setTopic(topic)
1492 .setWriteRecordsTransform(getWriteRecordsTransform().withTopic(topic))
1493 .build();
1494 }
1495
1496 /**
1497 * Wrapper method over {@link WriteRecords#withKeySerializer(Class)}, used to keep
1498 * compatibility with the old API based on the KV type of element.
1499 */
1500 public Write<K, V> withKeySerializer(Class<? extends Serializer<K>> keySerializer) {
1501 return withWriteRecordsTransform(getWriteRecordsTransform().withKeySerializer(keySerializer));
1502 }
1503
1504 /**
1505 * Wrapper method over {@link WriteRecords#withValueSerializer(Class)}, used to keep
1506 * compatibility with the old API based on the KV type of element.
1507 */
1508 public Write<K, V> withValueSerializer(Class<? extends Serializer<V>> valueSerializer) {
1509 return withWriteRecordsTransform(
1510 getWriteRecordsTransform().withValueSerializer(valueSerializer));
1511 }
1512
1513 /**
1514 * Wrapper method over {@link WriteRecords#withProducerFactoryFn(SerializableFunction)}, used to
1515 * keep compatibility with the old API based on the KV type of element.
1516 */
1517 public Write<K, V> withProducerFactoryFn(
1518 SerializableFunction<Map<String, Object>, Producer<K, V>> producerFactoryFn) {
1519 return withWriteRecordsTransform(
1520 getWriteRecordsTransform().withProducerFactoryFn(producerFactoryFn));
1521 }
1522
1523 /**
1524 * Wrapper method over {@link WriteRecords#withInputTimestamp()}, used to keep compatibility
1525 * with the old API based on the KV type of element.
1526 */
1527 public Write<K, V> withInputTimestamp() {
1528 return withWriteRecordsTransform(getWriteRecordsTransform().withInputTimestamp());
1529 }
1530
1531 /**
1532 * Wrapper method over {@link
1533 * WriteRecords#withPublishTimestampFunction(KafkaPublishTimestampFunction)}, used to keep
1534 * compatibility with the old API based on the KV type of element.
1535 *
1536 * @deprecated use {@link WriteRecords} and {@code ProducerRecords} to set publish timestamp.
1537 */
1538 @Deprecated
1539 @SuppressWarnings({"unchecked", "rawtypes"})
1540 public Write<K, V> withPublishTimestampFunction(
1541 KafkaPublishTimestampFunction<KV<K, V>> timestampFunction) {
1542 return withWriteRecordsTransform(
1543 getWriteRecordsTransform()
1544 .withPublishTimestampFunction(new PublishTimestampFunctionKV(timestampFunction)));
1545 }
1546
1547 /**
1548 * Wrapper method over {@link WriteRecords#withEOS(int, String)}, used to keep compatibility
1549 * with the old API based on the KV type of element.
1550 */
1551 public Write<K, V> withEOS(int numShards, String sinkGroupId) {
1552 return withWriteRecordsTransform(getWriteRecordsTransform().withEOS(numShards, sinkGroupId));
1553 }
1554
1555 /**
1556 * Wrapper method over {@link WriteRecords#withConsumerFactoryFn(SerializableFunction)}, used to
1557 * keep compatibility with the old API based on the KV type of element.
1558 */
1559 public Write<K, V> withConsumerFactoryFn(
1560 SerializableFunction<Map<String, Object>, ? extends Consumer<?, ?>> consumerFactoryFn) {
1561 return withWriteRecordsTransform(
1562 getWriteRecordsTransform().withConsumerFactoryFn(consumerFactoryFn));
1563 }
1564
1565 /**
1566 * Adds the given producer properties, overriding old values of properties with the same key.
1567 *
1568 * @deprecated as of version 2.13. Use {@link #withProducerConfigUpdates(Map)} instead.
1569 */
1570 @Deprecated
1571 public Write<K, V> updateProducerProperties(Map<String, Object> configUpdates) {
1572 return withWriteRecordsTransform(
1573 getWriteRecordsTransform().updateProducerProperties(configUpdates));
1574 }
1575
1576 /**
1577 * Updates configuration for the producer. Note that the default producer properties will not be
1578 * completely overridden; this method only replaces the values for the keys that are provided.
1579 *
1580 * <p>By default, the producer uses the configuration from {@link
1581 * WriteRecords#DEFAULT_PRODUCER_PROPERTIES}.
1582 */
1583 public Write<K, V> withProducerConfigUpdates(Map<String, Object> configUpdates) {
1584 return withWriteRecordsTransform(
1585 getWriteRecordsTransform().withProducerConfigUpdates(configUpdates));
1586 }
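// A minimal sketch (editor's addition): overriding a single producer property while keeping
// the remaining defaults (such as the default retries setting) intact. The compression choice
// below is only illustrative.
//
//   KafkaIO.<Long, String>write()
//       .withBootstrapServers("broker_1:9092")
//       .withTopic("results")
//       .withKeySerializer(LongSerializer.class)
//       .withValueSerializer(StringSerializer.class)
//       .withProducerConfigUpdates(
//           ImmutableMap.of(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip"));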
1587
1588 @Override
1589 public PDone expand(PCollection<KV<K, V>> input) {
1590 checkArgument(getTopic() != null, "withTopic() is required");
1591
1592 KvCoder<K, V> kvCoder = (KvCoder<K, V>) input.getCoder();
1593 return input
1594 .apply(
1595 "Kafka ProducerRecord",
1596 MapElements.via(
1597 new SimpleFunction<KV<K, V>, ProducerRecord<K, V>>() {
1598 @Override
1599 public ProducerRecord<K, V> apply(KV<K, V> element) {
1600 return new ProducerRecord<>(getTopic(), element.getKey(), element.getValue());
1601 }
1602 }))
1603 .setCoder(ProducerRecordCoder.of(kvCoder.getKeyCoder(), kvCoder.getValueCoder()))
1604 .apply(getWriteRecordsTransform());
1605 }
1606
1607 @Override
1608 public void validate(PipelineOptions options) {
1609 getWriteRecordsTransform().validate(options);
1610 }
1611
1612 @Override
1613 public void populateDisplayData(DisplayData.Builder builder) {
1614 super.populateDisplayData(builder);
1615 getWriteRecordsTransform().populateDisplayData(builder);
1616 }
1617
1618 /**
1619 * Writes just the values to Kafka. This is useful for writing collections of values rather
1620 * than {@link KV}s.
1621 */
1622 @SuppressWarnings({"unchecked", "rawtypes"})
1623 public PTransform<PCollection<V>, PDone> values() {
1624 return new KafkaValueWrite<K, V>(this.withKeySerializer((Class) StringSerializer.class));
1625 }
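// A minimal sketch (editor's addition): writing a PCollection<String> of values only. Keys are
// null, so no key serializer needs to be configured by the caller.
//
//   strings  // PCollection<String>
//       .apply(KafkaIO.<Void, String>write()
//           .withBootstrapServers("broker_1:9092")
//           .withTopic("results")
//           .withValueSerializer(StringSerializer.class)
//           .values());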
1626
1627 /**
1628 * Wrapper class which allows using {@code KafkaPublishTimestampFunction<KV<K, V>>} with {@link
1629 * WriteRecords#withPublishTimestampFunction(KafkaPublishTimestampFunction)}.
1630 */
1631 private static class PublishTimestampFunctionKV<K, V>
1632 implements KafkaPublishTimestampFunction<ProducerRecord<K, V>> {
1633
1634 private KafkaPublishTimestampFunction<KV<K, V>> fn;
1635
1636 public PublishTimestampFunctionKV(KafkaPublishTimestampFunction<KV<K, V>> fn) {
1637 this.fn = fn;
1638 }
1639
1640 @Override
1641 public Instant getTimestamp(ProducerRecord<K, V> e, Instant ts) {
1642 return fn.getTimestamp(KV.of(e.key(), e.value()), ts);
1643 }
1644 }
1645 }
1646
1647 /**
1648 * Same as {@code Write<K, V>} without a key. Null is used for the key, as is the convention in
1649 * Kafka when no key is specified. The majority of Kafka writers don't specify a key.
1650 */
1651 private static class KafkaValueWrite<K, V> extends PTransform<PCollection<V>, PDone> {
1652 private final Write<K, V> kvWriteTransform;
1653
1654 private KafkaValueWrite(Write<K, V> kvWriteTransform) {
1655 this.kvWriteTransform = kvWriteTransform;
1656 }
1657
1658 @Override
1659 public PDone expand(PCollection<V> input) {
1660 return input
1661 .apply(
1662 "Kafka values with default key",
1663 MapElements.via(
1664 new SimpleFunction<V, KV<K, V>>() {
1665 @Override
1666 public KV<K, V> apply(V element) {
1667 return KV.of(null, element);
1668 }
1669 }))
1670 .setCoder(KvCoder.of(new NullOnlyCoder<>(), input.getCoder()))
1671 .apply(kvWriteTransform);
1672 }
1673
1674 @Override
1675 public void populateDisplayData(DisplayData.Builder builder) {
1676 super.populateDisplayData(builder);
1677 kvWriteTransform.populateDisplayData(builder);
1678 }
1679
1680 @Experimental(Kind.PORTABILITY)
1681 public static class Builder<K, V>
1682 implements ExternalTransformBuilder<KafkaValueWrite.External.Configuration, PCollection<V>, PDone> {
1683
1684 @Override
1685 public PTransform<PCollection<V>, PDone> buildExternal(
1686 KafkaValueWrite.External.Configuration configuration) {
1687
1688 Map<String, Object> producerConfig = new HashMap<>();
1689 for (KV<String, String> kv : configuration.producerConfig) {
1690 producerConfig.put(kv.getKey(), kv.getValue());
1691 }
1692
1693 Class valSerializer = resolveClass(configuration.valueSerializer);
1694
1695 Write<K, V> writeTransform =
1696 KafkaIO.<K, V>write()
1697 .withProducerConfigUpdates(producerConfig)
1698 .withKeySerializer(valSerializer)
1699 .withValueSerializer(valSerializer)
1700 .withTopic(configuration.topic);
1701
1702 return new KafkaIO.KafkaValueWrite<K, V>(writeTransform);
1703 }
1704 }
1705
1706 /** Exposes {@link KafkaValueWrite} as an external transform for cross-language usage. */
1707 @Experimental(Kind.PORTABILITY)
1708 @AutoService(ExternalTransformRegistrar.class)
1709 public static class External implements ExternalTransformRegistrar {
1710
1711 public static final String URN = "beam:external:java:kafka:write_values:v1";
1712
1713 @Override
1714 public Map<String, Class<? extends ExternalTransformBuilder>> knownBuilders() {
1715 return ImmutableMap.of(URN, KafkaValueWrite.Builder.class);
1716 }
1717
1718 /** Parameters class to expose the Write transform to an external SDK. */
1719 public static class Configuration {
1720
1721 // All byte arrays are UTF-8 encoded strings
1722 private Iterable<KV<String, String>> producerConfig;
1723 private String topic;
1724 private String valueSerializer;
1725
1726 public void setProducerConfig(Iterable<KV<String, String>> producerConfig) {
1727 this.producerConfig = producerConfig;
1728 }
1729
1730 public void setTopic(String topic) {
1731 this.topic = topic;
1732 }
1733
1734 public void setValueSerializer(String valueSerializer) {
1735 this.valueSerializer = valueSerializer;
1736 }
1737 }
1738 }
1739 }
1740
1741
1742 private static class NullOnlyCoder<T> extends AtomicCoder<T> {
1743 @Override
1744 public void encode(T value, OutputStream outStream) {
1745 checkArgument(value == null, "Can only encode nulls");
1746 // Encode as no bytes.
1747 }
1748
1749 @Override
1750 public T decode(InputStream inStream) {
1751 return null;
1752 }
1753 }
1754
1755 private static Class resolveClass(String className) {
1756 try {
1757 return Class.forName(className);
1758 } catch (ClassNotFoundException e) {
1759 throw new RuntimeException("Could not find class: " + className, e);
1760 }
1761 }
1762}