Apache Kafka is a distributed streaming platform that excels at managing real-time data feeds. In this blog, we’ll walk through the steps to configure Kafka so that multiple publishers can send messages to a single topic and every consumer receives the same messages. We’ll cover Kafka setup, configuration, and a complete example with code. Additionally, we’ll explain why separate consumer groups are beneficial in certain scenarios. Let’s write the code that lets multiple Kafka consumers receive the same messages from multiple publishers.
Overview
Kafka topics can be configured to ensure that each consumer receives the same message. This is particularly useful in scenarios where multiple services (publishers) generate events that need to be processed by multiple consumers independently. We will:
- Set up a Kafka topic.
- Configure multiple publishers to send messages to this topic.
- Configure consumers to ensure each one receives every message published to the topic.
- Test the setup to ensure everything is working as expected.
Kafka Setup
Prerequisites
- Apache Kafka installed and running.
- Apache Zookeeper installed and running (used by Kafka to manage its cluster state; note that Kafka 3.3+ can instead run in KRaft mode without Zookeeper).
Step 1: Start Zookeeper and Kafka
Start Zookeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties
Start Kafka:
bin/kafka-server-start.sh config/server.properties
Step 2: Create a Topic
Create a Kafka topic named my-topic with a replication factor of 1 and a single partition. A single partition keeps message ordering simple for this demo; note that consumers in separate consumer groups receive every message regardless of the partition count.
bin/kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
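To confirm the topic was created as intended, you can describe it (this assumes the broker from Step 1 is still running on localhost:9092):

```shell
# Describe my-topic: the output should report PartitionCount: 1 and ReplicationFactor: 1
bin/kafka-topics.sh --describe --topic my-topic --bootstrap-server localhost:9092
```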
Configuring Multiple Publishers
Here is the Java code to configure two publishers (producers) that send messages to my-topic.
Publisher Code
Publisher1.java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class Publisher1 {
    public static void main(String[] args) {
        String topicName = "my-topic";
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            String key = "Key" + i;
            String value = "Publisher1-Message" + i;
            ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, value);
            producer.send(record);
            System.out.println("Publisher1 sent message: " + key + ", " + value);
        }
        producer.close();
    }
}
Publisher2.java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class Publisher2 {
    public static void main(String[] args) {
        String topicName = "my-topic";
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            String key = "Key" + i;
            String value = "Publisher2-Message" + i;
            ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, value);
            producer.send(record);
            System.out.println("Publisher2 sent message: " + key + ", " + value);
        }
        producer.close();
    }
}
Configuring Consumers
Each consumer must belong to its own consumer group (a unique group.id) to ensure it receives every message published to the topic.
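If you would rather not hand-assign group names like consumer-group-1 and consumer-group-2, one common pattern (a sketch, not part of the original example) is to derive a unique group.id per process from a UUID, so every instance automatically forms its own group:

```java
import java.util.Properties;
import java.util.UUID;

public class UniqueGroupId {
    // Build consumer properties whose group.id is unique per process, so each
    // running instance forms its own consumer group and receives all messages.
    static Properties consumerProps(String prefix) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", prefix + "-" + UUID.randomUUID());
        return props;
    }

    public static void main(String[] args) {
        // Two calls yield two distinct group ids, hence two independent groups.
        System.out.println(consumerProps("broadcast-consumer").getProperty("group.id"));
        System.out.println(consumerProps("broadcast-consumer").getProperty("group.id"));
    }
}
```

One trade-off to keep in mind: a random group.id means committed offsets are not reused across restarts, so a restarted consumer re-reads the topic from wherever auto.offset.reset points it.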
Consumer Code
Consumer1.java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class Consumer1 {
    public static void main(String[] args) {
        String topicName = "my-topic";
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group-1");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Start from the beginning of the topic so messages published before
        // this consumer group first connects are not missed (the default,
        // "latest", would skip everything sent in Step 3).
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(topicName));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("Consumer1 received message: Key = %s, Value = %s, Partition = %d, Offset = %d%n",
                        record.key(), record.value(), record.partition(), record.offset());
            }
        }
    }
}
Consumer2.java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class Consumer2 {
    public static void main(String[] args) {
        String topicName = "my-topic";
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group-2");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Start from the beginning of the topic so messages published before
        // this consumer group first connects are not missed.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(topicName));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("Consumer2 received message: Key = %s, Value = %s, Partition = %d, Offset = %d%n",
                        record.key(), record.value(), record.partition(), record.offset());
            }
        }
    }
}
Testing the Setup
Step 3: Run Publishers
Run Publisher1 and Publisher2 to send messages to my-topic. The Kafka client library must be on the classpath when compiling and running; the libs/ path below is a placeholder, so adjust it to wherever your kafka-clients jar (and its dependencies) live.
javac -cp "libs/*" Publisher1.java
java -cp "libs/*:." Publisher1
javac -cp "libs/*" Publisher2.java
java -cp "libs/*:." Publisher2
Step 4: Run Consumers
Run Consumer1 and Consumer2, each in its own terminal, to consume messages from my-topic (again with the Kafka client library on the classpath).
javac -cp "libs/*" Consumer1.java
java -cp "libs/*:." Consumer1
javac -cp "libs/*" Consumer2.java
java -cp "libs/*:." Consumer2
You should see that both consumers receive all messages published by both publishers.
Details with Example
Multiple Producers Scenario
In this setup, we have two producers, Publisher1 and Publisher2, that send messages to the same Kafka topic, my-topic.
// Publisher1 sends messages to my-topic
for (int i = 0; i < 10; i++) {
    String key = "Key" + i;
    String value = "Publisher1-Message" + i;
    ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, value);
    producer1.send(record);
    System.out.println("Publisher1 sent message: " + key + ", " + value);
}

// Publisher2 sends messages to my-topic
for (int i = 0; i < 10; i++) {
    String key = "Key" + i;
    String value = "Publisher2-Message" + i;
    ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, value);
    producer2.send(record);
    System.out.println("Publisher2 sent message: " + key + ", " + value);
}
Multiple Consumers Scenario
In this setup, we have two consumers, Consumer1 and Consumer2, that read messages from the same Kafka topic, my-topic. Each consumer is part of a different consumer group.
// Consumer1 reading messages from my-topic
while (true) {
    ConsumerRecords<String, String> records = consumer1.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("Consumer1 received message: Key = %s, Value = %s, Partition = %d, Offset = %d%n",
                record.key(), record.value(), record.partition(), record.offset());
    }
}

// Consumer2 reading messages from my-topic
while (true) {
    ConsumerRecords<String, String> records = consumer2.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("Consumer2 received message: Key = %s, Value = %s, Partition = %d, Offset = %d%n",
                record.key(), record.value(), record.partition(), record.offset());
    }
}
Why Separate Consumer Groups Are Needed
Using separate consumer groups for each consumer ensures that every consumer receives every message. If consumers were part of the same consumer group, Kafka would distribute the messages among them, meaning each consumer would only get a subset of the messages. Separate consumer groups are beneficial for:
- Parallel Processing: Different consumers in separate consumer groups can process messages in parallel, enhancing throughput and performance.
- Different Processing Logic: Separate consumers allow for different processing logic to be applied to the same set of messages. For instance, one consumer might filter messages while another aggregates them.
- Scalability: Each consumer group scales independently. Within a group, you can add consumers (up to the partition count) to spread the load, while separate groups let each workload grow on its own without affecting the others.
- Fault Isolation: If one consumer fails or experiences delays, it doesn’t affect the others. Each consumer can be managed and scaled independently.
- Optimized Resource Usage: Different consumers can run on different hardware, optimizing resource usage based on the processing requirements.
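The group semantics above can be sketched with a small, broker-free simulation. This is an illustration only (not the Kafka client’s actual assignor): the rule it mimics is that each partition is owned by exactly one consumer per group, so members of a shared group split the partitions, while a consumer alone in its own group receives everything.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupAssignmentDemo {
    // Round-robin assignment of partitions to the consumers of ONE group,
    // mimicking the rule that each partition goes to exactly one member.
    static Map<String, List<Integer>> assign(List<String> consumers, int partitions) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        for (String c : consumers) result.put(c, new ArrayList<>());
        for (int p = 0; p < partitions; p++) {
            result.get(consumers.get(p % consumers.size())).add(p);
        }
        return result;
    }

    public static void main(String[] args) {
        int partitions = 1; // my-topic has a single partition

        // Case 1: both consumers share one group -> the single partition goes
        // to one member, and the other member receives nothing at all.
        System.out.println("Same group:      " + assign(List.of("Consumer1", "Consumer2"), partitions));

        // Case 2: each consumer is alone in its own group -> every group is
        // assigned the full partition set, so both consumers see every message.
        System.out.println("Separate groups: " + assign(List.of("Consumer1"), partitions)
                + " and " + assign(List.of("Consumer2"), partitions));
    }
}
```

Running the demo makes the trade-off concrete: with one partition and a shared group, the second consumer is simply idle; with separate groups, both consumers own partition 0 independently.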
How It Works
- Producers: Both Publisher1 and Publisher2 send messages to the Kafka topic my-topic.
- Topic: All messages are stored in a single partition of the topic my-topic.
- Consumers: Consumer1 and Consumer2 read messages from the topic. Since they are in different consumer groups, both receive all messages published to the topic.
Conclusion
By following these steps, you can set up Kafka to allow multiple publishers to send messages to a single topic and ensure that every consumer receives all messages. This configuration is useful for scenarios where multiple services generate events that need to be processed by multiple consumers independently.