Reading from Kafka

Now that we have our project skeleton, let's recall the project requirements for the stream processing engine. Remember that our event, customer consults ETH price, occurs outside Monedero, and that these messages may not be well formed; that is, they may have defects. The first step in our pipeline is to validate that the input events have the correct data and the correct structure. Our project will be called ProcessingEngine.

The ProcessingEngine specification shall create a pipeline application that does the following:

  • Reads each message from a Kafka topic called input-messages
  • Validates each message, sending any invalid event to a specific Kafka topic called invalid-messages
  • Writes the correct messages in a Kafka topic called valid-messages

These steps are detailed in Figure 2.1, the first sketch for the pipeline processing engine:

Figure 2.1: The processing engine reads events from the input-messages topic, validates the messages, routes the defective ones to the invalid-messages topic, and writes the correct ones to the valid-messages topic
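
Note that the three topics must already exist on the broker before the engine can read from or write to them. The following is a minimal sketch of how they could be created with the Kafka AdminClient; the broker address localhost:9092, the single partition, and the replication factor of 1 are assumptions for a local development setup and are not part of the book's listings (the topics can just as easily be created with the command-line tools).

package monedero;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Arrays;
import java.util.Properties;

public class TopicSetup {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed local broker
    try (AdminClient admin = AdminClient.create(props)) {
      // one partition and one replica are enough for a local development cluster
      admin.createTopics(Arrays.asList(
          new NewTopic("input-messages", 1, (short) 1),
          new NewTopic("valid-messages", 1, (short) 1),
          new NewTopic("invalid-messages", 1, (short) 1)))
        .all().get(); // block until the broker confirms creation
    }
  }
}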

The processing engine stream construction has two phases:

  • Create a simple Kafka worker that reads from the input-messages topic in Kafka and writes the events to another topic
  • Modify the Kafka worker to perform the validation

So, let's proceed with the first step: build a Kafka worker that reads individual raw messages from the input-messages topic. In Kafka jargon, we say that a consumer is needed. If you recall, in the first chapter we built a command-line producer to write events to a topic and a command-line consumer to read the events from that topic. Now, we will code the same consumer in Java.

For our project, a consumer is a Java interface that contains all of the necessary behavior for all classes that implement consumers.

Create a file called Consumer.java in the src/main/java/monedero/ directory with the content of Listing 2.4:

package monedero;

import java.util.Properties;

public interface Consumer {
  static Properties createConfig(String servers, String groupId) {
    Properties config = new Properties();
    config.put("bootstrap.servers", servers);
    config.put("group.id", groupId);
    config.put("enable.auto.commit", "true");
    config.put("auto.commit.interval.ms", "1000");
    config.put("auto.offset.reset", "earliest");
    config.put("session.timeout.ms", "30000");
    config.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    config.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    return config;
  }
}
Listing 2.4: Consumer.java

The Consumer interface encapsulates the common behavior of the Kafka consumers. It has the createConfig method, which sets all of the properties needed by the Kafka consumers. Note that both deserializers are of the StringDeserializer type because this Kafka consumer reads Kafka key-value records in which the key and the value are both strings.
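
As a side note, the same configuration could be written with the constants from the ConsumerConfig class, so that a typo in a property name becomes a compile-time error instead of a silent misconfiguration. The following sketch is a hypothetical alternative to Listing 2.4, not one of the book's listings:

package monedero;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Properties;

// Same settings as Listing 2.4, expressed with ConsumerConfig constants.
public interface SafeConsumerConfig {
  static Properties createConfig(String servers, String groupId) {
    Properties config = new Properties();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
    config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    config.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    return config;
  }
}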

Now, create a file called Reader.java in the src/main/java/monedero/ directory with the content of Listing 2.5:

package monedero;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;

class Reader implements Consumer {
  private final KafkaConsumer<String, String> consumer; //1
  private final String topic;

  Reader(String servers, String groupId, String topic) {
    this.consumer =
        new KafkaConsumer<>(Consumer.createConfig(servers, groupId));
    this.topic = topic;
  }

  void run(Producer producer) {
    this.consumer.subscribe(Collections.singletonList(this.topic)); //2
    while (true) { //3
      ConsumerRecords<String, String> records =
          consumer.poll(Duration.ofMillis(100)); //4
      for (ConsumerRecord<String, String> record : records) {
        producer.process(record.value()); //5
      }
    }
  }
}
Listing 2.5: Reader.java

The Reader class implements the Consumer interface. So, Reader is a Kafka consumer:

  • In line //1, <String, String> says that the KafkaConsumer reads Kafka records where the key and the value are both of type string
  • In line //2, the consumer subscribes to the Kafka topic specified in its constructor
  • In line //3, there is a while(true) infinite loop for demonstration purposes; in practice, we need more robust code, perhaps implementing Runnable (see the sketch after this list)
  • In line //4, the consumer polls data from the specified topic every 100 milliseconds
  • In line //5, the consumer sends each message to be processed by the producer
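
To give an idea of what a more robust version could look like, here is a hypothetical sketch (ClosableReader is not part of the book's code): the same poll loop as Reader, but run on its own thread and stopped cleanly with wakeup(). It prints each value instead of calling a producer, since the Producer class has not been written yet.

package monedero;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;

class ClosableReader implements Consumer, Runnable {
  private final KafkaConsumer<String, String> consumer;
  private final String topic;
  private final AtomicBoolean closed = new AtomicBoolean(false);

  ClosableReader(String servers, String groupId, String topic) {
    this.consumer = new KafkaConsumer<>(Consumer.createConfig(servers, groupId));
    this.topic = topic;
  }

  @Override
  public void run() {
    try {
      consumer.subscribe(Collections.singletonList(topic));
      while (!closed.get()) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
          System.out.println(record.value()); // hand the value to the next stage here
        }
      }
    } catch (WakeupException e) {
      // expected when shutdown() is called while poll() is blocked
      if (!closed.get()) throw e;
    } finally {
      consumer.close();
    }
  }

  void shutdown() {
    closed.set(true);
    consumer.wakeup(); // interrupts a blocked poll()
  }
}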

This consumer reads all of the messages from the specified Kafka topic and sends them to the process method of the specified producer. All of the configuration properties are specified in the Consumer interface; in particular, the group.id property is important because it associates the consumer with a specific consumer group.

The consumer group is useful when we need to share the topic's events across all of the group's members. Consumer groups are also used to group or isolate different instances.
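
The following sketch illustrates the effect of group.id; it is hypothetical and not part of the book's code, and the broker address localhost:9092 and the group names are assumptions. The two workers join the same group, so the partitions of input-messages are divided between them, while the auditor uses its own group and therefore receives every message independently of the workers.

package monedero;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;

public class GroupDemo {
  public static void main(String[] args) {
    // Same group: workerA and workerB share the partitions of input-messages.
    KafkaConsumer<String, String> workerA =
        new KafkaConsumer<>(Consumer.createConfig("localhost:9092", "processing-engine"));
    KafkaConsumer<String, String> workerB =
        new KafkaConsumer<>(Consumer.createConfig("localhost:9092", "processing-engine"));
    // Different group: the auditor gets its own copy of every message.
    KafkaConsumer<String, String> auditor =
        new KafkaConsumer<>(Consumer.createConfig("localhost:9092", "audit"));
    workerA.subscribe(Collections.singletonList("input-messages"));
    workerB.subscribe(Collections.singletonList("input-messages"));
    auditor.subscribe(Collections.singletonList("input-messages"));
    // Each consumer must be polled from its own thread, as in Reader.run().
  }
}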

To read more about the Kafka Consumer API, follow this link: https://kafka.apache.org/20/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html