Kafka - Theory and Practice (Ubuntu)

2023. 7. 31. 12:12 · Development/MQ

💡 Prerequisites: Ubuntu 20.04.3, Kafka

Overview

What is Kafka?

A distributed messaging platform built to process large volumes of message data quickly.

Characteristics

  • pub/sub messaging system: a message-distribution pattern in which producers publish any message they want to a topic, and subscribers to that topic receive it
  • A platform for managing real-time data feeds
    • High throughput
    • Low latency
  • A scalable pub/sub message queue designed as a distributed transaction log
  • Works together with ZooKeeper (as in the setup used in this post)
    • ZooKeeper: a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services
    • A distributed coordination service
  • ์‚ฌ์šฉ ์šฉ๋„
    • ์ŠคํŠธ๋ฆฌ๋ฐ ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ
  • ๊ธฐ๋Šฅ
    • ๋ถ„์‚ฐ ์ด๋ฒคํŠธ ์ŠคํŠธ๋ฆฌ๋ฐ ํ”Œ๋žซํผ
    • ๋†’์€ ์ฒ˜๋ฆฌ๋Ÿ‰
    • ์‹ค์‹œ๊ฐ„ ์†”๋ฃจ์…˜
    • ์˜๊ตฌ ๋ฉ”์‹œ์ง•
  • ํŠน์ง•
    • ๋ฉ”์‹œ์ง€ ๋Œ€๊ธฐ์—ด์— ์˜์กดํ•˜์ง€ ์•Š๊ณ  ๋ฉ”์‹œ์ง€๋ฅผ ๋กœ๊ทธ์— ์ถ”๊ฐ€ํ•˜๊ณ  ๊ทธ๋Œ€๋กœ ๋‘๋ฉฐ, ์†Œ๋น„์ž๊ฐ€ ๋ฉ”์‹œ์ง€๋ฅผ ์ฝ๊ฑฐ๋‚˜ ๋ณด์กด ์ œํ•œ์— ๋„๋‹ฌํ•  ๋•Œ๊นŒ์ง€ ๊ทธ๋Œ€๋กœ ์œ ์ง€.
      • disk ๋ฉ”์‹œ์ง€ ์ €์žฅ - offset
    • ํ’€ ๊ธฐ๋ฐ˜ ์ ‘๊ทผ ๋ฐฉ์‹
      • ๋ฉ”์‹œ์ง€ ์ผ๊ด„์ฒ˜๋ฆฌ ๊ฐ€๋Šฅ
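The log-based design above can be modeled in a few lines. The sketch below is a simplified, in-memory illustration. The `PartitionLog` class is hypothetical and not part of Kafka or any client library. Records are only appended, and each one gets an increasing offset. A consumer pulls a batch starting from its own offset, and reading never deletes anything; only the retention step removes the oldest records.

```python
from dataclasses import dataclass, field


@dataclass
class PartitionLog:
    """Hypothetical in-memory model of one partition: an append-only record list."""
    records: list = field(default_factory=list)
    start_offset: int = 0  # offset of the oldest record still retained

    def append(self, value):
        """Append a record and return the offset assigned to it."""
        self.records.append(value)
        return self.start_offset + len(self.records) - 1

    @property
    def end_offset(self):
        """Offset the next appended record will receive (the log-end offset)."""
        return self.start_offset + len(self.records)

    def fetch(self, offset, max_records):
        """Pull up to max_records starting at offset; reading deletes nothing.

        Offsets below start_offset are clamped to the oldest retained record
        (a simplification of this model).
        """
        begin = max(offset, self.start_offset) - self.start_offset
        return self.records[begin:begin + max_records]

    def apply_retention(self, max_records):
        """Size-based retention: drop the oldest records beyond max_records."""
        excess = len(self.records) - max_records
        if excess > 0:
            self.records = self.records[excess:]
            self.start_offset += excess


log = PartitionLog()
for word in ["test", "kafka", "start", "end"]:
    log.append(word)

# A consumer keeps its own offset and pulls a batch of two records.
offset_a = 0
batch = log.fetch(offset_a, max_records=2)
offset_a += len(batch)
print(batch, offset_a)      # ['test', 'kafka'] 2
print(log.fetch(0, 10))     # ['test', 'kafka', 'start', 'end'] (still there after reading)

# Only retention removes records.
log.apply_retention(max_records=2)
print(log.start_offset, log.fetch(0, 10))  # 2 ['start', 'end']
```

In real Kafka, a consumer may ask for an offset that retention has already removed. That triggers the consumer's offset-reset policy instead of the silent clamping used in this model.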

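Installation

# Download and extract
# Older releases are moved from downloads.apache.org to https://archive.apache.org/dist/kafka/, so use the archive URL if this one returns 404
wget https://downloads.apache.org/kafka/3.3.1/kafka_2.12-3.3.1.tgz
tar -xzf kafka_2.12-3.3.1.tgz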

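Run

# Start ZooKeeper first, then the Kafka broker, each in its own terminal (both stay in the foreground)
cd kafka_2.12-3.3.1/bin
./zookeeper-server-start.sh ../config/zookeeper.properties
# In a second terminal, from the same bin directory:
./kafka-server-start.sh ../config/server.properties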

Create a topic

# Create a topic
# Topic name: test
# replication-factor: number of copies kept of each partition; 1 means a single copy (no replication)
# partitions: number of partitions (3)
./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic test

# The controller log can be checked here
cat ../logs/controller.log

# Verify that the topic was created (without --topic, every topic is described)
./kafka-topics.sh --describe --bootstrap-server localhost:9092

Produce and consume messages

# Produce messages (type one message per line; press Ctrl-C to exit)
./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
> test
> kafka
> start
> end

# ๋ฉ”์‹œ์ง€ ๊ตฌ๋… 
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test
test
kafka
start
end
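The messages above came back in the order they were typed, but Kafka guarantees ordering only within a single partition. The topic test was created with --partitions 3, so messages may land on different partitions. A consumer reading several partitions can then see them in a different order than they were produced. The sketch below models this with a hypothetical round-robin `distribute` helper. The real producer uses its own partitioner, so actual placement will differ.

```python
from itertools import cycle


def distribute(messages, num_partitions):
    """Place keyless messages on partitions round-robin.

    Hypothetical helper for illustration; Kafka's producer partitioner differs.
    """
    partitions = {p: [] for p in range(num_partitions)}
    for message, p in zip(messages, cycle(range(num_partitions))):
        partitions[p].append(message)
    return partitions


parts = distribute(["test", "kafka", "start", "end"], 3)
print(parts)  # {0: ['test', 'end'], 1: ['kafka'], 2: ['start']}

# Order is preserved inside each partition, but a consumer that drains
# partition 0, then 1, then 2 sees a different overall order.
drained = [m for p in sorted(parts) for m in parts[p]]
print(drained)  # ['test', 'end', 'kafka', 'start']
```

When the relative order of certain messages matters, they need to share a partition. In practice this is usually done by giving them the same message key.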

Shutdown

# Shutting down the Kafka environment
# To stop Kafka, press Ctrl-C in each console in the following order:

# 1. Stop the producer and consumer clients
# 2. Stop the Kafka broker
# 3. Stop the ZooKeeper server

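Simple Kafka exercise

Consumer Group

  • When a consumer group has several consumers, Kafka balances the load automatically. It assigns each partition of the topic to exactly one consumer in the group, so each message is processed by only one member of the group.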
# ๊ทธ๋ฃน ์ƒ์„ฑ
./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic test1

# ๊ทธ๋ฃน ํ™•์ธ
./kafka-console-consumer.sh --bootstrap-server localhost:9092 -group testgroup1  --from-beginning --topic test1
GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                           HOST            CLIENT-ID
testgroup1       test1           0          0               0               0               console-consumer-eb58c4ca-fb34-45c7-867f-2ef0ad8951dd /127.0.0.1      console-consumer
testgroup1       test1           1          0               0               0               console-consumer-eb58c4ca-fb34-45c7-867f-2ef0ad8951dd /127.0.0.1      console-consumer
testgroup1       test1           2          0               0               0               console-consumer-eb58c4ca-fb34-45c7-867f-2ef0ad8951dd /127.0.0.1      console-consumer
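How partitions are split among group members depends on the configured assignor, set with the consumer setting partition.assignment.strategy. The sketch below is a simplified, single-topic model of the range strategy, using hypothetical consumer names. Partitions are cut into contiguous ranges, and the first consumers get one extra partition when the split is uneven.

```python
def range_assign(partitions, consumers):
    """Split one topic's partitions into contiguous ranges, one per consumer.

    Simplified sketch modeled on Kafka's range strategy; not the client's code.
    """
    partitions = sorted(partitions)
    consumers = sorted(consumers)
    per_consumer, extra = divmod(len(partitions), len(consumers))
    assignment = {}
    start = 0
    for i, consumer in enumerate(consumers):
        # The first `extra` consumers receive one additional partition.
        count = per_consumer + (1 if i < extra else 0)
        assignment[consumer] = partitions[start:start + count]
        start += count
    return assignment


# test1 has partitions 0, 1 and 2 (created with --partitions 3).
print(range_assign([0, 1, 2], ["consumer-a"]))
# {'consumer-a': [0, 1, 2]}
print(range_assign([0, 1, 2], ["consumer-a", "consumer-b"]))
# {'consumer-a': [0, 1], 'consumer-b': [2]}
print(range_assign([0, 1, 2], ["c1", "c2", "c3", "c4"]))
# {'c1': [0], 'c2': [1], 'c3': [2], 'c4': []}
```

With a single consumer, all three partitions go to it. This matches the describe output above, where one CONSUMER-ID owns partitions 0 to 2. A fourth consumer receives nothing, so running more consumers than partitions adds no parallelism.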
  • Consumer-group test
# Produce messages
./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test1
> 1
> 2
> 3
> 4
> 5
> 6
> 7
> 8
> 9

# ๊ทธ๋ฃน ํ™•์ธ
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group testgroup1

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                           HOST            CLIENT-ID
testgroup1      test1           0          3               3               0               console-consumer-672ffa72-f17e-43b0-b4be-c9304803391b /127.0.0.1      console-consumer
testgroup1      test1           1          3               3               0               console-consumer-672ffa72-f17e-43b0-b4be-c9304803391b /127.0.0.1      console-consumer
testgroup1      test1           2          3               3               0               console-consumer-672ffa72-f17e-43b0-b4be-c9304803391b /127.0.0.1      console-consumer
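The LAG column counts the messages the group has not consumed yet on each partition: LOG-END-OFFSET minus CURRENT-OFFSET. The sketch below parses the describe output and sums the lag. It uses a hypothetical helper, `parse_group_describe`, and assumes the whitespace-separated layout shown above with numeric offset columns. A partition without a committed offset prints "-" there and would need extra handling.

```python
def parse_group_describe(text):
    """Turn `kafka-consumer-groups.sh --describe` output into one dict per row.

    Hypothetical helper: assumes whitespace-separated columns with no blanks.
    """
    lines = [line for line in text.strip().splitlines() if line.strip()]
    header = lines[0].split()
    return [dict(zip(header, line.split())) for line in lines[1:]]


def lag(row):
    """Messages not yet consumed on one partition: LOG-END-OFFSET - CURRENT-OFFSET."""
    return int(row["LOG-END-OFFSET"]) - int(row["CURRENT-OFFSET"])


# Output copied from the `--describe --group testgroup1` run above.
describe_output = """
GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                           HOST            CLIENT-ID
testgroup1      test1           0          3               3               0               console-consumer-672ffa72-f17e-43b0-b4be-c9304803391b /127.0.0.1      console-consumer
testgroup1      test1           1          3               3               0               console-consumer-672ffa72-f17e-43b0-b4be-c9304803391b /127.0.0.1      console-consumer
testgroup1      test1           2          3               3               0               console-consumer-672ffa72-f17e-43b0-b4be-c9304803391b /127.0.0.1      console-consumer
"""

rows = parse_group_describe(describe_output)
# Offsets start at 0 here, so the log-end offsets add up to the messages written.
total_messages = sum(int(row["LOG-END-OFFSET"]) for row in rows)
total_lag = sum(lag(row) for row in rows)
print(total_messages, total_lag)  # 9 0

# A hypothetical partition where the group has read 1 of 3 messages:
print(lag({"CURRENT-OFFSET": "1", "LOG-END-OFFSET": "3"}))  # 2
```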

Simple Python test

# Install the kafka-python package
pip install kafka-python

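Producer options

Writing a Python producer

# producer.py
from kafka import KafkaProducer
from json import dumps
import time

# Create the producer
# acks=0: do not wait for any broker acknowledgment (fastest, but messages can be lost)
# compression_type='gzip': compress message batches with gzip
# value_serializer: convert each Python dict to JSON-encoded UTF-8 bytes
producer = KafkaProducer(acks=0, compression_type='gzip', bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: dumps(x).encode('utf-8'))

start = time.time()
for i in range(10000):
    data = {'str': 'result' + str(i)}
    producer.send('test1', value=data)
    # flush() blocks until all buffered messages are sent; calling it on every
    # iteration prevents batching, which makes this loop slow.
    # Calling producer.flush() once after the loop is much faster.
    producer.flush()
    print("elapsed :", time.time() - start)

producer.close()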

# Run
python producer.py
..
elapsed : 57.16879749298096
elapsed : 57.171818017959595
elapsed : 57.175421476364136
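The broker stores only bytes. kafka-python therefore runs `value_serializer` on each value before sending it, and the consumer's `value_deserializer` reverses the step. The sketch below reruns the same conversion as the lambdas in producer.py and consumer.py without a broker. It writes them as named functions, and the names `serialize` and `deserialize` are illustrative only.

```python
from json import dumps, loads


def serialize(value):
    """Same conversion as value_serializer in producer.py: dict -> UTF-8 JSON bytes."""
    return dumps(value).encode('utf-8')


def deserialize(payload):
    """Same conversion as value_deserializer in consumer.py: bytes -> dict."""
    return loads(payload.decode('utf-8'))


data = {'str': 'result' + str(0)}
payload = serialize(data)
print(payload)               # b'{"str": "result0"}'
print(deserialize(payload))  # {'str': 'result0'}
```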

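Consumer options

Writing a Python consumer

# consumer.py
from kafka import KafkaConsumer
from json import loads

# Subscribe to topic test1 through the broker at localhost:9092
# auto_offset_reset='earliest': if the group has no committed offset, start from the oldest message
# enable_auto_commit=True: commit consumed offsets to Kafka periodically in the background
# consumer_timeout_ms=1000: end the for loop after 1 second without new messages
consumer = KafkaConsumer(
    'test1',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='testgroup1',
    value_deserializer=lambda x: loads(x.decode('utf-8')),
    consumer_timeout_ms=1000
)

# Read messages; when the for loop times out, print a marker and start polling again
print('[begin] get consumer list')
try:
    while True:
        for message in consumer:
            print("Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s" % (
                message.topic, message.partition, message.offset, message.key, message.value))
        print('[end] get consumer list')
except KeyboardInterrupt:
    # Ctrl-C: leave the group cleanly (pending offsets are committed on close)
    consumer.close()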

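# Output (this sample was captured while consuming the topic test; when consuming test1 the Topic field shows test1)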
..
Topic: test, Partition: 0, Offset: 96, Key: None, Value: {'str': 'result33'}
Topic: test, Partition: 0, Offset: 97, Key: None, Value: {'str': 'result34'}
Topic: test, Partition: 0, Offset: 98, Key: None, Value: {'str': 'result35'}
Topic: test, Partition: 0, Offset: 99, Key: None, Value: {'str': 'result36'}
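With group_id='testgroup1', `auto_offset_reset='earliest'` only matters when the group has no committed offset for a partition. Otherwise the consumer resumes from the committed offset, so rerunning consumer.py does not re-read old messages. The sketch below is a simplified model of that decision, built around a hypothetical `starting_offset` function. The real client also applies the reset policy when a committed offset is out of range.

```python
def starting_offset(committed, log_start, log_end, auto_offset_reset="latest"):
    """Offset where a group member starts reading one partition (simplified model).

    committed: offset the group committed earlier, or None if it has none.
    auto_offset_reset defaults to "latest", like kafka-python's KafkaConsumer.
    """
    if committed is not None:
        return committed  # resume where the group left off
    if auto_offset_reset == "earliest":
        return log_start  # no commit yet: begin at the oldest retained record
    return log_end  # "latest": only records produced after the consumer starts


# If testgroup1 has committed offset 3 on a test1 partition (as in the describe
# output above), consumer.py resumes at 3 despite auto_offset_reset='earliest'.
print(starting_offset(3, 0, 3, "earliest"))     # 3
# A new group with no committed offsets starts from the beginning...
print(starting_offset(None, 0, 3, "earliest"))  # 0
# ...or, with the default "latest", waits for new messages.
print(starting_offset(None, 0, 3))              # 3
```

To read test1 from the beginning again in this model, the simplest option is a new group_id, which starts with no committed offsets.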

Kafka Docker + UI

- https://devocean.sk.com/blog/techBoardDetail.do?ID=163980 (Managing Kafka with the Kafka-UI tool, devocean.sk.com)

Sources:
- https://needjarvis.tistory.com/607 ([Kafka] Sending to (producer) and fetching from (consumer) Kafka with Python, needjarvis.tistory.com)
- https://github.com/dpkp/kafka-python (GitHub - dpkp/kafka-python: Python client for Apache Kafka)
