In this blog post, we walk through an Apache Kafka Spring Boot example. The post is organized into the sections outlined below. You can skip to the section of your choice, or follow along in sequence.
Page Contents
Setup Apache Kafka
Apache Kafka has two main components: Apache ZooKeeper and the Kafka server. We will run both components locally so that Kafka runs on the local machine.
a. Download Apache Kafka from here.
b. Extract the tar/zip file and place the extracted folder in some location, for example /Users/userName/kafka_2.12-2.8.0.
c. Update the PATH system variable on your computer:
export PATH=/Users/userName/kafka_2.12-2.8.0/bin:$PATH
d. Create the folders shown below (the -p flag also creates the missing parent folder bin/data):
mkdir -p /Users/userName/kafka_2.12-2.8.0/bin/data/zookeeper
mkdir -p /Users/userName/kafka_2.12-2.8.0/bin/data/kafka
e. Update the ZooKeeper and Kafka server config files to point to these folders.
file: config/zookeeper.properties
dataDir=/Users/userName/kafka_2.12-2.8.0/bin/data/zookeeper
file: config/server.properties
log.dirs=/Users/userName/kafka_2.12-2.8.0/bin/data/kafka
f. Now start ZooKeeper and the Kafka server.
zookeeper-server-start.sh /Users/userName/kafka_2.12-2.8.0/config/zookeeper.properties
Once ZooKeeper has started, we should see the message below in the logs:
[2021-06-08 18:34:18,693] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
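Then start the Kafka server in a separate terminal, since ZooKeeper keeps running in the first one:
kafka-server-start.sh /Users/userName/kafka_2.12-2.8.0/config/server.properties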
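Before moving on, it can help to confirm that both processes are actually listening. The sketch below is a minimal check using only the JDK. It assumes ZooKeeper is on port 2181 (as in the log line above) and the Kafka broker is on port 9092 (the port used later in application.yaml). The class name KafkaPortCheck and the isPortOpen helper are hypothetical names introduced only for this illustration.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class KafkaPortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isPortOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused or timed out: nothing is listening there.
            return false;
        }
    }

    public static void main(String[] args) {
        // Ports are assumptions: 2181 for ZooKeeper, 9092 for the Kafka broker.
        System.out.println("ZooKeeper (2181) reachable: " + isPortOpen("localhost", 2181, 1000));
        System.out.println("Kafka (9092) reachable: " + isPortOpen("localhost", 9092, 1000));
    }
}
```

A successful TCP connection only shows that something is listening on the port; it does not prove the broker is healthy, so the server logs remain the better source of truth.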
Setup SpringBoot
In this section, we will see how to set up Spring Boot for development.
We have a detailed blog post on the setup.
Code
Our project folder structure should look like below:
Let's look at the code files below.
KafkaController.java
package com.atozlearner.springbootkafka4.controller;

import com.atozlearner.springbootkafka4.KafkaProducer;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping(value = "/kafka-spring/")
public class KafkaController {

    private final KafkaProducer kafkaProducer;

    // The producer is supplied by Spring through constructor injection.
    public KafkaController(KafkaProducer producer) {
        this.kafkaProducer = producer;
    }

    // GET /kafka-spring/send?message=... hands the message to the producer.
    @GetMapping(value = "/send")
    public String writeMessageToTopic(@RequestParam("message") String message) {
        System.out.println("In a writeMessageToTopic");
        this.kafkaProducer.writeMessage(message);
        return "Message sent to Kafka TOPIC test_topic";
    }
}
KafkaConsumer.java
package com.atozlearner.springbootkafka4;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {

    // Invoked for every record that arrives on test_topic.
    @KafkaListener(topics = "test_topic", groupId = "test_group_id")
    public void getMessage(String message) {
        System.out.println("Display the message");
        System.out.println(message);
    }
}
KafkaProducer.java
package com.atozlearner.springbootkafka4;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducer {

    private static final String TOPIC = "test_topic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Publishes the message to test_topic without a record key.
    public String writeMessage(String message) {
        this.kafkaTemplate.send(TOPIC, message);
        return "Message sent to kafka Topic test_topic Successfully";
    }
}
Springbootkafka4Application.java
package com.atozlearner.springbootkafka4;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Springbootkafka4Application {

    public static void main(String[] args) {
        SpringApplication.run(Springbootkafka4Application.class, args);
    }
}
application.yaml
server:
  port: 9000
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: "test_group_id"
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
Demonstration of Kafka spring boot example
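With ZooKeeper, Kafka, and the Spring Boot application all running, the flow can be exercised by calling the /kafka-spring/send endpoint. The sketch below is one possible way to do that from plain Java 11+ using java.net.http.HttpClient. It assumes the application listens on localhost:9000, as configured in application.yaml. The class name SendMessageDemo and the buildSendUri helper are hypothetical names introduced only for this illustration.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

final class SendMessageDemo {

    // Assumed base URL: port 9000 from application.yaml plus the controller mappings.
    static final String BASE_URL = "http://localhost:9000/kafka-spring/send";

    // Builds the request URI, URL-encoding the message so spaces and '&' are safe.
    static URI buildSendUri(String message) {
        String encoded = URLEncoder.encode(message, StandardCharsets.UTF_8);
        return URI.create(BASE_URL + "?message=" + encoded);
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(buildSendUri("hello kafka")).GET().build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        // Print the HTTP status and the string returned by the controller.
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

If everything is wired correctly, the response body should be the string returned by KafkaController, and the application console should show the output of KafkaConsumer for the same message.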
Hope this article was helpful. If you liked it, please hit like and comment.