Introduction to Apache Kafka with Spring: Step by Step initial setup guide

Apache Kafka is a distributed streaming platform used for building real-time data pipelines and streaming applications. It is highly scalable, fault-tolerant, and provides a robust mechanism for handling large streams of events. Integrating Kafka with Spring Boot makes it easier to develop event-driven microservices. This guide walks you through setting up and using Kafka with Spring, complete with a sequence diagram, a class diagram, and a full setup walkthrough.

Table of Contents

  1. Introduction to Apache Kafka
  2. Why Use Apache Kafka with Spring?
  3. Setting Up the Environment
  4. Spring Kafka Configuration
  5. Producing Messages to Kafka
  6. Consuming Messages from Kafka
  7. Sequence Diagram
  8. Class Diagram
  9. Complete Setup Guide
  10. Conclusion

1. Introduction to Apache Kafka

Apache Kafka is a high-throughput, low-latency platform for handling real-time data feeds. Kafka is used for building real-time streaming data pipelines that reliably get data between systems or applications.

Key Components (a short code example follows the list):

  • Producer: Sends records to a Kafka topic.
  • Consumer: Reads records from a Kafka topic.
  • Broker: Kafka server that stores data and serves clients.
  • Topic: Category to which records are sent by producers.
  • Partition: Subset of a topic for parallelism and scaling.
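To make these roles concrete, here is a minimal sketch of a producer written against the plain Kafka client API (no Spring yet). The topic name your-topic, the key user-1, and the broker address localhost:9092 are illustrative assumptions for a local, single-broker setup.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class PlainProducerExample {

    public static void main(String[] args) {
        // Minimal producer configuration: broker address plus key/value serializers.
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed local broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // The record is appended to one of the partitions of "your-topic";
        // records with the same key always land on the same partition.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("your-topic", "user-1", "hello kafka"));
        }
    }
}

The Spring sections below wrap this same producer/consumer pattern in Spring-managed beans.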

2. Why Use Apache Kafka with Spring?

Spring Boot provides extensive support for Kafka, making it easier to integrate Kafka into your applications. Some of the benefits include:

  • Simplified configuration and setup.
  • Built-in error handling and retries.
  • Easily scalable and manageable consumer/producer configurations.
  • Integration with Spring ecosystem (Spring Cloud, Spring Data, etc.).

3. Setting Up the Environment

Prerequisites:

  • Java Development Kit (JDK): Ensure you have JDK 8 or later installed (Spring Boot 3.x requires Java 17 or later).
  • Apache Kafka: Download and install Kafka from the official website.

Installing and Running Kafka:

Download and extract Kafka (older releases move from downloads.apache.org to archive.apache.org, so adjust the URL if the download is no longer available):

wget https://downloads.apache.org/kafka/2.7.0/kafka_2.12-2.7.0.tgz
tar -xzf kafka_2.12-2.7.0.tgz
cd kafka_2.12-2.7.0

Start the Zookeeper server:

bin/zookeeper-server-start.sh config/zookeeper.properties

Start the Kafka server:

bin/kafka-server-start.sh config/server.properties

4. Spring Kafka Configuration

Adding Dependencies

Add the necessary dependencies to your pom.xml; Spring Boot's dependency management supplies the spring-kafka version, so no explicit version tag is needed:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>

Configuring Kafka in Spring Boot

Create a configuration class for Kafka:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class KafkaConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        // Wrap both deserializers in ErrorHandlingDeserializer so a bad record cannot
        // put the listener container into an endless deserialization-failure loop.
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        configProps.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class.getName());
        configProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
        configProps.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        return new DefaultKafkaConsumerFactory<>(configProps);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // MANUAL_IMMEDIATE: offsets are committed only when the listener calls Acknowledgment.acknowledge().
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}
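Optionally (this is an addition beyond the original configuration), the topic itself can be declared as a bean. Spring Boot auto-configures a KafkaAdmin when spring-kafka is on the classpath, and with the default spring.kafka.bootstrap-servers of localhost:9092 it creates any NewTopic beans on startup if they do not already exist. The name your-topic and the partition/replica counts are placeholder assumptions.

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {

    // Declares "your-topic" with 3 partitions and a single replica
    // (a single replica is only suitable for a local, one-broker setup).
    @Bean
    public NewTopic yourTopic() {
        return TopicBuilder.name("your-topic")
                .partitions(3)
                .replicas(1)
                .build();
    }
}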

5. Producing Messages to Kafka

Create a service to produce messages to Kafka:

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}
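To actually trigger a send, something has to call sendMessage(). As one illustrative option (assuming spring-boot-starter-web has also been added to the pom.xml), a small REST controller can expose the producer over HTTP; the class name PublishController and the /publish path are hypothetical.

import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class PublishController {

    private final KafkaProducer kafkaProducer;

    public PublishController(KafkaProducer kafkaProducer) {
        this.kafkaProducer = kafkaProducer;
    }

    // POST /publish?topic=your-topic with the message text as the request body.
    @PostMapping("/publish")
    public String publish(@RequestParam String topic, @RequestBody String message) {
        kafkaProducer.sendMessage(topic, message);
        return "Sent to " + topic;
    }
}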

6. Consuming Messages from Kafka

Create a consumer to read messages from Kafka:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {

    // The Acknowledgment parameter is available because the container factory uses MANUAL_IMMEDIATE ack mode.
    @KafkaListener(topics = "your-topic", containerFactory = "kafkaListenerContainerFactory")
    public void listen(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
        System.out.println("Received message: " + record.value());
        acknowledgment.acknowledge();
    }
}
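This listener pairs with the "built-in error handling and retries" mentioned in section 2. As a sketch (assuming spring-kafka 2.8 or later, where DefaultErrorHandler is the common error handler), the kafkaListenerContainerFactory() method in KafkaConfig could be replaced with this version so that a record whose processing throws an exception is retried before being skipped:

import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

// A variant of the kafkaListenerContainerFactory() bean shown earlier.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        ConsumerFactory<String, String> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    // Retry a failed record twice, one second apart, then log and skip it.
    factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 2L)));
    return factory;
}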

7. Sequence Diagram

Below is a sequence diagram representing the flow of messages from producer to consumer in a Kafka setup with Spring.

Producer -> Kafka Topic -> Consumer
  1. Producer sends a message to the Kafka Topic (see the code sketch after these steps).
  2. The Kafka broker appends the message to one of the topic's partitions.
  3. Consumer subscribes to the Kafka Topic and receives the message.
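To relate these steps to the Spring code, note that KafkaTemplate.send() in step 1 is asynchronous: the broker's acknowledgement with the assigned partition and offset (step 2) arrives later on the returned future. A minimal sketch, assuming spring-kafka 3.x where send() returns a CompletableFuture (2.x returns a ListenableFuture instead); the class name LoggingKafkaProducer is illustrative:

import java.util.concurrent.CompletableFuture;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;

@Service
public class LoggingKafkaProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public LoggingKafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String topic, String message) {
        // send() returns immediately; the broker acknowledgement arrives on the future.
        CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
        future.whenComplete((result, ex) -> {
            if (ex == null) {
                // The metadata reports the partition and offset the record was written to (step 2).
                System.out.println("Stored in partition " + result.getRecordMetadata().partition()
                        + " at offset " + result.getRecordMetadata().offset());
            } else {
                System.err.println("Send failed: " + ex.getMessage());
            }
        });
    }
}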

8. Class Diagram

Here’s a class diagram for the Kafka setup in Spring Boot.


+------------------+       +------------------------------------+       +------------------+
|  KafkaProducer   |       |            KafkaConfig             |       |  KafkaConsumer   |
+------------------+       +------------------------------------+       +------------------+
| - kafkaTemplate  |       | + producerFactory()                |       | + listen()       |
| + sendMessage()  | <---> | + kafkaTemplate()                  | <---> |                  |
+------------------+       | + consumerFactory()                |       +------------------+
                           | + kafkaListenerContainerFactory()  |
                           +------------------------------------+

9. Complete Setup Guide

Step-by-Step Implementation

  1. Create a Spring Boot Project:
    • Use Spring Initializr or your IDE to create a new Spring Boot project with dependencies for Spring Kafka.
  2. Add Dependencies:
    • Add the necessary dependencies in the pom.xml as shown above.
  3. Configure Kafka:
    • Create a configuration class KafkaConfig to set up producer and consumer factories, and Kafka listener container factory.
  4. Produce Messages:
    • Create a KafkaProducer service to send messages to Kafka topics.
  5. Consume Messages:
    • Create a KafkaConsumer service to listen to messages from Kafka topics.
  6. Run the Application:
    • Ensure Kafka and Zookeeper are running.
    • Start your Spring Boot application.
    • Produce a message and observe the consumer receiving it.

Complete Code

// KafkaConfig.java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class KafkaConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        configProps.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class.getName());
        configProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
        configProps.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        return new DefaultKafkaConsumerFactory<>(configProps);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}

// KafkaProducer.java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}

// KafkaConsumer.java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {

    @KafkaListener(topics = "your-topic", containerFactory = "kafkaListenerContainerFactory")
    public void listen(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
        System.out.println("Received message: " + record.value());
        acknowledgment.acknowledge();
    }
}

// Main Application Class
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaApplication.class, args);
    }
}

Running the Application

  1. Run Kafka and Zookeeper:
    • Start the Zookeeper and Kafka servers as described in the setup section.
  2. Produce a Message:
    • Use the KafkaProducer service to send a message to a topic (a runnable sketch follows this list).
  3. Consume a Message:
    • Observe the logs or console to see the KafkaConsumer service consuming the message.
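For a quick end-to-end check (an illustrative addition, not part of the original guide), a CommandLineRunner bean can send a single message on startup; the topic name your-topic matches the listener above.

import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class StartupSmokeTest {

    // Sends one test message once the application context is ready;
    // the KafkaConsumer listener should then print the received value.
    @Bean
    public CommandLineRunner produceTestMessage(KafkaProducer kafkaProducer) {
        return args -> kafkaProducer.sendMessage("your-topic", "hello from startup");
    }
}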

10. Conclusion

Integrating Apache Kafka with Spring Boot provides a powerful and scalable way to handle real-time data streams. By following this comprehensive guide, you can set up a Kafka-based application in Spring, enabling you to produce and consume messages efficiently.
