Kafka Streams Introduction

Deepti Mittal
Aug 15, 2020


Kafka Streams is a client library for processing and analysing data stored in Kafka. It either writes the resulting data back to Kafka or sends the final output to an external system.

It has no external dependencies on systems other than Apache Kafka itself as the internal messaging layer; notably, it uses Kafka’s partitioning model to horizontally scale processing while maintaining strong ordering guarantees.

It offers the necessary stream-processing primitives, along with a high-level Streams DSL and a low-level Processor API.
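
To give a flavour of the high-level DSL, here is a minimal sketch of a stateless pipeline built from those primitives. The topic names "orders" and "large-orders" and the class name are hypothetical, chosen only for illustration:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class DslSketch {

    public static void main(final String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        // Consume a stream of string records from a hypothetical input topic.
        KStream<String, String> orders =
                builder.stream("orders", Consumed.with(Serdes.String(), Serdes.String()));
        orders
            // Stateless primitive: keep only sufficiently large records.
            .filter((key, value) -> value.length() > 100)
            // Stateless primitive: transform each record's value.
            .mapValues(value -> value.toUpperCase())
            // Write the transformed stream to a hypothetical output topic.
            .to("large-orders", Produced.with(Serdes.String(), Serdes.String()));
        // builder.build() would turn this into a topology to run with KafkaStreams,
        // as the word-count example at the end of this post shows.
    }
}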

Core concepts

A data stream is like a river: you don’t see its start or its end. A stream, then, is an unbounded flow of data.

A stream processing application is any program that makes use of the Kafka Streams library. It defines its computational logic through one or more processor topologies, where a processor topology is a graph of stream processors (nodes) that are connected by streams (edges).

A stream processor is a node in the processor topology; it represents a processing step that transforms data in streams by receiving one input record at a time from its upstream processors, applying its operation to it, and possibly producing one or more output records for its downstream processors.

There are two special processors in the topology:

Source Processor: A source processor is a special type of stream processor that does not have any upstream processors. It produces an input stream for its topology from one or more Kafka topics by consuming records from those topics and forwarding them to its downstream processors.

Sink Processor: A sink processor is a special type of stream processor that does not have downstream processors. It sends any records received from its upstream processors to a specified Kafka topic. The sketch below wires all three kinds of node together.
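
To make the three kinds of processor concrete, here is a minimal sketch using the low-level Processor API (the record-based interfaces available since Kafka 2.7). The node names, topic names, and the uppercasing logic are all hypothetical:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

public class UppercaseTopology {

    // A regular stream processor: one node in the topology that transforms each record.
    static class UppercaseProcessor implements Processor<String, String, String, String> {
        private ProcessorContext<String, String> context;

        @Override
        public void init(final ProcessorContext<String, String> context) {
            this.context = context;
        }

        @Override
        public void process(final Record<String, String> record) {
            // Forward the transformed record to downstream processors.
            context.forward(record.withValue(record.value().toUpperCase()));
        }
    }

    public static Topology build() {
        Topology topology = new Topology();
        // Source processor: consumes records from a hypothetical input topic.
        topology.addSource("Source",
                Serdes.String().deserializer(), Serdes.String().deserializer(), "input-topic");
        // Regular stream processor: applies the transformation, fed by the source node.
        topology.addProcessor("Uppercase", UppercaseProcessor::new, "Source");
        // Sink processor: writes results to a hypothetical output topic.
        topology.addSink("Sink", "output-topic",
                Serdes.String().serializer(), Serdes.String().serializer(), "Uppercase");
        return topology;
    }
}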

A processor topology is an abstraction of the stream processing logic. At runtime, the topology is instantiated and replicated to process stream data in parallel, without us doing anything. You can think of it as the set of instructions to be executed when data is read from a topic.

Kafka Streams takes care of running those instructions in parallel, which saves developers the considerable effort of writing parallel data-processing code themselves.
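
As a sketch of how little the developer has to do: parallelism follows the partition count of the input topics (one task per partition), and a single configuration property controls how many threads each instance runs. The property values below are illustrative, matching the word-count example later in this post:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class ParallelismConfig {

    public static Properties buildProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
        // Run two processing threads in this instance. Starting a second instance
        // with the same application.id spreads the topology's tasks across both
        // instances automatically, with no further code changes.
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
        return props;
    }
}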

State store

Some stream processing applications require state to be maintained while processing messages, so that earlier messages can influence the processing of later ones.

Kafka Streams provides state stores, which a stream processing application can use to store and query data. Every task in Kafka Streams embeds one or more state stores that can be accessed via APIs to store and query the data required for processing.
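
As a sketch of querying such a store: assuming a running KafkaStreams instance whose topology has materialized a key-value store named "counts-store" (as the word-count example below does), the store can be read through the interactive-queries API. The class and method names here are hypothetical:

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class StateStoreQuery {

    // Looks up the current count for a word from the local "counts-store"
    // materialized by the word-count topology shown below.
    static Long countFor(final KafkaStreams streams, final String word) {
        ReadOnlyKeyValueStore<String, Long> store = streams.store(
                StoreQueryParameters.fromNameAndType("counts-store",
                        QueryableStoreTypes.<String, Long>keyValueStore()));
        return store.get(word);
    }
}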

Sample code

Kafka Streams word-count example

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.Arrays;
import java.util.Properties;

public class WordCountApplication {

    public static void main(final String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        // Source: read lines of text from the input topic.
        KStream<String, String> textLines = builder.stream("TextLinesTopic");
        KTable<String, Long> wordCounts = textLines
            // Split each line into lowercase words.
            .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
            // Re-key each record by the word itself.
            .groupBy((key, word) -> word)
            // Count occurrences per word, materialized in the "counts-store" state store.
            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));
        // Sink: write the continuously updated counts to the output topic.
        wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        // Close the streams client cleanly on JVM shutdown.
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
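
This one small program exercises every concept described above: a source processor reading from TextLinesTopic, regular stream processors splitting lines into words and counting them, a state store ("counts-store") holding the running counts, and a sink processor writing the results to WordsWithCountsTopic.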

More great code samples can be found in the official Apache Kafka Streams documentation.
