Kafka - Python + Docker + Kafka 실습

2023. 8. 8. 09:09개발/MQ

728x90
반응형
✔준비물 : Linux, Python, Docker, Kafka

Kafka 설치및 설정

설치및 설정 : https://haay.tistory.com/entry/Docker-Kafka-%EC%84%A4%EC%B9%98
 

Docker - Kafka 설치

✔준비물 : Linux, Docker, Kafka Kafka 설치및 설정 docker-compose.yml 작성 version: '2' # docker-compose version services: zookeeper: container_name: local-zookeeper image: wurstmeister/zookeeper:3.4.6 ports: - "2181:2181" kafka: container_name:

haay.tistory.com

Python

Producer.py
from confluent_kafka import Producer

def produce_message(producer, topic, message):
    producer.produce(topic, message.encode('utf-8'))
    producer.flush()

def main():
    # kafka IP
    kafka_broker = 'kafkaIP:9092'
    topic = 'stopic'

    # Kafka producer 설정
    producer_config = {
        'bootstrap.servers': kafka_broker,
    }

    # Kafka producer 생성
    producer = Producer(producer_config)

    # 보낼 메시지
    message = "Hello Kafka!"
    print(f"topic : {topic}, meg : {message}")
    produce_message(producer, topic, message)

if __name__ == "__main__":
    main()

Consumer.py

from confluent_kafka import Consumer

# consumer 객체 생성
consumer = Consumer({
    'bootstrap.servers':'kafkaIP:9092',
    'group.id':'test-consumer-group',
    'auto.offset.reset':'earliest'
})
consumer.subscribe(['stopic'])

while True:
    # 단일 메시지를 소비한다.
    # 매개 변수로는 timeout 설정 (float)
    message = consumer.poll(1.0)
    if message:
        print('Received message: {}'.format(message.value().decode('utf-8')))
    else:
        continue

consumer.close()

실행 결과

$ python Producer.py
topic : stopic, meg : Hello Kafka!

$ python Consumer.py
Received message: Hello Kafka!​

 

728x90
반응형

'개발 > MQ' 카테고리의 다른 글

Kafka - Java + Spring Boot + Docker + Kafka  (0) 2023.08.08
Kafka - 이론, 실습(Ubuntu)  (0) 2023.07.31
RabbitMQ - 이론, 실습(Ubuntu)  (0) 2023.07.31