Kafka - Theory and Hands-on (Ubuntu)
2023. 7. 31. 12:12 · Development/MQ
💡 Prerequisites: Ubuntu 20.04.3, Kafka
Description
What is Kafka?
A distributed messaging platform built to process large volumes of message data quickly, at scale.
Features
- Pub/sub messaging system: a message distribution pattern in which producers publish messages and consumers subscribe to the ones they want
- Platform for managing real-time data feeds
- High throughput
- Low latency
- A scalable pub/sub message queue built on a distributed transaction log
- Works together with ZooKeeper
  - A centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services
  - A distributed coordination service
- Use cases
  - Streaming data processing
- Capabilities
  - Distributed event streaming platform
  - High throughput
  - Real-time solution
  - Persistent messaging
- Characteristics
  - Rather than relying on a message queue, Kafka appends each message to a log and leaves it there. A message stays in the log until the retention limit is reached, whether or not consumers have already read it.
  - Messages are stored on disk and addressed by offset
  - Pull-based approach
  - Messages can be processed in batches
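The log-and-offset behaviour described in the list above can be illustrated with a small in-memory model. This is only a sketch under assumptions: `PartitionLog`, its count-based `retention`, and `fetch` are hypothetical names invented for this example, and real Kafka enforces retention by time or size on disk rather than by record count.

```python
from dataclasses import dataclass, field


@dataclass
class PartitionLog:
    """Toy append-only log for one partition (hypothetical, not Kafka's storage format)."""
    retention: int                      # assumption: keep at most this many records
    base_offset: int = 0                # offset of the oldest record still retained
    records: list = field(default_factory=list)

    def append(self, value):
        """Append a record and return the offset assigned to it."""
        offset = self.base_offset + len(self.records)
        self.records.append(value)
        # Only the retention limit removes records; reading never does.
        while len(self.records) > self.retention:
            self.records.pop(0)
            self.base_offset += 1
        return offset

    def fetch(self, offset, max_records=10):
        """Pull up to max_records starting at offset, as (offset, value) pairs."""
        start = max(offset, self.base_offset) - self.base_offset
        batch = self.records[start:start + max_records]
        return [(self.base_offset + start + i, value) for i, value in enumerate(batch)]


log = PartitionLog(retention=3)
offsets = [log.append(v) for v in ["test", "kafka", "start", "end"]]
first_read = log.fetch(0)
second_read = log.fetch(0)   # reading again returns the same records
print(offsets)
print(first_read)
print(first_read == second_read)
```

The consumer asks for data by offset (pull), so the same records can be read again until the retention limit drops them.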
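Installation
# Download and extract
# (if this version is no longer on downloads.apache.org, older releases are kept under https://archive.apache.org/dist/kafka/)
wget https://downloads.apache.org/kafka/3.3.1/kafka_2.12-3.3.1.tgz
tar -xzf kafka_2.12-3.3.1.tgz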
Running
# Start ZooKeeper, then Kafka (each command stays in the foreground, so use a separate terminal for each)
cd kafka_2.12-3.3.1/bin
./zookeeper-server-start.sh ../config/zookeeper.properties
./kafka-server-start.sh ../config/server.properties
Creating a topic
# Create a topic
# topic name: test
# replication-factor: number of copies of each partition; 1 means only the original
# partitions: number of partitions, here 3
./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic test
# The result can be checked in the controller log
cat ../logs/controller.log
# Verify that the topic was created
./kafka-topics.sh --describe --bootstrap-server localhost:9092
Producing messages
# Produce messages
./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
> test
> kafka
> start
> end
# Subscribe to the messages
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test
test
kafka
start
end
Shutdown
# Shutting down the Kafka environment
# To stop Kafka, press Ctrl-C in each console, in the following order:
# 1. Stop the producer and consumer clients
# 2. Stop the Kafka broker
# 3. Stop the ZooKeeper server
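Simple Kafka hands-on
Consumer Group
- When a group contains several consumers, the topic's partitions are divided among them, so messages are load-balanced automatically.
# Create the topic used by the group
./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic test1
# Start a consumer that belongs to the group testgroup1
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --group testgroup1 --from-beginning --topic test1
# In another terminal, check the group
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group testgroup1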
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
testgroup1 test1 0 0 0 0 console-consumer-eb58c4ca-fb34-45c7-867f-2ef0ad8951dd /127.0.0.1 console-consumer
testgroup1 test1 1 0 0 0 console-consumer-eb58c4ca-fb34-45c7-867f-2ef0ad8951dd /127.0.0.1 console-consumer
testgroup1 test1 2 0 0 0 console-consumer-eb58c4ca-fb34-45c7-867f-2ef0ad8951dd /127.0.0.1 console-consumer
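In the output above, a single consumer owns all three partitions of test1. The sketch below illustrates how the partitions might be spread once more consumers join the same group. It is a simplified round-robin assignment written only for illustration: `assign_partitions` and the consumer names are hypothetical, and Kafka's actual assignment strategies are configurable and more involved.

```python
def assign_partitions(partitions, consumers):
    """Deal partitions out to consumers round-robin (simplified illustration)."""
    ordered = sorted(consumers)
    assignment = {consumer: [] for consumer in ordered}
    for index, partition in enumerate(sorted(partitions)):
        assignment[ordered[index % len(ordered)]].append(partition)
    return assignment


one = assign_partitions([0, 1, 2], ["consumer-a"])
two = assign_partitions([0, 1, 2], ["consumer-a", "consumer-b"])
four = assign_partitions([0, 1, 2], ["consumer-a", "consumer-b", "consumer-c", "consumer-d"])
print(one)
print(two)
print(four)
```

Within a group, each partition is read by at most one consumer, so with three partitions a fourth consumer would sit idle.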
- Consumer group test
# Enter messages
./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test1
> 1
> 2
> 3
> 4
> 5
> 6
> 7
> 8
> 9
# Check the group
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group testgroup1
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
testgroup1 test1 0 3 3 0 console-consumer-672ffa72-f17e-43b0-b4be-c9304803391b /127.0.0.1 console-consumer
testgroup1 test1 1 3 3 0 console-consumer-672ffa72-f17e-43b0-b4be-c9304803391b /127.0.0.1 console-consumer
testgroup1 test1 2 3 3 0 console-consumer-672ffa72-f17e-43b0-b4be-c9304803391b /127.0.0.1 console-consumer
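LAG in the output above is the gap between the last offset written to a partition (LOG-END-OFFSET) and the offset the group has committed (CURRENT-OFFSET). A minimal sketch of that arithmetic, using the three rows shown above; `rows` and `lag_per_partition` are names made up for this example.

```python
# (partition, current_offset, log_end_offset), copied from the describe output
rows = [
    (0, 3, 3),
    (1, 3, 3),
    (2, 3, 3),
]


def lag_per_partition(rows):
    """Return {partition: messages written but not yet committed by the group}."""
    return {partition: log_end - current for partition, current, log_end in rows}


lags = lag_per_partition(rows)
total_written = sum(log_end for _, _, log_end in rows)
print(lags)
print(total_written)
```

Since test1 was newly created, the log end offsets add up to the nine messages entered, and a lag of 0 on every partition means the group has consumed all of them.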
Simple Python test
# Install the kafka package
pip install kafka-python
Producer options
Writing the Python producer
# Write producer.py
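from kafka import KafkaProducer
from json import dumps
import time

# Create the producer object.
# acks=0: do not wait for a broker acknowledgement (fast, but messages can be lost).
producer = KafkaProducer(acks=0, compression_type='gzip', bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: dumps(x).encode('utf-8'))

start = time.time()
for i in range(10000):
    data = {'str': 'result' + str(i)}
    producer.send('test1', value=data)
    # flush() blocks until the buffered messages are sent. Calling it for every
    # message is slow; flushing once after the loop would let the producer batch.
    producer.flush()
    print("elapsed :", time.time() - start)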
# Run
python producer.py
..
elapsed : 57.16879749298096
elapsed : 57.171818017959595
elapsed : 57.175421476364136
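To see what value_serializer and compression_type='gzip' do to the payload, the serialization step can be run without a broker. This sketch only approximates the idea: the producer compresses batches in Kafka's own record format, whereas here the serialized values are simply concatenated and passed to Python's gzip module, so the sizes are illustrative. `serialize` is a name chosen for this example.

```python
import gzip
from json import dumps


def serialize(value):
    """Same transformation as the value_serializer lambda in producer.py."""
    return dumps(value).encode('utf-8')


payloads = [serialize({'str': 'result' + str(i)}) for i in range(10000)]
raw = b''.join(payloads)
compressed = gzip.compress(raw)

print(payloads[0])
print(len(raw), len(compressed))
```

Because the messages are nearly identical JSON, the compressed size comes out much smaller than the raw size, which is the kind of data where batch compression pays off.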
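Consumer options
Writing the Python consumer
# Write consumer.py
from kafka import KafkaConsumer
from json import loads
import time

# topic, broker list
consumer = KafkaConsumer(
    'test1',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',   # start from the oldest message if the group has no committed offset
    enable_auto_commit=True,        # commit offsets periodically in the background
    group_id='testgroup1',
    value_deserializer=lambda x: loads(x.decode('utf-8')),
    consumer_timeout_ms=1000        # stop iterating after 1 s without new messages
)

# Fetch messages from the consumer
print('[begin] get consumer list')
try:
    while True:
        # The inner loop ends after consumer_timeout_ms without messages;
        # the outer loop then resumes polling.
        for message in consumer:
            print("Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s" % (
                message.topic, message.partition, message.offset, message.key, message.value
            ))
        # time.sleep(1)
except KeyboardInterrupt:
    pass  # Ctrl-C ends the otherwise endless loop
finally:
    consumer.close()
    print('[end] get consumer list')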
# Output (this sample was captured with the topic set to test)
..
Topic: test, Partition: 0, Offset: 96, Key: None, Value: {'str': 'result33'}
Topic: test, Partition: 0, Offset: 97, Key: None, Value: {'str': 'result34'}
Topic: test, Partition: 0, Offset: 98, Key: None, Value: {'str': 'result35'}
Topic: test, Partition: 0, Offset: 99, Key: None, Value: {'str': 'result36'}
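The Value column above is a Python dict because value_deserializer reverses what the producer's value_serializer did. A broker-free check of that round trip, reusing the two lambdas from producer.py and consumer.py; the variable names `original`, `wire_bytes`, and `restored` are chosen for this example.

```python
from json import dumps, loads

value_serializer = lambda x: dumps(x).encode('utf-8')      # from producer.py
value_deserializer = lambda x: loads(x.decode('utf-8'))    # from consumer.py

original = {'str': 'result33'}
wire_bytes = value_serializer(original)     # what is sent to the broker
restored = value_deserializer(wire_bytes)   # what message.value holds

print(wire_bytes)
print(restored)
print(restored == original)
```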
Kafka Docker + UI
- https://devocean.sk.com/blog/techBoardDetail.do?ID=163980
Sources: https://needjarvis.tistory.com/607,
https://github.com/dpkp/kafka-python