Simple Kafka application
This page shows how to build a simple application with Apache Kafka: a producer/consumer pair, which is one of the most common Kafka use cases.
Architecture
We will simulate a market price feed system, where a producer generates prices that are used by a consumer. The producer generates random prices and publishes them to a Kafka topic; the consumer polls these messages from the topic. Both are Spring Boot applications deployed as Docker containers.
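Conceptually, the topic between the two applications behaves like an append-only log: the producer appends messages, each message gets the next offset, and the consumer reads from a given offset without removing anything. The ToyTopic class below is a hypothetical, in-memory model of a single-partition topic meant only to illustrate this idea; real Kafka topics are partitioned, persisted and replicated by the brokers.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy, in-memory model of a single-partition topic (illustration only, not Kafka's
 * implementation): an append-only log in which a message's index is its offset.
 */
final class ToyTopic {
    private final List<String> log = new ArrayList<>();

    /** Appends a message (what a producer does) and returns the offset it was written at. */
    public synchronized long append(String message) {
        log.add(message);
        return log.size() - 1;
    }

    /** Reads up to maxRecords messages starting at fromOffset; reading never removes messages. */
    public synchronized List<String> read(long fromOffset, int maxRecords) {
        if (fromOffset < 0 || maxRecords < 0) {
            throw new IllegalArgumentException("Offset and maxRecords must be non-negative");
        }
        int from = (int) Math.min(fromOffset, log.size());
        int to = Math.min(from + maxRecords, log.size());
        return new ArrayList<>(log.subList(from, to));
    }
}
```

Because reads do not consume messages, a consumer that starts after the producer can still read everything from offset 0, and several consumer groups can read the same messages independently.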
Services
Docker Compose is a convenient way to deploy the different services of our application. Here is the configuration in docker-compose.yml:
services:
  kafka0:
    image: apache/kafka:latest
    container_name: kafka0
    environment:
      KAFKA_ENABLE_KRAFT: "yes"
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://kafka0:29092,CONTROLLER://kafka0:29093,PLAINTEXT_HOST://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka0:29092,PLAINTEXT_HOST://localhost:9090
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka0:29093
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
    ports:
      - "9090:9092"
  consumer:
    image: consumer:1.0.0-SNAPSHOT
    container_name: consumer
    build:
      context: ./consumer
      dockerfile: Dockerfile
    ports:
      - "8081:8080"
    environment:
      KAFKA_BOOTSTRAP: "kafka0:29092"
      TICKER: "AMZN"
    depends_on:
      - kafka0
  producer:
    image: producer:1.0.0-SNAPSHOT
    container_name: producer
    build:
      context: ./producer
      dockerfile: Dockerfile
    ports:
      - "8082:8080"
    environment:
      KAFKA_BOOTSTRAP: "kafka0:29092"
      TICKER: "AMZN"
    depends_on:
      - kafka0
Kafka
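First, Kafka is deployed using the apache/kafka:latest Docker image, as a single KRaft node acting as both broker and controller (KAFKA_PROCESS_ROLES: broker,controller).
The broker is reachable from outside Docker on localhost:9090 (host port 9090 is mapped to the container's port 9092), and from inside the Docker network (for the other services) on kafka0:29092. The CONTROLLER listener on port 29093 is only used internally by the KRaft controller.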
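What clients actually use are the advertised listeners: a client connects to its bootstrap address, and the broker answers with the advertised address of the listener that was reached. The hypothetical ListenerMap helper below only splits a listener list such as KAFKA_ADVERTISED_LISTENERS into listener name and address, to make the mapping explicit; it is not part of any Kafka API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: splits a Kafka listener list into listener name -> host:port. */
final class ListenerMap {

    public static Map<String, String> parse(String listeners) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String entry : listeners.split(",")) {
            // Each entry has the form NAME://host:port
            String[] parts = entry.trim().split("://", 2);
            if (parts.length != 2) {
                throw new IllegalArgumentException("Malformed listener: " + entry);
            }
            result.put(parts[0], parts[1]);
        }
        return result;
    }
}
```

Parsing the KAFKA_ADVERTISED_LISTENERS value from docker-compose.yml maps PLAINTEXT to kafka0:29092 (used by the other containers) and PLAINTEXT_HOST to localhost:9090 (used by tools running on the host).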
Spring applications
Dependency
Kafka provides a Java client library to produce and consume messages and to manage topics:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.9.0</version>
</dependency>
Producer
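The consumer and the producer are very similar, basic Spring Boot applications. Each receives two parameters through environment variables: the Kafka bootstrap server addresses (KAFKA_BOOTSTRAP) and the ticker from which the topic name is derived (TICKER).
The producer application is declared as follows (the code also relies on Jackson for JSON and on Lombok's @Slf4j for logging):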
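// ProducerApplication.java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ProducerApplication {
    public static void main(String[] args) {
        // CommandLineRunner beans run during run(); close() then shuts the context down.
        SpringApplication.run(ProducerApplication.class, args).close();
    }
}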
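// ProducerRunner.java
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component
public class ProducerRunner implements CommandLineRunner {

    // Falls back to the host-side listener when KAFKA_BOOTSTRAP is not set (e.g. when run from an IDE).
    @Value("${KAFKA_BOOTSTRAP:localhost:9090}")
    private String kafkaBootstrap;

    // No default: startup fails if TICKER is not set.
    @Value("${TICKER}")
    private String ticker;

    @Override
    public void run(String... args) {
        Producer producer = new Producer(kafkaBootstrap, ticker);
        producer.sendMessages(20);
    }
}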
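// Producer.java
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Locale;
import java.util.Properties;
import java.util.Random;

@Slf4j
public class Producer {

    private static final String TOPIC_PREFIX = "prices-";
    // ObjectMapper is thread-safe once configured, so a single shared instance is enough.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    private final String kafkaBootstrap;
    private final String ticker;
    private final String topicName;

    /**
     * @param kafkaBootstrap the comma-separated list of Kafka bootstrap server addresses (from the brokers list).
     * @param ticker         the product ticker; the topic name is derived from it (e.g. prices-AMZN).
     */
    public Producer(String kafkaBootstrap, String ticker) {
        if (ticker == null || ticker.isBlank()) {
            throw new IllegalArgumentException("No ticker specified for price producer");
        }
        this.kafkaBootstrap = kafkaBootstrap;
        this.ticker = ticker.trim().toUpperCase(Locale.ROOT);
        this.topicName = TOPIC_PREFIX + this.ticker;
        createTopic(); // Not shown in this listing (see the full source code).
    }

    public void sendMessages(int nbMessages) {
        Properties properties = loadProperties();
        Random random = new Random();
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            for (int i = 0; i < nbMessages; i++) {
                sendMessage(producer, random);
                Thread.sleep(1000); // One message per second.
            }
        } catch (JsonProcessingException e) {
            log.error(e.getMessage(), e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // Restore the interrupt flag.
            log.error(e.getMessage(), e);
        }
    }

    private void sendMessage(KafkaProducer<String, String> producer, Random random) throws JsonProcessingException {
        // Random price in [100, 200) and volume in [1000, 11000); nextDouble(bound) requires Java 17+.
        double price = 100 + random.nextDouble(100);
        double volume = 1000 + random.nextInt(10000);
        ObjectNode node = MAPPER.createObjectNode();
        node.put("ticker", ticker);
        node.put("last", formatPrice(price));
        node.put("volume", formatPrice(volume));
        node.put("date", LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME));
        String msg = MAPPER.writeValueAsString(node);
        // No key is set, so the record key is null (as seen in the consumer logs).
        ProducerRecord<String, String> record = new ProducerRecord<>(topicName, msg);
        log.debug("Created record: {}", record);
        producer.send(record);
        producer.flush(); // Block until the record is sent instead of waiting up to linger.ms.
    }

    private String formatPrice(double price) {
        // Locale.ROOT guarantees a '.' decimal separator whatever the JVM default locale.
        return String.format(Locale.ROOT, "%.2f", price);
    }

    private Properties loadProperties() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrap); // Kafka broker addresses.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 2000);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1000);
        // Must be at least linger.ms + request.timeout.ms.
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 5000);
        return props;
    }
}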
In this basic implementation, the producer sends 20 messages (one per second) and then terminates. Feel free to adapt this code to your needs.
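The Producer constructor calls createTopic(), whose body is not part of this listing. As an illustration only, a topic can be created with the Admin API of kafka-clients. The TopicCreator class below is a hypothetical sketch, assuming one partition and a replication factor of 1 (the maximum on a single-broker cluster like this one), that treats an already existing topic as success.

```java
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TopicExistsException;

import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

final class TopicCreator {

    /** Topic definition for a single-broker cluster: 1 partition, replication factor 1. */
    public static NewTopic singleBrokerTopic(String topicName) {
        return new NewTopic(topicName, 1, (short) 1);
    }

    /** Hypothetical helper: creates the topic, ignoring the error if it already exists. */
    public static void createTopic(String kafkaBootstrap, String topicName) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrap);
        try (Admin admin = Admin.create(props)) {
            // all() completes once the broker has accepted or rejected the request.
            admin.createTopics(List.of(singleBrokerTopic(topicName))).all().get();
        } catch (ExecutionException e) {
            // An existing topic is fine, e.g. when the producer is restarted.
            if (!(e.getCause() instanceof TopicExistsException)) {
                throw new IllegalStateException("Could not create topic " + topicName, e.getCause());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while creating topic " + topicName, e);
        }
    }
}
```

With such a helper, createTopic() in Producer could simply delegate to TopicCreator.createTopic(kafkaBootstrap, topicName). Creating the topic explicitly makes the partition count and replication factor visible, instead of relying on broker-side automatic topic creation.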
Consumer
The consumer application is similar to the producer, except that its loadProperties() method returns a consumer-specific configuration, with a group ID and deserializers instead of serializers:
private Properties loadProperties() {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrap); // Kafka broker addresses.
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "market-consumer-group"); // Consumers sharing this ID split the partitions.
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 2000);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // One of earliest, latest or none.
    return props;
}
Instead of producing messages, the consumer subscribes to the topic and polls it for new records:
public void receiveMessages() {
    Properties properties = loadProperties();
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
        consumer.subscribe(List.of(topicName));
        // 120 polls, each waiting at most 1 second for new records.
        int iter = 120;
        while (iter-- > 0) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                // The key is null because the producer sends records without a key.
                log.info("[offset {}] Received record {}: {}", record.offset(), record.key(), record.value());
            }
        }
    }
}
This basic implementation polls the topic 120 times with a timeout of one second per poll, so it runs for roughly two minutes at most before the consumer terminates. Again, feel free to adapt this code to your needs.
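The group ID and auto.offset.reset settings decide where the consumer starts reading. Kafka applies auto.offset.reset only when the group has no committed offset for a partition (or the committed offset no longer exists on the broker); otherwise the consumer resumes from its committed offset. Since enable.auto.commit defaults to true, a restarted consumer in market-consumer-group picks up where it stopped. The OffsetResetModel class below is a simplified, hypothetical model of that decision for one partition, not Kafka's actual code.

```java
import java.util.OptionalLong;

/** Simplified model of how a consumer picks its starting offset for one partition. */
final class OffsetResetModel {

    /**
     * @param committed   offset committed by the consumer group, if any
     * @param logStart    first offset still available in the partition
     * @param logEnd      offset the next produced message will get
     * @param resetPolicy value of auto.offset.reset: earliest, latest or none
     */
    public static long startOffset(OptionalLong committed, long logStart, long logEnd, String resetPolicy) {
        // A committed offset that still exists always wins over the reset policy.
        if (committed.isPresent()
                && committed.getAsLong() >= logStart
                && committed.getAsLong() <= logEnd) {
            return committed.getAsLong();
        }
        return switch (resetPolicy) {
            case "earliest" -> logStart; // Replay everything still stored.
            case "latest" -> logEnd;     // Only messages produced from now on.
            case "none" -> throw new IllegalStateException("No valid committed offset and auto.offset.reset=none");
            default -> throw new IllegalArgumentException("Unknown auto.offset.reset value: " + resetPolicy);
        };
    }
}
```

With earliest, a brand-new group starts at the first stored offset, which is why the consumer logs of this application begin at offset 0 even for a message produced before the consumer subscribed. In the real client, the none policy surfaces as an exception thrown to the application.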
Launch the application
Start the application with docker compose up. You should see the Kafka image being pulled, as well as the consumer and producer images being built.
Check the logs of the producer; you should see the prices being sent:
2025-01-25 10:31:13 🐄 2025-01-25 09:31:13 [main] DEBUG c.g.jarnaud.kafka.producer.Producer - Created record: ProducerRecord(topic=prices-AMZN, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=null, value={"ticker":"AMZN","last":"165.10","volume":"6500.00","date":"2025-01-25T09:31:13.071582292"}, timestamp=null)
2025-01-25 10:31:14 🐄 2025-01-25 09:31:14 [main] DEBUG c.g.jarnaud.kafka.producer.Producer - Created record: ProducerRecord(topic=prices-AMZN, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=null, value={"ticker":"AMZN","last":"109.79","volume":"10435.00","date":"2025-01-25T09:31:14.076679208"}, timestamp=null)
...
2025-01-25 10:31:30 🐄 2025-01-25 09:31:30 [main] DEBUG c.g.jarnaud.kafka.producer.Producer - Created record: ProducerRecord(topic=prices-AMZN, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=null, value={"ticker":"AMZN","last":"132.49","volume":"10886.00","date":"2025-01-25T09:31:30.128510671"}, timestamp=null)
2025-01-25 10:31:31 🐄 2025-01-25 09:31:31 [main] DEBUG c.g.jarnaud.kafka.producer.Producer - Created record: ProducerRecord(topic=prices-AMZN, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=null, value={"ticker":"AMZN","last":"110.03","volume":"2589.00","date":"2025-01-25T09:31:31.131407118"}, timestamp=null)
Check the logs of the consumer; you should see the prices being read and displayed:
2025-01-25 10:31:12 🏦 2025-01-25 09:31:12 [main] INFO c.g.jarnaud.kafka.consumer.Consumer - Market consumer starting
2025-01-25 10:31:13 🏦 2025-01-25 09:31:13 [main] INFO c.g.jarnaud.kafka.consumer.Consumer - [offset 0] Received record null: {"ticker":"AMZN","last":"110.81","volume":"4077.00","date":"2025-01-25T09:31:11.913536082"}
2025-01-25 10:31:13 🏦 2025-01-25 09:31:13 [main] INFO c.g.jarnaud.kafka.consumer.Consumer - [offset 1] Received record null: {"ticker":"AMZN","last":"165.10","volume":"6500.00","date":"2025-01-25T09:31:13.071582292"}
2025-01-25 10:31:14 🏦 2025-01-25 09:31:14 [main] INFO c.g.jarnaud.kafka.consumer.Consumer - [offset 2] Received record null: {"ticker":"AMZN","last":"109.79","volume":"10435.00","date":"2025-01-25T09:31:14.076679208"}
...
2025-01-25 10:31:30 🏦 2025-01-25 09:31:30 [main] INFO c.g.jarnaud.kafka.consumer.Consumer - [offset 18] Received record null: {"ticker":"AMZN","last":"132.49","volume":"10886.00","date":"2025-01-25T09:31:30.128510671"}
2025-01-25 10:31:31 🏦 2025-01-25 09:31:31 [main] INFO c.g.jarnaud.kafka.consumer.Consumer - [offset 19] Received record null: {"ticker":"AMZN","last":"110.03","volume":"2589.00","date":"2025-01-25T09:31:31.131407118"}
2025-01-25 10:32:55 🏦 2025-01-25 09:32:55 [main] INFO c.g.jarnaud.kafka.consumer.Consumer - Market consumer terminating
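The consumer logs every value as a raw JSON string. If you need typed data instead, the value can be mapped back to a Java type. A minimal sketch, assuming Jackson 2.12 or later (needed for Java record support) and a hypothetical PriceMessage record mirroring the fields written by the producer:

```java
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.math.BigDecimal;

final class PriceMessageParser {

    /** Hypothetical type mirroring the JSON fields written by the producer. */
    public record PriceMessage(String ticker, String last, String volume, String date) {
        /** The last price as an exact decimal (the producer formats it with two decimals). */
        public BigDecimal lastPrice() {
            return new BigDecimal(last);
        }
    }

    // ObjectMapper is thread-safe once configured, so a single shared instance is fine.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    /** Converts a record value (JSON string) into a PriceMessage. */
    public static PriceMessage parse(String json) {
        try {
            return MAPPER.readValue(json, PriceMessage.class);
        } catch (JsonProcessingException e) {
            throw new IllegalArgumentException("Invalid price message: " + json, e);
        }
    }
}
```

Inside the polling loop, PriceMessageParser.parse(record.value()) would then return a PriceMessage whose ticker() is AMZN. Converting the price with BigDecimal avoids floating-point rounding on a value the producer has already formatted to two decimals.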
Kafka UI
Kafka UI is a web interface that gives an easy overview of your Kafka cluster.
To integrate it into the application, add the following service under services in docker-compose.yml:
  kafka-ui:
    container_name: kafka-ui
    image: provectuslabs/kafka-ui:latest
    ports:
      - "8099:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka0:29092
      DYNAMIC_CONFIG_ENABLED: 'true'
    depends_on:
      - kafka0
Once the service is deployed, the web application is available locally at http://localhost:8099.
You should see the topics being used to transfer messages (e.g. http://localhost:8099/ui/clusters/local/all-topics/prices-AMZN).
Conclusion
In this article, we created a simple producer/consumer application based on Kafka topics.
The full source code of the project is available here: https://github.com/jarnaud/demo-kafka