package kafka.controller;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class ConsumerController {

    private static final Logger logger = LoggerFactory.getLogger(ConsumerController.class);

    // Consumes records from "stopic" as a member of "test-consumer-group",
    // using the listener container factory bean named "kafkaListenerContainerFactory".
    @KafkaListener(topics = "stopic", groupId = "test-consumer-group", containerFactory = "kafkaListenerContainerFactory")
    public void listen(String message) {
        // Log the payload of each received record.
        logger.info(message);
    }
}
Execution result
Kafka version: 3.0.0
Kafka commitId: 8cb0a5e9d3441962
Kafka startTimeMs: 1691455060642
[Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Subscribed to topic(s): stopic
Started KafkaApplication in 3.036 seconds (JVM running for 6.923)
[Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Cluster ID: kTvy-nXxTaqOf8b9qqO2kQ
[Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Discovered group coordinator 192.168.185.215:9092 (id: 2147483646 rack: null)
[Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] (Re-)joining group
[Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Request joining group due to: need to re-join with the given member-id
[Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] (Re-)joining group
[Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Successfully joined group with generation Generation{generationId=18, memberId='consumer-test-consumer-group-1-6ab83c99-8a64-45d6-8130-d0ad126256fc', protocol='range'}
[Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Finished assignment for group at generation 18: {consumer-test-consumer-group-1-6ab83c99-8a64-45d6-8130-d0ad126256fc=Assignment(partitions=[stopic-0])}
[Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Successfully synced group in generation Generation{generationId=18, memberId='consumer-test-consumer-group-1-6ab83c99-8a64-45d6-8130-d0ad126256fc', protocol='range'}
[Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Notifying assignor about the new Assignment(partitions=[stopic-0])
[Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Adding newly assigned partitions: stopic-0
[Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Setting offset for partition stopic-0 to the committed offset FetchPosition{offset=31, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[192.168.185.215:9092 (id: 1 rack: null)], epoch=0}}
test-consumer-group: partitions assigned: [stopic-0]
Hello Kafka!
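
A note on one detail in the log above: the group coordinator is reported with `id: 2147483646`, while the partition leader on the same host (192.168.185.215:9092) has `id: 1`. The Kafka Java client labels its coordinator connection as `Integer.MAX_VALUE` minus the broker's node id, so that coordinator traffic gets its own connection. Both ids therefore refer to the same broker. The sketch below only reproduces that arithmetic; the class and method names are hypothetical and are not part of the Kafka API.

```java
// Hypothetical helper (not part of the Kafka API): illustrates the id arithmetic only.
public class CoordinatorIdSketch {

    // Id the client reports for a coordinator hosted on the broker with the given node id.
    public static int coordinatorConnectionId(int brokerId) {
        return Integer.MAX_VALUE - brokerId;
    }

    // Inverse mapping: recover the broker's node id from the id shown in the log.
    public static int brokerIdFromCoordinatorId(int coordinatorId) {
        return Integer.MAX_VALUE - coordinatorId;
    }

    public static void main(String[] args) {
        // Broker id 1 is the leader id printed in the "Setting offset" log line.
        System.out.println(coordinatorConnectionId(1));
        // 2147483646 is the coordinator id printed in the "Discovered group coordinator" log line.
        System.out.println(brokerIdFromCoordinatorId(2147483646));
    }
}
```

Running `main` prints 2147483646 and then 1, matching the two ids in the log.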