Multiple Kafka Consumers Receive Same Message By Publishers

Apache Kafka is a distributed streaming platform that excels at managing real-time data feeds. In this blog, we’ll walk through the steps to configure Kafka so that multiple publishers send messages to a single topic and every consumer receives every message. We’ll cover Kafka setup and configuration, provide a complete code example, and explain why separate consumers and consumer groups are beneficial in certain scenarios. Let’s write the code.

Overview

Kafka topics can be configured to ensure that each consumer receives the same message. This is particularly useful in scenarios where multiple services (publishers) generate events that need to be processed by multiple consumers independently. We will:

  1. Set up a Kafka topic.
  2. Configure multiple publishers to send messages to this topic.
  3. Configure consumers to ensure each one receives every message published to the topic.
  4. Test the setup to ensure everything is working as expected.

Kafka Setup

Prerequisites

  • Apache Kafka installed and running.
  • Apache Zookeeper installed and running (used by Kafka to manage its cluster state).

Step 1: Start Zookeeper and Kafka

Start Zookeeper:

bin/zookeeper-server-start.sh config/zookeeper.properties

Start Kafka:

bin/kafka-server-start.sh config/server.properties

Step 2: Create a Topic

Create a Kafka topic named my-topic with a replication factor of 1 and a single partition. A single partition keeps all messages in one totally ordered log. Note that fan-out to every consumer comes from using separate consumer groups (covered below), not from the partition count.

bin/kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
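To see why a single partition puts every record in the same log, here is a simplified sketch of the partitioning idea. Kafka’s default partitioner actually hashes the serialized key with murmur2; this illustration substitutes String.hashCode and is not the real client code.

```java
public class PartitionSketch {
    // Simplified stand-in for Kafka's default partitioner.
    // Real Kafka hashes the serialized key bytes with murmur2;
    // here we use String.hashCode for illustration only.
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // With a single partition, every key maps to partition 0,
        // so all messages land in one totally ordered log.
        for (int i = 0; i < 10; i++) {
            int p = partitionFor("Key" + i, 1);
            System.out.println("Key" + i + " -> partition " + p);
        }
    }
}
```

Whatever the key, `numPartitions = 1` forces every record into partition 0, which is why our topic behaves as a single ordered stream.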

Configuring Multiple Publishers

Here is the Java code for two publishers (producers) that send messages to my-topic.

Publisher Code

Publisher1.java

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class Publisher1 {
    public static void main(String[] args) {
        String topicName = "my-topic";
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        for (int i = 0; i < 10; i++) {
            String key = "Key" + i;
            String value = "Publisher1-Message" + i;
            ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, value);
            producer.send(record);
            System.out.println("Publisher1 sent message: " + key + ", " + value);
        }

        producer.close();
    }
}

Publisher2.java

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class Publisher2 {
    public static void main(String[] args) {
        String topicName = "my-topic";
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        for (int i = 0; i < 10; i++) {
            String key = "Key" + i;
            String value = "Publisher2-Message" + i;
            ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, value);
            producer.send(record);
            System.out.println("Publisher2 sent message: " + key + ", " + value);
        }

        producer.close();
    }
}

Configuring Consumers

Each consumer must belong to its own unique consumer group so that every consumer receives every message published to the topic.

Consumer Code

Consumer1.java

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class Consumer1 {
    public static void main(String[] args) {
        String topicName = "my-topic";
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group-1");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(topicName));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("Consumer1 received message: Key = %s, Value = %s, Partition = %d, Offset = %d%n",
                        record.key(), record.value(), record.partition(), record.offset());
            }
        }
    }
}

Consumer2.java

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class Consumer2 {
    public static void main(String[] args) {
        String topicName = "my-topic";
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group-2");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(topicName));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("Consumer2 received message: Key = %s, Value = %s, Partition = %d, Offset = %d%n",
                        record.key(), record.value(), record.partition(), record.offset());
            }
        }
    }
}

Testing the Setup

Step 3: Run Publishers

Run Publisher1 and Publisher2 to send messages to my-topic. Note that compiling and running these classes requires the kafka-clients JAR (and its dependencies, such as slf4j) on the classpath.

javac Publisher1.java
java Publisher1
javac Publisher2.java
java Publisher2

Step 4: Run Consumers

Run Consumer1 and Consumer2 to consume messages from my-topic (again with the kafka-clients JAR and its dependencies on the classpath).

javac Consumer1.java
java Consumer1
javac Consumer2.java
java Consumer2

You should see that both consumers receive all messages published by both publishers.

Details with Example

Multiple Producers Scenario

In this setup, we have two producers, Publisher1 and Publisher2, that send messages to the same Kafka topic my-topic.

// Publisher1 sends messages to my-topic
for (int i = 0; i < 10; i++) {
    String key = "Key" + i;
    String value = "Publisher1-Message" + i;
    ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, value);
    producer1.send(record);
    System.out.println("Publisher1 sent message: " + key + ", " + value);
}
// Publisher2 sends messages to my-topic
for (int i = 0; i < 10; i++) {
    String key = "Key" + i;
    String value = "Publisher2-Message" + i;
    ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, value);
    producer2.send(record);
    System.out.println("Publisher2 sent message: " + key + ", " + value);
}

Multiple Consumers Scenario

In this setup, we have two consumers, Consumer1 and Consumer2, that read messages from the same Kafka topic my-topic. Each consumer is part of a different consumer group.

// Consumer1 reading messages from my-topic
while (true) {
    ConsumerRecords<String, String> records = consumer1.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("Consumer1 received message: Key = %s, Value = %s, Partition = %d, Offset = %d%n",
                record.key(), record.value(), record.partition(), record.offset());
    }
}
// Consumer2 reading messages from my-topic
while (true) {
    ConsumerRecords<String, String> records = consumer2.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("Consumer2 received message: Key = %s, Value = %s, Partition = %d, Offset = %d%n",
                record.key(), record.value(), record.partition(), record.offset());
    }
}

Why Separate Consumer Groups Are Needed

Using separate consumer groups for each consumer ensures that every consumer receives every message. If consumers were part of the same consumer group, Kafka would distribute the messages among them, meaning each consumer would only get a subset of the messages. Separate consumer groups are beneficial for:

  1. Parallel Processing: Different consumers in separate consumer groups can process messages in parallel, enhancing throughput and performance.
  2. Different Processing Logic: Separate consumers allow for different processing logic to be applied to the same set of messages. For instance, one consumer might filter messages while another aggregates them.
  3. Scalability: As data volume grows, having multiple consumers in different consumer groups can help distribute the processing load more evenly.
  4. Fault Isolation: If one consumer fails or experiences delays, it doesn’t affect the others. Each consumer can be managed and scaled independently.
  5. Optimized Resource Usage: Different consumers can run on different hardware, optimizing resource usage based on the processing requirements.
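The group semantics described above can be modeled in a few lines of plain Java. This is an in-memory sketch of the delivery rule (each message goes to exactly one consumer per group, and every group sees every message), not the Kafka client itself; the round-robin assignment is a simplification of Kafka’s partition-based assignment.

```java
import java.util.*;

public class GroupModel {
    // Deliver each message once per group: round-robin among that
    // group's consumers. This mirrors how Kafka splits work within
    // a group but replicates the full stream across groups.
    static Map<String, List<String>> deliver(
            List<String> messages, Map<String, Integer> groupSizes) {
        Map<String, List<String>> received = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> g : groupSizes.entrySet()) {
            String group = g.getKey();
            int size = g.getValue();
            for (int i = 0; i < messages.size(); i++) {
                // consumer index within the group that gets message i
                String consumer = group + "-consumer" + (i % size);
                received.computeIfAbsent(consumer, k -> new ArrayList<>())
                        .add(messages.get(i));
            }
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> msgs = Arrays.asList("m0", "m1", "m2", "m3");
        // group g1 has one consumer, group g2 has two
        Map<String, Integer> groups = new LinkedHashMap<>();
        groups.put("g1", 1);
        groups.put("g2", 2);
        deliver(msgs, groups).forEach((consumer, m) ->
                System.out.println(consumer + " received " + m));
    }
}
```

Running this shows the single consumer in g1 receiving all four messages, while the two consumers in g2 split them between themselves: exactly the difference between separate groups and a shared group.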

How It Works

  1. Producers: Both Publisher1 and Publisher2 send messages to the Kafka topic my-topic.
  2. Topic: All messages are stored in a single partition of the topic my-topic.
  3. Consumers: Consumer1 and Consumer2 read messages from the topic. Since they are in different consumer groups, both receive all messages published to the topic.
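The mechanics behind step 3 are that Kafka tracks a separate committed offset per consumer group, so one group’s progress through the log never affects another’s. A minimal in-memory sketch of that idea (not the Kafka client; `publish` and `poll` are hypothetical helpers for illustration):

```java
import java.util.*;

public class OffsetSketch {
    // A topic partition is an append-only log; each consumer group
    // keeps its own offset into it, so groups read independently.
    final List<String> log = new ArrayList<>();
    final Map<String, Integer> groupOffsets = new HashMap<>();

    void publish(String msg) {
        log.add(msg);
    }

    // Return all messages the given group has not yet seen and
    // advance that group's offset; other groups are unaffected.
    List<String> poll(String group) {
        int from = groupOffsets.getOrDefault(group, 0);
        List<String> batch = new ArrayList<>(log.subList(from, log.size()));
        groupOffsets.put(group, log.size());
        return batch;
    }

    public static void main(String[] args) {
        OffsetSketch topic = new OffsetSketch();
        topic.publish("Publisher1-Message0");
        topic.publish("Publisher2-Message0");
        System.out.println("group-1 polled: " + topic.poll("consumer-group-1"));
        System.out.println("group-2 polled: " + topic.poll("consumer-group-2"));
    }
}
```

Both groups poll the same two messages because each starts from its own offset of 0; committing one group’s offset leaves the other’s untouched.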

Conclusion

By following these steps, you can set up Kafka to allow multiple publishers to send messages to a single topic and ensure that every consumer receives all messages. This configuration is useful for scenarios where multiple services generate events that need to be processed by multiple consumers independently.

References

  1. Apache Kafka Documentation
  2. Kafka Java Client API
