Integration tests: Early feedback with Kafka

Deepti Mittal
3 min read · Sep 26, 2020

In this blog I am referring to component-level integration tests. In the testing pyramid, integration tests sit above unit tests and can take a huge load off E2E testing.

Automated component-level integration tests can be very beneficial in giving early feedback.

My last three projects have used integration tests extensively to find issues early with every code change, since the tests run in every build pipeline.

Integration tests should be self-sufficient: they bring up any external dependencies like a database or a Kafka broker, and at the end of the test case/test suite all those dependencies should be shut down.

Below are the steps to write integration test cases using the Spring Kafka embedded broker.

Add the Gradle dependencies:

testCompile 'org.springframework.kafka:spring-kafka-test:2.5.5.RELEASE' // embedded Kafka broker support
testCompile 'org.apache.kafka:kafka_2.12:2.5.1:test' // Kafka test jar required by the embedded broker
testCompile 'org.apache.kafka:kafka_2.12:2.5.1' // Kafka broker classes

To start a broker for integration test cases we can use the @EmbeddedKafka annotation. It takes properties like:

  • Broker address on which to start the broker
  • Topics to create
  • Partitions to create in each topic
  • Number of brokers to create, in case you want random ports for the brokers

There are a few more properties related to ZooKeeper.

@EmbeddedKafka(
    brokerProperties = {
        "listeners=PLAINTEXT://${kafka.connection.bootstrapServer}",
        "delete.topic.enable=${topic.delete:true}"
    },
    topics = {"${kafka.topic.baselineOrderTopic}", "${kafka.topic.stockShareOrderTopic}"},
    partitions = 1,
    count = 2
)
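For the placeholders above to resolve, the test application properties need matching entries. A minimal sketch, assuming property names that mirror the placeholders (the values are illustrative only):

kafka:
  connection:
    bootstrapServer: localhost:9092
  topic:
    baselineOrderTopic: baseline-order-topic
    stockShareOrderTopic: stock-share-order-topic
# topic.delete can be omitted; the placeholder falls back to its default of true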

To access broker properties or perform other operations, we can autowire the broker dependency like below:

@Autowired
private EmbeddedKafkaBroker embeddedKafkaBroker;
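Once autowired, the broker object is handy for things like reading the actual bootstrap servers or creating extra topics at runtime. A minimal sketch (the topic name here is purely illustrative):

// Read the actual broker address, useful when brokers start on random ports.
String bootstrapServers = embeddedKafkaBroker.getBrokersAsString();
// Create an additional topic at runtime; "extra-topic" is just a hypothetical name.
embeddedKafkaBroker.addTopics("extra-topic");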

Configure serdes

Now, if you are using Avro with the Schema Registry, don't forget to configure the serdes with the schema registry URL.

You can define a mock schema registry URL like:

schemaRegistryUrl: mock://test

Having mock:// in the URL means the serializers use an in-memory mock schema registry instead of connecting to a real one when fetching schemas, so all you have to do is configure the URL. Below is the sample code:

private String schemaRegistryUrl = "mock://test";

Serde<String> stringSerde = Serdes.String();
Serde<User> userSerde = new SpecificAvroSerde<>();

Map<String, Object> config = new HashMap<>();
config.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);

// The boolean flag tells each serde whether it handles keys (true) or values (false).
stringSerde.configure(config, true);
userSerde.configure(config, false);

Writing producer

There are two ways to write a producer: using the plain Apache Kafka producer, or using the Spring producer factory. I found the Spring producer factory easier to use, as it needs fewer lines of code (a sketch of the plain-producer alternative follows the factory example below).

Map<String, Object> producerConfigs = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
producerConfigs.put("schema.registry.url", schemaRegistryUrl);
// The serializers are passed to the factory directly, so they need not be repeated in the config map.
producer = new DefaultKafkaProducerFactory<>(
        producerConfigs,
        stringSerde.serializer(),
        userSerde.serializer())
        .createProducer();
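For comparison, here is a minimal sketch of the other option, the plain Apache Kafka producer, reusing the embeddedKafkaBroker, stringSerde and userSerde defined above:

// Plain Apache Kafka producer built from the embedded broker's test properties.
Map<String, Object> props = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
props.put("schema.registry.url", schemaRegistryUrl);
Producer<String, User> plainProducer =
        new KafkaProducer<>(props, stringSerde.serializer(), userSerde.serializer());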

Writing consumer

In the same way, you can either use the plain Kafka consumer or the Spring Kafka consumer factory.

Map<String, Object> consumerConfigs = new HashMap<>(KafkaTestUtils.consumerProps("consumer1", "false", embeddedKafkaBroker));
// The deserializers are passed to the factory directly, so they need not be repeated in the config map.
consumer = new DefaultKafkaConsumerFactory<>(
        consumerConfigs,
        stringSerde.deserializer(),
        userSerde.deserializer())
        .createConsumer();
consumer.subscribe(singleton(userTopic));
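Alternatively, instead of calling subscribe yourself, the embedded broker offers a convenience method that subscribes the consumer and blocks until partitions are assigned:

// Subscribes the consumer to the topic and waits for partition assignment.
embeddedKafkaBroker.consumeFromAnEmbeddedTopic(consumer, userTopic);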

Publishing and consuming messages

User user = new User("John", 10, "4");
producer.send(new ProducerRecord<>(userTopic, user.getName(), user));
producer.flush();

ConsumerRecord<String, User> userRecord = KafkaTestUtils.getSingleRecord(consumer, userTopic);
assertThat(userRecord).isNotNull();
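Beyond the null check, you will usually want to assert on the record contents too. A small sketch, assuming the generated User Avro class exposes getName():

// Assert on the key and the deserialized value, not just on presence.
assertThat(userRecord.key()).isEqualTo("John");
assertThat(userRecord.value().getName()).isEqualTo("John");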

Closing the broker

Closing the broker after every test case can be very costly in terms of time, so the preference is to close it after all the test cases have run.

If you close the broker in a method annotated with @AfterAll, that method normally has to be static. To avoid that, JUnit 5 introduced the @TestInstance annotation, which can be used at class level:

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class UserIntegrationTest {

    @AfterAll
    public void teardown() {
        consumer.unsubscribe();
        producer.close();
        consumer.close();
        embeddedKafkaBroker.destroy();
    }
}

I found this approach a very simple way to write integration tests for Kafka, and it has been very helpful for testing things without spending time setting up and starting an actual broker every run.

Git Sample

I have implemented all the steps @
