Kafka - Java + Spring Boot + Docker + Kafka

2023. 8. 8. 09:40 · Development/MQ

✔ Prerequisites: Linux, Java, IDE, Gradle, Docker, Kafka

Kafka installation and configuration

Installation and configuration (Docker): https://haay.tistory.com/entry/Docker-Kafka-%EC%84%A4%EC%B9%98

Spring

Project files
1. build.gradle 
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-jetty'  // embedded Jetty servlet container
    implementation 'org.springframework.kafka:spring-kafka'              // Spring for Apache Kafka

    developmentOnly 'org.springframework.boot:spring-boot-devtools'
    compileOnly 'org.projectlombok:lombok'         // required on the compile classpath for @Slf4j
    annotationProcessor 'org.projectlombok:lombok'
}
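The snippet above shows only the dependencies block; it assumes the usual java, org.springframework.boot, and io.spring.dependency-management Gradle plugins are applied so that the Spring Boot BOM manages the dependency versions.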

2. application.yml 
spring:
  config:
    activate:
      on-profile: local
  kafka:
    bootstrap-servers: kafkaIP:9092   # replace kafkaIP with the broker address from the Docker setup
    consumer:
      group-id: test-consumer-group   # default consumer group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 100           # maximum number of records returned in a single poll()
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    listener:
      poll-timeout: 5                 # consumer poll timeout (milliseconds by default)
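Because of the spring.config.activate.on-profile: local setting, this configuration only takes effect when the local profile is active, for example by starting the application with --spring.profiles.active=local or by setting the SPRING_PROFILES_ACTIVE=local environment variable.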

3. KafkaApplication
package kafka;

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;

// No database is used in this example, so DataSource auto-configuration is excluded.
@Slf4j
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
public class KafkaApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaApplication.class, args);
    }
}

4. KafkaConsumerConf
package kafka.conf;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConsumerConf {
    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;

    @Value("${spring.kafka.consumer.key-deserializer}")
    private String keyDeserializer;

    @Value("${spring.kafka.consumer.value-deserializer}")
    private String valueDeserializer;

    // Consumer properties; the group id and offset reset mirror spring.kafka.consumer.* in application.yml.
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
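One detail not covered in the original configuration: the startup log below shows stopic has a single partition, so one listener thread is enough. If the topic had more partitions, calling factory.setConcurrency(n) on the ConcurrentKafkaListenerContainerFactory before returning it would run n listener threads so partitions can be consumed in parallel.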

5. ConsumerController
package kafka.controller;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class ConsumerController {
    private final Logger logger = LoggerFactory.getLogger(ConsumerController.class);

    // Consumes messages from the stopic topic using the container factory defined above.
    @KafkaListener(topics = "stopic", groupId = "test-consumer-group", containerFactory = "kafkaListenerContainerFactory")
    public void listen(String message) {
        logger.info(message);
    }
}
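The post implements only the consumer side, although application.yml already declares producer serializers. As a minimal sketch (not part of the original project; the class and method names are illustrative), a producer component could rely on the KafkaTemplate that Spring Boot auto-configures from the spring.kafka.producer.* settings:

package kafka.controller;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class ProducerController {
    // KafkaTemplate is auto-configured by Spring Boot from spring.kafka.* in application.yml.
    private final KafkaTemplate<String, String> kafkaTemplate;

    public ProducerController(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // Sends a message to the same topic the listener above subscribes to.
    public void send(String message) {
        kafkaTemplate.send("stopic", message);
    }
}

Calling send("Hello Kafka!") from such a component, or producing the same message with the Kafka console producer inside the broker container, results in the final log line shown below.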

 

Execution result
Kafka version: 3.0.0
Kafka commitId: 8cb0a5e9d3441962
Kafka startTimeMs: 1691455060642
[Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Subscribed to topic(s): stopic
Started KafkaApplication in 3.036 seconds (JVM running for 6.923)
[Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Cluster ID: kTvy-nXxTaqOf8b9qqO2kQ
[Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Discovered group coordinator 192.168.185.215:9092 (id: 2147483646 rack: null)
[Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] (Re-)joining group
[Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Request joining group due to: need to re-join with the given member-id
[Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] (Re-)joining group
[Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Successfully joined group with generation Generation{generationId=18, memberId='consumer-test-consumer-group-1-6ab83c99-8a64-45d6-8130-d0ad126256fc', protocol='range'}
[Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Finished assignment for group at generation 18: {consumer-test-consumer-group-1-6ab83c99-8a64-45d6-8130-d0ad126256fc=Assignment(partitions=[stopic-0])}
[Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Successfully synced group in generation Generation{generationId=18, memberId='consumer-test-consumer-group-1-6ab83c99-8a64-45d6-8130-d0ad126256fc', protocol='range'}
[Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Notifying assignor about the new Assignment(partitions=[stopic-0])
[Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Adding newly assigned partitions: stopic-0
[Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Setting offset for partition stopic-0 to the committed offset FetchPosition{offset=31, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[192.168.185.215:9092 (id: 1 rack: null)], epoch=0}}
test-consumer-group: partitions assigned: [stopic-0]
Hello Kafka!


