Kafka - Java + Spring Boot + Docker + Kafka
2023. 8. 8. 09:40 · Development/MQ
✔ Prerequisites: Linux, Java, IDE, Gradle, Docker, Kafka
Kafka installation and configuration
Installation and configuration: https://haay.tistory.com/entry/Docker-Kafka-%EC%84%A4%EC%B9%98
Spring
Project directory listing
1. build.gradle
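    dependencies {
        implementation 'org.springframework.boot:spring-boot-starter-jetty'
        implementation 'org.springframework.kafka:spring-kafka'
        developmentOnly 'org.springframework.boot:spring-boot-devtools'
        // compileOnly puts Lombok's annotations on the compile classpath;
        // annotationProcessor alone only runs the processor.
        compileOnly 'org.projectlombok:lombok'
        annotationProcessor 'org.projectlombok:lombok'
    }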
2. application.yml
    spring:
      config:
        activate:
          on-profile: local
      kafka:
        bootstrap-servers: kafkaIP:9092
        consumer:
          group-id: test-consumer-group # default group
          auto-offset-reset: earliest
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          max-poll-records: 100
        producer:
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
        listener:
          poll-timeout: 5
3. KafkaApplication
    package kafka;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;

    // Skip JDBC DataSource auto-configuration; this example does not use a database.
    @SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
    public class KafkaApplication {

        // Explicit SLF4J logger for this class
        private static final Logger logger = LoggerFactory.getLogger(KafkaApplication.class);

        public static void main(String[] args) {
            SpringApplication.run(KafkaApplication.class, args);
        }
    }
4. KafkaConsumerConf
    package kafka.conf;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.config.KafkaListenerContainerFactory;
    import org.springframework.kafka.core.*;
    import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

    import java.util.HashMap;
    import java.util.Map;

    @Configuration
    @EnableKafka
    public class KafkaConsumerConf {

        // Values injected from application.yml
        @Value("${spring.kafka.bootstrap-servers}")
        private String servers;

        @Value("${spring.kafka.consumer.key-deserializer}")
        private String keyDeserializer;

        @Value("${spring.kafka.consumer.value-deserializer}")
        private String valueDeserializer;

        // Consumer properties passed to the Kafka client.
        // The group id and offset-reset policy are hard-coded here rather than read from application.yml.
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
            return props;
        }

        // Creates String/String consumers from the properties above
        @Bean
        public ConsumerFactory<String, String> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }

        // Container factory referenced by name from @KafkaListener(containerFactory = ...)
        @Bean
        public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            return factory;
        }
    }
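For reference, the ConsumerConfig constants used in consumerConfigs() stand for ordinary Kafka property names. The sketch below rebuilds the same map with literal keys, using only the JDK so it runs without Kafka on the classpath. The class name ConsumerPropsSketch and the max.poll.records entry are assumptions added for illustration: consumerConfigs() as written does not read max-poll-records from application.yml, so a line like the last put would be needed if that limit should apply to this hand-built factory.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of the original project:
// mirrors consumerConfigs() with the literal Kafka property names.
class ConsumerPropsSketch {

    static Map<String, Object> consumerProps(String servers) {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("bootstrap.servers", servers);              // BOOTSTRAP_SERVERS_CONFIG
        props.put("group.id", "test-consumer-group");         // GROUP_ID_CONFIG
        props.put("enable.auto.commit", true);                // ENABLE_AUTO_COMMIT_CONFIG
        props.put("auto.offset.reset", "earliest");           // AUTO_OFFSET_RESET_CONFIG
        props.put("key.deserializer",                         // KEY_DESERIALIZER_CLASS_CONFIG
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",                       // VALUE_DESERIALIZER_CLASS_CONFIG
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Assumption: not in the original class; shows where
        // max-poll-records: 100 from application.yml would go.
        props.put("max.poll.records", 100);
        return props;
    }

    public static void main(String[] args) {
        // Print each property as "key = value" in insertion order
        consumerProps("kafkaIP:9092").forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

In the real configuration class the ConsumerConfig constants are preferable to string literals, since a mistyped constant fails at compile time.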
5. ConsumerController
    package kafka.controller;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;

    @Component
    public class ConsumerController {

        private final Logger logger = LoggerFactory.getLogger(ConsumerController.class);

        // Called once per record received from topic "stopic";
        // the container factory is the bean defined in KafkaConsumerConf.
        @KafkaListener(topics = "stopic", groupId = "test-consumer-group", containerFactory = "kafkaListenerContainerFactory")
        public void listen(String message) throws Exception {
            logger.info(message);
        }
    }
Execution result
    Kafka version: 3.0.0
    Kafka commitId: 8cb0a5e9d3441962
    Kafka startTimeMs: 1691455060642
    [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Subscribed to topic(s): stopic
    Started KafkaApplication in 3.036 seconds (JVM running for 6.923)
    [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Cluster ID: kTvy-nXxTaqOf8b9qqO2kQ
    [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Discovered group coordinator 192.168.185.215:9092 (id: 2147483646 rack: null)
    [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] (Re-)joining group
    [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Request joining group due to: need to re-join with the given member-id
    [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] (Re-)joining group
    [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Successfully joined group with generation Generation{generationId=18, memberId='consumer-test-consumer-group-1-6ab83c99-8a64-45d6-8130-d0ad126256fc', protocol='range'}
    [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Finished assignment for group at generation 18: {consumer-test-consumer-group-1-6ab83c99-8a64-45d6-8130-d0ad126256fc=Assignment(partitions=[stopic-0])}
    [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Successfully synced group in generation Generation{generationId=18, memberId='consumer-test-consumer-group-1-6ab83c99-8a64-45d6-8130-d0ad126256fc', protocol='range'}
    [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Notifying assignor about the new Assignment(partitions=[stopic-0])
    [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Adding newly assigned partitions: stopic-0
    [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Setting offset for partition stopic-0 to the committed offset FetchPosition{offset=31, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[192.168.185.215:9092 (id: 1 rack: null)], epoch=0}}
    test-consumer-group: partitions assigned: [stopic-0]
    Hello Kafka!
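One detail in this log is worth a note: the consumer starts at "the committed offset" 31 even though auto-offset-reset is earliest. That setting only matters when the group has no committed offset for the partition. The sketch below is a simplified, hypothetical model of that rule in plain Java (the class StartOffsetSketch and its method are made up for illustration and are not Kafka client APIs; the end offset 32 is an invented value).

```java
import java.util.OptionalLong;

// Simplified, hypothetical model of how a consumer picks its starting offset.
// This is not the Kafka client implementation.
class StartOffsetSketch {

    static long resolveStartOffset(OptionalLong committed, String autoOffsetReset,
                                   long earliest, long latest) {
        if (committed.isPresent()) {
            // A committed offset for the group takes priority over auto.offset.reset
            return committed.getAsLong();
        }
        switch (autoOffsetReset) {
            case "earliest":
                return earliest;   // start from the beginning of the partition
            case "latest":
                return latest;     // start from the end, reading only new records
            default:
                // Stand-in for the error the real client raises with auto.offset.reset=none
                throw new IllegalStateException(
                        "no committed offset and auto.offset.reset=" + autoOffsetReset);
        }
    }

    public static void main(String[] args) {
        // Situation in the log above: committed offset 31 exists, so it is used
        System.out.println(resolveStartOffset(OptionalLong.of(31), "earliest", 0, 32));  // prints 31
        // A brand-new group id has nothing committed, so "earliest" applies
        System.out.println(resolveStartOffset(OptionalLong.empty(), "earliest", 0, 32)); // prints 0
    }
}
```

So to re-read stopic from the beginning with this setup, changing the group id (or resetting the group's offsets) is what takes effect, not auto-offset-reset alone.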