Scalability in Apache Kafka: Horizontal Scaling Explained

What is Scalability?

Scalability refers to the ability of a system to handle a growing amount of work or its potential to accommodate growth. In the context of Apache Kafka, scalability primarily involves horizontal scaling, where additional Kafka brokers are added to a cluster to increase its capacity and throughput.

How Kafka Achieves Horizontal Scalability

Kafka’s architecture is inherently designed to support horizontal scaling. The key components that facilitate this scalability include topics, partitions, brokers, and consumer groups. Here’s how Kafka achieves horizontal scalability:

  1. Topics and Partitions:
    • Kafka divides data into topics, which are further subdivided into partitions. Each partition is an ordered, append-only log that can be stored and served independently.
    • Partitions allow Kafka to parallelize data processing across multiple brokers. Each broker can host multiple partitions, enabling data and load to be distributed (a short sketch after this list shows how a topic’s partition count is specified at creation).
  2. Brokers:
    • A Kafka cluster consists of multiple brokers. Each broker handles read and write requests for the partitions for which it is the leader.
    • Adding brokers to the cluster lets partitions be spread across more machines, increasing the cluster’s overall capacity and throughput.
  3. Consumer Groups:
    • Consumers in Kafka are organized into consumer groups. Each partition of a topic is consumed by exactly one consumer within a group.
    • This allows Kafka to balance the load of data processing across multiple consumers, facilitating parallel consumption.
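
As a concrete illustration of how the partition count is fixed when a topic is created, here is a minimal sketch using the Kafka AdminClient. The bootstrap address and the user-activity topic name are taken from the example later in this article; the class name is illustrative only.

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;

public class CreateUserActivityTopic {
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        // Assumed bootstrap address; point this at any broker in your cluster.
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(properties)) {
            // 6 partitions allow up to 6 consumers in one group to read in parallel;
            // replication factor 3 keeps a copy of each partition on 3 different brokers.
            NewTopic topic = new NewTopic("user-activity", 6, (short) 3);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}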

Example of Horizontal Scalability in Kafka

Consider a scenario where an e-commerce platform uses Kafka to handle user activity tracking. As the number of users grows, the volume of activity data increases. Here’s how Kafka can be scaled horizontally to manage this growth:

Initial Setup:

  • Cluster: A Kafka cluster with 3 brokers.
  • Topic: A topic named user-activity with 6 partitions.

Initial Data Flow:

  • Producers send user activity events to the user-activity topic.
  • Each broker handles two partitions of the user-activity topic.
  • A consumer group with three consumers processes the events, with each consumer handling two partitions.

Scenario: Increased Load:

  • The platform’s user base doubles, resulting in a significant increase in activity events.
  • To handle the increased load, the Kafka cluster needs to scale.

Scaling Kafka Horizontally:

  1. Adding Brokers:
    • Add 3 more brokers to the Kafka cluster, making a total of 6 brokers.
    • Kafka’s partition reassignment tool is used to reassign partitions across the new brokers, ensuring an even distribution of partitions.
  2. Reassigning Partitions:
    • The user-activity topic’s partitions are reassigned to the new brokers (see the sketch after this list for a programmatic way to do this).
    • Each broker now handles one partition of the user-activity topic, distributing the load evenly.
  3. Scaling Consumers:
    • Add more consumers to the consumer group to match the number of partitions.
    • Now, there are 6 consumers, with each consumer processing one partition.
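
The reassignment in steps 1 and 2 is typically driven with the kafka-reassign-partitions.sh tool; the following is a minimal sketch of doing the same thing programmatically with the AdminClient’s alterPartitionReassignments API. The broker IDs 0 through 5 and the one-replica-per-partition layout are assumptions for illustration only.

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.common.TopicPartition;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;

public class ReassignUserActivityPartitions {
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(properties)) {
            Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments = new HashMap<>();
            // Place the single replica of partition i on broker i (brokers 0-5).
            // With replication factor 3, each partition would list three broker IDs instead.
            for (int partition = 0; partition < 6; partition++) {
                reassignments.put(
                        new TopicPartition("user-activity", partition),
                        Optional.of(new NewPartitionReassignment(Collections.singletonList(partition))));
            }
            admin.alterPartitionReassignments(reassignments).all().get();
        }
    }
}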

Result:

  • With 6 brokers, each handling one partition of the user-activity topic, the cluster can handle higher throughput.
  • The consumer group with 6 consumers can process the increased volume of data in parallel, ensuring timely processing of user activity events.

Example Code for Kafka Producer and Consumer

Here’s a simplified example of a Kafka producer and consumer to illustrate the concept.

Kafka Producer:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class UserActivityProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        for (int i = 0; i < 1000; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>("user-activity", "key" + i, "activity" + i);
            producer.send(record);
        }
        producer.close();
    }
}
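
Because each record carries a key, the producer’s default partitioner hashes the key to choose one of the user-activity partitions, so writes are spread across all of the brokers that host those partitions.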

Kafka Consumer:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Collections;
import java.util.Properties;
import java.time.Duration;

public class UserActivityConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "user-activity-group");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList("user-activity"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("key = %s, value = %s, partition = %d, offset = %d%n",
                                  record.key(), record.value(), record.partition(), record.offset());
            }
        }
    }
}
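
To scale consumption as described earlier, run additional instances of this consumer with the same group.id (user-activity-group). Kafka’s group coordinator rebalances the partitions so that each instance receives its own subset, up to one consumer per partition.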

Unit and Mock Testing Kafka

To ensure the reliability and correctness of Kafka applications, it’s essential to perform unit and mock testing. Below are examples that use the MockProducer and MockConsumer classes shipped with the kafka-clients library to test producer and consumer logic without a running broker.

Unit Test for Kafka Producer:

import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.*;

public class UserActivityProducerTest {

    @Test
    public void testProducer() {
        // MockProducer records every send in memory; autoComplete=true acknowledges each send immediately.
        MockProducer<String, String> producer = new MockProducer<>(true, new StringSerializer(), new StringSerializer());

        // In a full test, UserActivityProducer would be refactored to accept an injected Producer instance;
        // here the send path is exercised directly against the mock.
        producer.send(new ProducerRecord<>("user-activity", "key1", "activity1"));

        assertEquals(1, producer.history().size());
        ProducerRecord<String, String> record = producer.history().get(0);
        assertEquals("key1", record.key());
        assertEquals("activity1", record.value());
    }
}

Unit Test for Kafka Consumer:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.*;

public class UserActivityConsumerTest {

    @Test
    public void testConsumer() {
        MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        TopicPartition partition = new TopicPartition("user-activity", 0);

        consumer.subscribe(Collections.singletonList("user-activity"));
        // MockConsumer needs an explicit rebalance and per-partition beginning offsets before records can be added.
        consumer.rebalance(Collections.singletonList(partition));
        Map<TopicPartition, Long> offsets = new HashMap<>();
        offsets.put(partition, 0L);
        consumer.updateBeginningOffsets(offsets);

        consumer.addRecord(new ConsumerRecord<>("user-activity", 0, 0L, "key1", "activity1"));

        // In a full test, UserActivityConsumer would be refactored to accept an injected Consumer instance;
        // here the mock is polled directly and the delivered record is asserted on.
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        ConsumerRecord<String, String> record = records.iterator().next();
        assertEquals("key1", record.key());
        assertEquals("activity1", record.value());
    }
}

Benefits of Horizontal Scalability in Kafka

  1. Increased Throughput: By adding more brokers and partitions, Kafka can handle a higher volume of data.
  2. Fault Tolerance: More brokers mean better fault tolerance. Provided topics are replicated, replicas on the remaining brokers can take over the partitions of a failed broker.
  3. Load Balancing: Even distribution of data across brokers prevents any single broker from becoming a bottleneck.
  4. Parallel Processing: Multiple consumers can process data in parallel, ensuring faster data processing and reduced latency.

Conclusion

Kafka’s distributed architecture allows it to scale horizontally by adding more brokers to the cluster. This scalability ensures that Kafka can handle increasing loads efficiently, making it suitable for applications that require high throughput and low latency. By leveraging partitions, consumer groups, and efficient resource management, Kafka provides a robust and scalable solution for real-time data streaming and processing.
