Kafka error handling: Decide the expectation and action

Deepti Mittal
Dec 25, 2021

Kafka consumers can be configured to read records one at a time or in batches, and error handling has to be set up to match.

In a batch of 10,000 records, if one record fails, Spring Kafka lets you either fail the entire batch or deal with just that one record. Depending on the use case, you can go with the default error handlers provided by Spring Kafka or configure custom error handlers.

Some of the default error handlers provided by Spring Kafka:

  • RetryingBatchErrorHandler: Retries the whole batch up to the configured number of retries, waiting for the configured retry interval between attempts. Once the retries are exhausted, the consumer record recoverer is called to execute the chosen action. If retries are interrupted, the batch is passed to the next consumer.
  • SeekToCurrentBatchErrorHandler: If a batch fails, it resets the offsets to the current position so that the batch can be replayed. Similarly, SeekToCurrentErrorHandler is for single-record processing.
  • RecoveringBatchErrorHandler: If an error occurs in a batch, the records before the failed record have their offsets committed, and the partitions for the remaining records are repositioned so that they are redelivered; the failed record itself can be recovered and skipped. If the listener does not identify a valid failed record, error handling is delegated to SeekToCurrentBatchErrorHandler. If the failed record is recovered, its offset is committed as well.
  • BatchLoggingErrorHandler: Just logs the error for every record. Similarly, LoggingErrorHandler is for single-record processing.
  • ContainerStoppingBatchErrorHandler: Stops the container if an error occurs. Similarly, ContainerStoppingErrorHandler is for single-record processing.
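To make the "commit what succeeded, skip or replay the rest" behaviour described for RecoveringBatchErrorHandler more concrete, here is a minimal plain-Java model of that decision. It is only a sketch of the idea and is not Spring Kafka's implementation: the class `BatchSplitSketch`, the `Outcome` holder and the `split` method are hypothetical names, and records are modelled as plain strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model only; none of these types come from Spring Kafka.
class BatchSplitSketch {

    /** Hypothetical result holder: what is committed, recovered and redelivered. */
    static final class Outcome {
        final List<String> committed = new ArrayList<>();
        final List<String> recovered = new ArrayList<>();
        final List<String> redelivered = new ArrayList<>();
    }

    /**
     * Splits a failed batch around the failing record.
     *
     * @param failedIndex position of the failing record as reported by the listener
     * @param skipFailed  true if the failing record is recovered and skipped,
     *                    false if it should be replayed with the rest
     */
    static Outcome split(List<String> batch, int failedIndex, boolean skipFailed) {
        Outcome out = new Outcome();
        if (failedIndex < 0 || failedIndex >= batch.size()) {
            // No valid failing record was identified: replay the entire batch.
            out.redelivered.addAll(batch);
            return out;
        }
        // Records before the failure were processed, so their offsets can be committed.
        out.committed.addAll(batch.subList(0, failedIndex));
        int resumeFrom = failedIndex;
        if (skipFailed) {
            // The failing record is handed to the recovery action and then skipped.
            out.recovered.add(batch.get(failedIndex));
            resumeFrom = failedIndex + 1;
        }
        // Everything from resumeFrom onwards is fetched again on the next poll.
        out.redelivered.addAll(batch.subList(resumeFrom, batch.size()));
        return out;
    }

    public static void main(String[] args) {
        Outcome o = split(Arrays.asList("r0", "r1", "r2", "r3"), 2, true);
        System.out.println("committed=" + o.committed
                + " recovered=" + o.recovered
                + " redelivered=" + o.redelivered);
    }
}
```

Passing an out-of-range index models the fallback case, where nothing is committed and the whole batch is replayed.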

Based on the use case, the first thing to decide is which error handler to use, or whether a custom error handler needs to be defined.

Setting up error handler to be used

The error handler needs to be configured while configuring the listener container factory, so that the listener knows which handler to call when an error occurs. The code snippet below shows how to do that.

ConcurrentKafkaListenerContainerFactory<SpecificRecord, SpecificRecord> factory =
        new ConcurrentKafkaListenerContainerFactory<>();

// Deciding whether to commit offset for every record or whole batch.
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);
factory.setBatchListener(true); // By default is false.

// How many listeners to create to have parallelism in processing multiple batches together.
factory.setConcurrency(Integer.parseInt(concurrency));

// As batch listener is true, the error handler also needs to be set for batch.
factory.setBatchErrorHandler(errorHandler);

// Defining properties for consumers.
DefaultKafkaConsumerFactory<SpecificRecord, SpecificRecord> consumerFactory = getKafkaConsumerFactory();
factory.setConsumerFactory(consumerFactory);

private DefaultKafkaConsumerFactory<SpecificRecord, SpecificRecord> getKafkaConsumerFactory() {
    Map<String, Object> props = kafkaProperties.buildConsumerProperties();
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, SpecificAvroDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SpecificAvroDeserializer.class);
    props.put("schema.registry.url", kafkaProperties.getProperties().get("schema.registry.url"));
    return new DefaultKafkaConsumerFactory<>(props);
}

The error handler passed in can be either a standard or a customised error handler.

Custom action to recover from error

Another important decision when using the retrying error handlers is what action to take when a record still fails after all retries. The default behaviour of the error handler is to log the exception and commit the offset. This might not be the right behaviour for every recoverable and non-recoverable exception. Defining the bean below lets you inject your own behaviour instead of falling back to the default.

@Bean
public ConsumerRecordRecoverer recordRecoverer(ApplicationContext context) {
    return (consumerRecord, exception) -> {
        // Define action
    };
}
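The relationship between retries and the recoverer can be pictured with a small plain-Java sketch: the listener is tried a fixed number of times, and only when every attempt fails is the record handed to the recovery action. This is an illustration of the idea, not Spring Kafka code; `RetryThenRecoverSketch`, `process` and its parameters are hypothetical names, and the recoverer is modelled as a plain `BiConsumer` of record and exception.

```java
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Plain-Java model of "retry, then recover"; not Spring Kafka's implementation.
class RetryThenRecoverSketch {

    /**
     * Calls the listener up to maxAttempts times, pausing backOffMillis between
     * attempts. If every attempt fails, the record and the last exception are
     * passed to the recoverer. Returns the number of attempts that were made.
     */
    static <T> int process(T record,
                           Consumer<T> listener,
                           int maxAttempts,
                           long backOffMillis,
                           BiConsumer<T, Exception> recoverer) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                listener.accept(record);
                return attempt; // Success: nothing to recover.
            } catch (RuntimeException e) {
                last = e;
                if (attempt < maxAttempts) {
                    sleep(backOffMillis);
                }
            }
        }
        // Retries exhausted: run the custom recovery action instead of just logging.
        recoverer.accept(record, last);
        return maxAttempts;
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ie) {
            // Restore the interrupt flag and stop retrying.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while backing off", ie);
        }
    }
}
```

A recoverer in this model might, for example, add the record to an in-memory list or publish it somewhere for later inspection; what it does is exactly the "action" that the bean above leaves open.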

Error handling and transaction management is some of the most beautiful code I have written with Kafka. Always try to decouple this code from any specific use case. Only four things need to be considered while designing error handling in a system.

  1. What should be the behaviour of recoverable exceptions
  2. What should be the behaviour of non-recoverable exceptions
  3. What should be the behaviour in case of error in batch processing
  4. What should be the behaviour of single record processing

Deciding this behaviour is crucial, and keeping the error handling code focused on just these four things will take care of a lot of production errors on its own.
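One way to keep those decisions in a single place, independent of any use case, is a small policy class that answers all four questions. The sketch below is plain Java and purely illustrative: `ErrorPolicySketch`, the `Action` values, and the assumption that I/O failures are recoverable while everything else is not are all hypothetical choices, to be replaced by whatever your system actually decides.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical policy table; the classification and the actions are assumptions.
class ErrorPolicySketch {

    enum Action { RETRY_BATCH, RETRY_RECORD, RECOVER_AND_SKIP }

    /** Assumption: I/O problems are transient, anything else is a permanent fault. */
    static boolean isRecoverable(Throwable t) {
        return t instanceof IOException || t instanceof UncheckedIOException;
    }

    /**
     * Maps (kind of exception, batch or single-record listener) to an action.
     * Recoverable errors are retried at the granularity of the listener;
     * non-recoverable errors go straight to the recovery action and are skipped.
     */
    static Action decide(Throwable t, boolean batchListener) {
        if (isRecoverable(t)) {
            return batchListener ? Action.RETRY_BATCH : Action.RETRY_RECORD;
        }
        return Action.RECOVER_AND_SKIP;
    }

    public static void main(String[] args) {
        System.out.println(decide(new IOException("broker unreachable"), true));
        System.out.println(decide(new IllegalArgumentException("bad payload"), false));
    }
}
```

Because the policy only looks at the exception type and the listener mode, it can be unit-tested without a broker and reused across consumers.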

My blog on recoverable and non recoverable error handling:

Deepti Mittal

I have been working as a software engineer in Bangalore for over a decade. I love solving core technical problems and then blogging about them.