Apache Kafka Quick Start Guide

By: Raúl Estrada

Overview of this book

Apache Kafka is a great open source platform for handling your real-time data pipeline to ensure high-speed filtering and pattern matching on the fly. In this book, you will learn how to use Apache Kafka for efficient processing in distributed applications and will get familiar with solving everyday problems in fast data and processing pipelines. This book focuses on programming rather than the configuration management of Kafka clusters or DevOps. It starts off with installing Kafka and setting up the development environment, before quickly moving on to performing fundamental messaging operations such as validation and enrichment. Here you will learn about message composition with the pure Kafka API and Kafka Streams. You will look into the transformation of messages in different formats, such as text, binary, XML, JSON, and Avro. Next, you will learn how to expose the schemas contained in Kafka with the Schema Registry. You will then learn how to work with all relevant connectors with Kafka Connect. While working with Kafka Streams, you will perform various interesting operations on streams, such as windowing, joins, and aggregations. Finally, through KSQL, you will learn how to retrieve, insert, modify, and delete data streams, and how to manipulate watermarks and windows.

A command-line message consumer

The last step is to read the generated messages. Kafka also has a powerful command that enables messages to be consumed from the command line. Remember that all of these command-line tasks can also be done programmatically. As with the producer, each line is treated as one message: the consumer prints each message it receives on its own line.

This section depends on the previous steps. The Kafka brokers must be up and running, with a topic created in them. Some messages must also have been produced with the console producer, so that there is something to consume from the console.

Run the following command:

> <confluent-path>/bin/kafka-console-consumer --topic amazingTopic --bootstrap-server localhost:9093 --from-beginning

The output should be as follows:

Fool me once shame on you
Fool me twice shame on me

The parameters are the topic's name and the address of the broker to connect to. The --from-beginning parameter indicates that messages should be consumed from the beginning of the log instead of only the latest messages. To see the difference, generate many more messages and run the command again without this parameter.
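The effect of --from-beginning can be pictured with a plain text file standing in for the topic's log. The following is only a toy analogy and does not talk to Kafka: the temporary file and the function names (read_from_beginning, mark_end, read_since_mark) are made up for illustration, and a real consumer tracks an offset per partition rather than a line number.

```shell
#!/bin/sh
# Toy analogy only: a temporary text file stands in for a topic's log.
# Nothing here talks to Kafka; all names are hypothetical.

LOG_FILE="$(mktemp)"
trap 'rm -f "$LOG_FILE"' EXIT   # clean up the temporary file on exit

# "Produce" two messages before any consumer starts.
printf '%s\n' "Fool me once shame on you" >> "$LOG_FILE"
printf '%s\n' "Fool me twice shame on me" >> "$LOG_FILE"

# Like --from-beginning: start at the first line of the log.
read_from_beginning() {
    cat "$LOG_FILE"
}

# Like omitting --from-beginning: remember where the log ends now...
mark_end() {
    END_LINE="$(wc -l < "$LOG_FILE" | tr -d ' ')"
}

# ...and later return only the lines appended after that position.
read_since_mark() {
    tail -n "+$((END_LINE + 1))" "$LOG_FILE"
}

mark_end                                          # the consumer "starts" here
printf '%s\n' "A brand new message" >> "$LOG_FILE"

echo "from beginning:"
read_from_beginning
echo "new messages only:"
read_since_mark
```

In this analogy, the first reader sees all three lines, while the second sees only the line appended after it started.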

There are more useful parameters for this command; some important ones are as follows:

  • --fetch-size: This is the amount of data to be fetched in a single request. The size in bytes follows as the argument. The default value is 1,024 x 1,024.
  • --socket-buffer-size: This is the size of the TCP receive (RECV) buffer. The size in bytes follows as the argument. The default value is 2 x 1,024 x 1,024.
  • --formatter: This is the name of the class to use for formatting messages for display. The default value is NewlineMessageFormatter.
  • --autocommit.interval.ms: This is the time interval, in milliseconds, at which the current offset is saved. The time in milliseconds follows as the argument. The default value is 10,000.
  • --max-messages: This is the maximum number of messages to consume before exiting. If not set, the consumption is continuous. The number of messages follows as the argument.
  • --skip-message-on-error: If there is an error while processing a message, the system skips it instead of halting.
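Several of these options can be combined on one command line. As a small convenience sketch, the function below assembles a console consumer command from a topic name and an optional message limit, and prints it instead of running it so that it can be inspected first. The CONFLUENT_HOME variable, its default path, BOOTSTRAP_SERVER, and the function name consumer_cmd are hypothetical; the flags are the ones used in this section.

```shell
#!/bin/sh
# Hypothetical helper: prints a kafka-console-consumer command line.
# CONFLUENT_HOME and its default are placeholders; adjust them to your install.
CONFLUENT_HOME="${CONFLUENT_HOME:-/opt/confluent}"
BOOTSTRAP_SERVER="${BOOTSTRAP_SERVER:-localhost:9093}"

# Usage: consumer_cmd TOPIC [MAX_MESSAGES]
consumer_cmd() {
    topic="$1"
    max="${2:-}"
    cmd="$CONFLUENT_HOME/bin/kafka-console-consumer --topic $topic --bootstrap-server $BOOTSTRAP_SERVER --from-beginning"
    # Add --max-messages only when a limit was given; without it the
    # consumer keeps reading until it is interrupted.
    if [ -n "$max" ]; then
        cmd="$cmd --max-messages $max"
    fi
    printf '%s\n' "$cmd"
}

# Print the command for reading at most five messages from amazingTopic.
consumer_cmd amazingTopic 5
```

Printing the command rather than executing it keeps the sketch safe to run on a machine without Kafka; the printed line can be pasted into a terminal once the path and port match the local setup.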

The most requested forms of this command are as follows:

  • To consume just one message, use the following:
      > <confluent-path>/bin/kafka-console-consumer --topic amazingTopic --bootstrap-server localhost:9093 --max-messages 1

  • To consume one message from an offset, use the following:
      > <confluent-path>/bin/kafka-console-consumer --topic amazingTopic --bootstrap-server localhost:9093 --max-messages 1 --formatter 'kafka.coordinator.GroupMetadataManager$OffsetsMessageFormatter'

  • To consume messages from a specific consumer group, use the following:
      > <confluent-path>/bin/kafka-console-consumer --topic amazingTopic --bootstrap-server localhost:9093 --new-consumer --consumer-property group.id=my-group