Simple Kafka application

Posted by Jean Arnaud on Saturday, January 25, 2025

This page shows how to build a simple application with Apache Kafka. The application follows the producer/consumer pattern, one of the most common Kafka use cases.

Architecture

We will simulate a market price feed system, in which a producer generates random prices and pushes them to a Kafka topic, and a consumer polls these messages from the topic. Both will be Spring Boot applications deployed as Docker containers.

Services

Docker is convenient for deploying the different services of our app. Here is the configuration in docker-compose.yml:

services:

  kafka0:
    image: apache/kafka:latest
    container_name: kafka0
    environment:
      KAFKA_ENABLE_KRAFT: "yes"
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://kafka0:29092,CONTROLLER://kafka0:29093,PLAINTEXT_HOST://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka0:29092,PLAINTEXT_HOST://localhost:9090
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka0:29093
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
    ports:
      - "9090:9092"

  consumer:
    image: consumer:1.0.0-SNAPSHOT
    container_name: consumer
    build:
      context: ./consumer
      dockerfile: Dockerfile
    ports:
      - "8081:8080"
    environment:
      KAFKA_BOOTSTRAP: "kafka0:29092"
      TICKER: "AMZN"
    depends_on:
      - kafka0

  producer:
    image: producer:1.0.0-SNAPSHOT
    container_name: producer
    build:
      context: ./producer
      dockerfile: Dockerfile
    ports:
      - "8082:8080"
    environment:
      KAFKA_BOOTSTRAP: "kafka0:29092"
      TICKER: "AMZN"
    depends_on:
      - kafka0

Kafka

Kafka is deployed using the apache/kafka:latest Docker image. We expose the broker outside Docker on localhost:9090 (the PLAINTEXT_HOST listener, with host port 9090 mapped to container port 9092), and inside the Docker network, for the other services, on kafka0:29092.
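
To sanity-check the external listener from the host, here is a minimal sketch using the Kafka AdminClient (this helper class is not part of the demo project):

import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class ConnectivityCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Use the listener advertised for host-side clients.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9090");
        try (Admin admin = Admin.create(props)) {
            // Lists topic names; fails fast if the broker is unreachable.
            admin.listTopics().names().get().forEach(System.out::println);
        }
    }
}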

Spring applications

Dependency

Kafka provides a Java client to perform all operations on topics:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.9.0</version>
</dependency>
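
The code below also uses Jackson (ObjectMapper) for JSON and Lombok (@Slf4j) for logging. Spring Boot usually pulls Jackson in transitively; if not, you may need dependencies along these lines (versions are indicative):

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.18.2</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.36</version>
    <scope>provided</scope>
</dependency>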

Producer

Consumer and Producer are very similar, basic Spring Boot applications. We pass as parameters the Kafka bootstrap addresses and the information required to identify the relevant topic.

The application is declared as follows (closing the returned context makes the application exit once the runner has completed):

@SpringBootApplication
public class ProducerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args).close();
    }
}

@Component
public class ProducerRunner implements CommandLineRunner {

    @Value("${KAFKA_BOOTSTRAP:localhost:9090}")
    private String kafkaBootstrap;

    @Value("${TICKER}")
    private String ticker;

    @Override
    public void run(String... args) {
        Producer producer = new Producer(kafkaBootstrap, ticker);
        producer.sendMessages(20);
    }
}

@Slf4j
public class Producer {

    private static final String TOPIC_PREFIX = "prices-";

    private final String kafkaBootstrap;
    private final String ticker;
    private final String topicName;

    /**
     * @param kafkaBootstrap the list of comma-separated addresses of Kafka bootstrap servers (from brokers list).
     * @param ticker         the product ticker.
     */
    public Producer(String kafkaBootstrap, String ticker) {

        if (ticker == null || ticker.isBlank()) {
            throw new IllegalArgumentException("No ticker specified for price producer");
        }

        this.kafkaBootstrap = kafkaBootstrap;
        this.ticker = ticker.trim().toUpperCase();
        this.topicName = TOPIC_PREFIX + this.ticker;

        createTopic();
    }

    public void sendMessages(int nbMessages) {
        Properties properties = loadProperties();
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        Random random = new Random();
        try (producer) {
            for (int i = 0; i < nbMessages; i++) {
                sendMessage(producer, random);
                Thread.sleep(1000);
            }
        } catch (JsonProcessingException e) {
            log.error(e.getMessage(), e);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe the interruption.
            Thread.currentThread().interrupt();
            log.error(e.getMessage(), e);
        }
    }

    private void sendMessage(KafkaProducer<String, String> producer, Random random) throws JsonProcessingException {
        // Random price generation.
        double price = 100 + random.nextDouble(100);
        double volume = 1000 + random.nextInt(10000);

        ObjectMapper mapper = new ObjectMapper();
        ObjectNode node = mapper.createObjectNode();
        node.put("ticker", ticker);
        node.put("last", formatPrice(price));
        node.put("volume", formatPrice(volume));
        node.put("date", LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME));
        String msg = mapper.writer().writeValueAsString(node);

        ProducerRecord<String, String> record = new ProducerRecord<>(topicName, msg);
        log.debug("Created record: {}", record);

        producer.send(record);
        producer.flush();
    }

    private String formatPrice(double price) {
        return String.format("%.2f", price);
    }

    private Properties loadProperties() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrap); // Kafka brokers addresses.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 2000);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1000);
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 5000);
        return props;
    }
}
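
The constructor also calls createTopic(), which is not shown above. A minimal sketch using the Kafka AdminClient (an assumption, not necessarily the project's exact implementation) could be:

private void createTopic() {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrap);
    try (Admin admin = Admin.create(props)) {
        // Single partition, replication factor 1: enough for our single-broker setup.
        NewTopic topic = new NewTopic(topicName, 1, (short) 1);
        admin.createTopics(List.of(topic)).all().get();
    } catch (ExecutionException e) {
        // The topic may already exist from a previous run, which is fine.
        if (!(e.getCause() instanceof TopicExistsException)) {
            throw new RuntimeException("Could not create topic " + topicName, e);
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    }
}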

In this basic implementation, the producer sends 20 messages (one per second) and then terminates. Feel free to adapt this code to your needs.
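
Note that producer.send() is asynchronous: it returns immediately, and the flush() call forces delivery. If you want to log delivery results, one option (not used in the code above) is to pass a callback to send():

producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        log.error("Delivery failed: {}", exception.getMessage(), exception);
    } else {
        log.debug("Delivered to {}-{} at offset {}",
                metadata.topic(), metadata.partition(), metadata.offset());
    }
});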

Consumer

The consumer app is similar to the producer app; however, the loadProperties() method should return a consumer-specific configuration:

private Properties loadProperties() {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrap); // Kafka brokers addresses.
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "market-consumer-group");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 2000);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // choose from earliest/latest/none
    return props;
}
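
With this configuration, offsets are committed automatically (enable.auto.commit defaults to true). If you prefer explicit control over when offsets are committed, one possible variant is:

// In loadProperties():
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

// In the poll loop, after processing a batch of records:
consumer.commitSync();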

Instead of producing messages, we now want to receive them:

public void receiveMessages() {
    Properties properties = loadProperties();
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    try (consumer) {
        consumer.subscribe(List.of(topicName));
        int iter = 120;
        while (iter-- > 0) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                log.info("[offset {}] Received record {}: {}", record.offset(), record.key(), record.value());
            }
        }
    }
}

This basic implementation polls actively for around two minutes, then the consumer terminates. Again, feel free to adapt this code to your needs.
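
The consumer only logs the raw JSON string. If you want to work with individual fields, a minimal Jackson sketch (hypothetical, not part of the demo project) could be placed inside the record loop:

ObjectMapper mapper = new ObjectMapper(); // hoist outside the loop in real code
JsonNode node = mapper.readTree(record.value()); // throws JsonProcessingException
String ticker = node.get("ticker").asText();
double last = node.get("last").asDouble();
log.info("{} last price: {}", ticker, last);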

Launch the application

Start the application with docker compose up; you should see the Kafka image being downloaded, as well as the consumer and producer applications being built.

Check the logs of the producer, you should see the prices being sent:

2025-01-25 09:31:13 [main] DEBUG c.g.jarnaud.kafka.producer.Producer - Created record: ProducerRecord(topic=prices-AMZN, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=null, value={"ticker":"AMZN","last":"165.10","volume":"6500.00","date":"2025-01-25T09:31:13.071582292"}, timestamp=null)
2025-01-25 09:31:14 [main] DEBUG c.g.jarnaud.kafka.producer.Producer - Created record: ProducerRecord(topic=prices-AMZN, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=null, value={"ticker":"AMZN","last":"109.79","volume":"10435.00","date":"2025-01-25T09:31:14.076679208"}, timestamp=null)
...
2025-01-25 09:31:30 [main] DEBUG c.g.jarnaud.kafka.producer.Producer - Created record: ProducerRecord(topic=prices-AMZN, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=null, value={"ticker":"AMZN","last":"132.49","volume":"10886.00","date":"2025-01-25T09:31:30.128510671"}, timestamp=null)
2025-01-25 09:31:31 [main] DEBUG c.g.jarnaud.kafka.producer.Producer - Created record: ProducerRecord(topic=prices-AMZN, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=null, value={"ticker":"AMZN","last":"110.03","volume":"2589.00","date":"2025-01-25T09:31:31.131407118"}, timestamp=null)

Check the logs of the consumer, you should see the prices being read and displayed:

2025-01-25 09:31:12 [main] INFO  c.g.jarnaud.kafka.consumer.Consumer - Market consumer starting
2025-01-25 09:31:13 [main] INFO  c.g.jarnaud.kafka.consumer.Consumer - [offset 0] Received record null: {"ticker":"AMZN","last":"110.81","volume":"4077.00","date":"2025-01-25T09:31:11.913536082"}
2025-01-25 09:31:13 [main] INFO  c.g.jarnaud.kafka.consumer.Consumer - [offset 1] Received record null: {"ticker":"AMZN","last":"165.10","volume":"6500.00","date":"2025-01-25T09:31:13.071582292"}
2025-01-25 09:31:14 [main] INFO  c.g.jarnaud.kafka.consumer.Consumer - [offset 2] Received record null: {"ticker":"AMZN","last":"109.79","volume":"10435.00","date":"2025-01-25T09:31:14.076679208"}
...
2025-01-25 09:31:30 [main] INFO  c.g.jarnaud.kafka.consumer.Consumer - [offset 18] Received record null: {"ticker":"AMZN","last":"132.49","volume":"10886.00","date":"2025-01-25T09:31:30.128510671"}
2025-01-25 09:31:31 [main] INFO  c.g.jarnaud.kafka.consumer.Consumer - [offset 19] Received record null: {"ticker":"AMZN","last":"110.03","volume":"2589.00","date":"2025-01-25T09:31:31.131407118"}
2025-01-25 09:32:55 [main] INFO  c.g.jarnaud.kafka.consumer.Consumer - Market consumer terminating

Kafka UI

To get an easy overview of your Kafka cluster, you can use Kafka UI. It is simple to integrate: just add the following service to docker-compose.yml:

  kafka-ui:
    container_name: kafka-ui
    image: provectuslabs/kafka-ui:latest
    ports:
      - "8099:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka0:29092
      DYNAMIC_CONFIG_ENABLED: 'true'
    depends_on:
      - kafka0

After deploying the service, you should have access to the webapp at http://localhost:8099. You should see the topics used to transfer messages (e.g. http://localhost:8099/ui/clusters/local/all-topics/prices-AMZN).

Conclusion

In this article, we created a simple producer/consumer application based on Kafka topics.

The full source code of the project is available here: https://github.com/jarnaud/demo-kafka

