Hadoop - MapReduce: Theory and Practice

2023. 8. 2. 09:13 (Development/Big data)


MapReduce

What is MapReduce?

  • A distributed programming model for processing large volumes of data.
  • Work is distributed as tasks across multiple nodes; a MapReduce job consists of two phases: Map and Reduce.
  • Enables parallel analysis of massive data sets in a large-scale distributed computing environment.

A framework for distributed processing: scattered data is organized, collected by type (Map), then filtered and sorted to extract results (Reduce).

A large file is divided into blocks; every block runs the same Map task, and a Reduce task then follows (block size = 64 MB).

  • fork/join: a common model for efficient parallel processing; work is handled recursively using a divide-and-conquer algorithm (reference)
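As an illustration of that bullet, the fork/join pattern can be sketched in Python as a recursive divide-and-conquer. The function name and threshold here are hypothetical, and a real fork/join pool would run the two halves in parallel rather than sequentially:

```python
# A minimal sketch of the fork/join idea: divide-and-conquer summation.
# (Illustration only; a real fork/join framework forks the halves onto
# worker threads and joins on their results.)

def fork_join_sum(data, threshold=2):
    # Small enough: compute directly (the base case).
    if len(data) <= threshold:
        return sum(data)
    # Fork: split the work in half and solve each half recursively.
    mid = len(data) // 2
    left = fork_join_sum(data[:mid], threshold)
    right = fork_join_sum(data[mid:], threshold)
    # Join: combine the partial results.
    return left + right

print(fork_join_sum(list(range(10))))  # 45
```

The same split/solve/combine shape is what MapReduce applies at cluster scale: blocks are the "forked" sub-problems and the Reduce phase is the "join".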

Stages

  1. Input: read the text
  2. Splitting: split the text into lines
  3. Mapping: take each line as input and emit (key, value) pairs
  4. Shuffling: group records that share the same key
  5. Reducing: sum the counts for each key and emit the totals
  6. Result: combine the Reduce output and store it in the Hadoop file system
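The six stages above can be traced end to end with a small in-memory sketch (plain Python, no Hadoop; the sample text matches the test.txt used in the hands-on below):

```python
from collections import defaultdict

# An in-memory walk-through of the six stages (no Hadoop involved).
text = "Deer Bear River\nCar Car River\nDeer Car Bear"

# 1-2. Input / Splitting: break the text into lines
lines = text.split("\n")

# 3. Mapping: emit (word, 1) for every word on every line
mapped = [(word, 1) for line in lines for word in line.split()]

# 4. Shuffling: group pairs that share the same key
groups = defaultdict(list)
for key, value in mapped:
    groups[key].append(value)

# 5. Reducing: sum the counts per key
reduced = {key: sum(values) for key, values in groups.items()}

# 6. Result: print (a real job would write this back to HDFS)
print(sorted(reduced.items()))
# [('Bear', 2), ('Car', 3), ('Deer', 2), ('River', 2)]
```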

Job

A Job means the "Full Program": it runs the Mapper and Reducer over the entire data set. A Task, by contrast, runs a single Mapper or Reducer over one chunk of the data.

  • The unit of work a client wants performed, consisting of the input data, the MapReduce program, and configuration information
  • Hadoop splits a Job into Map Tasks and Reduce Tasks for execution
  • A dedicated node controls the process of running a Job

System Architecture

A MapReduce system consists of a Client, a Job Tracker, and Task Trackers.

  • Client: submits the data to be analyzed to the Job Tracker in the form of a Job
  • Job Tracker: located on the Name Node; schedules and monitors all Jobs registered in the Hadoop cluster (master node)

Task Tracker:

  • A daemon running on each Data Node; executes the MapReduce program the user configured
  • Receives work from the Job Tracker and spawns as many Map Tasks and Reduce Tasks as requested
  • Reports its status back to the Job Tracker

Simple MapReduce Hands-on

  • Hadoop: not used
  • Distributed processing: none

MapReduce example (diagram: see the Stages section above)

Data to analyze

vi test.txt
Deer Bear River
Car Car River
Deer Car Bear

mapper.py

#!/usr/bin/env python3
"""mapper.py"""

import sys

for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
    # increase counters
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        print('%s\t%s' % (word, 1))

Result

cat test.txt | python3 mapper.py
Deer    1
Bear    1
River   1
Car     1
Car     1
River   1
Deer    1
Car     1
Bear    1

reducer.py

#!/usr/bin/env python3
"""reducer.py"""

import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)

    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue

    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print('%s\t%s' % (current_word, current_count))
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    print('%s\t%s' % (current_word, current_count))

Result

cat test.txt | python3 mapper.py | sort -k1,1 | python3 reducer.py
Bear    2
Car     3
Deer    2
River   2
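As a side note, the same reducer logic can be written more compactly with itertools.groupby. This is an alternative sketch, not part of the original tutorial; like reducer.py, it relies on the input already being sorted by key (which Hadoop guarantees before the reduce phase, and which `sort -k1,1` provides here):

```python
from itertools import groupby

def read_pairs(stream):
    """Parse tab-delimited 'word<TAB>count' lines into (word, int) pairs."""
    for line in stream:
        word, count = line.strip().split('\t', 1)
        yield word, int(count)

def reduce_counts(stream):
    """Sum counts per word; the input must already be sorted by word."""
    for word, group in groupby(read_pairs(stream), key=lambda kv: kv[0]):
        yield word, sum(count for _, count in group)

# Same pipeline as before, fed from a list instead of sys.stdin:
mapper_output = sorted(['Deer\t1', 'Bear\t1', 'River\t1', 'Car\t1',
                        'Car\t1', 'River\t1', 'Deer\t1', 'Car\t1', 'Bear\t1'])
for word, total in reduce_counts(mapper_output):
    print('%s\t%s' % (word, total))
# Bear    2
# Car     3
# Deer    2
# River   2
```

Passing `sys.stdin` instead of `mapper_output` reproduces reducer.py's stream-based behavior without the manual current_word/current_count bookkeeping.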

 

MapReduce Hands-on (with Hadoop)

  • Hadoop: used
  • Distributed processing: yes

MapReduce example (diagram: see the Stages section above)

Data to analyze

vi test.txt
Deer Bear River
Car Car River
Deer Car Bear

Before running: create the HDFS directory and upload the file

# Create the input directory
hdfs dfs -mkdir hdfs://localhost:9000/user/hadoop/input
# Upload the file to analyze (local test.txt -> HDFS)
hdfs dfs -put test.txt hdfs://localhost:9000/user/hadoop/input/test.txt

Running the Hadoop MapReduce example

hadoop jar hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.3.4.jar wordcount input output
2022-09-29 06:17:46,735 INFO impl.MetricsConfig: Loaded properties from hadoop-metrics2.properties
2022-09-29 06:17:46,828 INFO impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
2022-09-29 06:17:46,828 INFO impl.MetricsSystemImpl: JobTracker metrics system started
2022-09-29 06:17:47,084 INFO input.FileInputFormat: Total input files to process : 1
2022-09-29 06:17:47,187 INFO mapreduce.JobSubmitter: number of splits:1
2022-09-29 06:17:47,404 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local1123466837_0001
2022-09-29 06:17:47,404 INFO mapreduce.JobSubmitter: Executing with tokens: []
2022-09-29 06:17:47,608 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
2022-09-29 06:17:47,609 INFO mapreduce.Job: Running job: job_local1123466837_0001
2022-09-29 06:17:47,615 INFO mapred.LocalJobRunner: OutputCommitter set in config null
2022-09-29 06:17:47,622 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 2
2022-09-29 06:17:47,622 INFO output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
2022-09-29 06:17:47,623 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
2022-09-29 06:17:47,666 INFO mapred.LocalJobRunner: Waiting for map tasks
2022-09-29 06:17:47,667 INFO mapred.LocalJobRunner: Starting task: attempt_local1123466837_0001_m_000000_0
2022-09-29 06:17:47,690 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 2
2022-09-29 06:17:47,690 INFO output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
2022-09-29 06:17:47,711 INFO mapred.Task:  Using ResourceCalculatorProcessTree : [ ]
2022-09-29 06:17:47,715 INFO mapred.MapTask: Processing split: hdfs://localhost:9000/user/hadoop/input/test.txt:0+44
2022-09-29 06:17:47,833 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
2022-09-29 06:17:47,833 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
2022-09-29 06:17:47,833 INFO mapred.MapTask: soft limit at 83886080
2022-09-29 06:17:47,833 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600
2022-09-29 06:17:47,833 INFO mapred.MapTask: kvstart = 26214396; length = 6553600
2022-09-29 06:17:47,862 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
2022-09-29 06:17:47,999 INFO mapred.LocalJobRunner:
2022-09-29 06:17:48,002 INFO mapred.MapTask: Starting flush of map output
2022-09-29 06:17:48,002 INFO mapred.MapTask: Spilling map output
2022-09-29 06:17:48,003 INFO mapred.MapTask: bufstart = 0; bufend = 80; bufvoid = 104857600
2022-09-29 06:17:48,003 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 26214364(104857456); length = 33/6553600
2022-09-29 06:17:48,032 INFO mapred.MapTask: Finished spill 0
2022-09-29 06:17:48,044 INFO mapred.Task: Task:attempt_local1123466837_0001_m_000000_0 is done. And is in the process of committing
2022-09-29 06:17:48,049 INFO mapred.LocalJobRunner: map
2022-09-29 06:17:48,049 INFO mapred.Task: Task 'attempt_local1123466837_0001_m_000000_0' done.
2022-09-29 06:17:48,056 INFO mapred.Task: Final Counters for attempt_local1123466837_0001_m_000000_0: Counters: 24
        File System Counters
                FILE: Number of bytes read=281162
                FILE: Number of bytes written=922809
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=44
                HDFS: Number of bytes written=0
                HDFS: Number of read operations=5
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=1
                HDFS: Number of bytes read erasure-coded=0
        Map-Reduce Framework
                Map input records=3
                Map output records=9
                Map output bytes=80
                Map output materialized bytes=50
                Input split bytes=113
                Combine input records=9
                Combine output records=4
                Spilled Records=4
                Failed Shuffles=0
                Merged Map outputs=0
                GC time elapsed (ms)=25
                Total committed heap usage (bytes)=151887872
        File Input Format Counters
                Bytes Read=44
2022-09-29 06:17:48,060 INFO mapred.LocalJobRunner: Finishing task: attempt_local1123466837_0001_m_000000_0
2022-09-29 06:17:48,060 INFO mapred.LocalJobRunner: map task executor complete.
2022-09-29 06:17:48,066 INFO mapred.LocalJobRunner: Waiting for reduce tasks
2022-09-29 06:17:48,067 INFO mapred.LocalJobRunner: Starting task: attempt_local1123466837_0001_r_000000_0
2022-09-29 06:17:48,073 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 2
2022-09-29 06:17:48,073 INFO output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
2022-09-29 06:17:48,074 INFO mapred.Task:  Using ResourceCalculatorProcessTree : [ ]
2022-09-29 06:17:48,078 INFO mapred.ReduceTask: Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.Shuffle@7d714426
2022-09-29 06:17:48,080 WARN impl.MetricsSystemImpl: JobTracker metrics system already initialized!
2022-09-29 06:17:48,096 INFO reduce.MergeManagerImpl: MergerManager: memoryLimit=167490352, maxSingleShuffleLimit=41872588, mergeThreshold=110543640, ioSortFactor=10, memToMemMergeOutputsThreshold=10
2022-09-29 06:17:48,098 INFO reduce.EventFetcher: attempt_local1123466837_0001_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events
2022-09-29 06:17:48,132 INFO reduce.LocalFetcher: localfetcher#1 about to shuffle output of map attempt_local1123466837_0001_m_000000_0 decomp: 46 len: 50 to MEMORY
2022-09-29 06:17:48,133 INFO reduce.InMemoryMapOutput: Read 46 bytes from map-output for attempt_local1123466837_0001_m_000000_0
2022-09-29 06:17:48,136 INFO reduce.MergeManagerImpl: closeInMemoryFile -> map-output of size: 46, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->46
2022-09-29 06:17:48,137 INFO reduce.EventFetcher: EventFetcher is interrupted.. Returning
2022-09-29 06:17:48,138 INFO mapred.LocalJobRunner: 1 / 1 copied.
2022-09-29 06:17:48,139 INFO reduce.MergeManagerImpl: finalMerge called with 1 in-memory map-outputs and 0 on-disk map-outputs
2022-09-29 06:17:48,150 INFO mapred.Merger: Merging 1 sorted segments
2022-09-29 06:17:48,150 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 39 bytes
2022-09-29 06:17:48,154 INFO reduce.MergeManagerImpl: Merged 1 segments, 46 bytes to disk to satisfy reduce memory limit
2022-09-29 06:17:48,155 INFO reduce.MergeManagerImpl: Merging 1 files, 50 bytes from disk
2022-09-29 06:17:48,155 INFO reduce.MergeManagerImpl: Merging 0 segments, 0 bytes from memory into reduce
2022-09-29 06:17:48,155 INFO mapred.Merger: Merging 1 sorted segments
2022-09-29 06:17:48,155 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 39 bytes
2022-09-29 06:17:48,156 INFO mapred.LocalJobRunner: 1 / 1 copied.
2022-09-29 06:17:48,181 INFO Configuration.deprecation: mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords
2022-09-29 06:17:48,244 INFO mapred.Task: Task:attempt_local1123466837_0001_r_000000_0 is done. And is in the process of committing
2022-09-29 06:17:48,247 INFO mapred.LocalJobRunner: 1 / 1 copied.
2022-09-29 06:17:48,248 INFO mapred.Task: Task attempt_local1123466837_0001_r_000000_0 is allowed to commit now
2022-09-29 06:17:48,277 INFO output.FileOutputCommitter: Saved output of task 'attempt_local1123466837_0001_r_000000_0' to hdfs://localhost:9000/user/hadoop/output
2022-09-29 06:17:48,279 INFO mapred.LocalJobRunner: reduce > reduce
2022-09-29 06:17:48,279 INFO mapred.Task: Task 'attempt_local1123466837_0001_r_000000_0' done.
2022-09-29 06:17:48,280 INFO mapred.Task: Final Counters for attempt_local1123466837_0001_r_000000_0: Counters: 30
        File System Counters
                FILE: Number of bytes read=281294
                FILE: Number of bytes written=922859
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=44
                HDFS: Number of bytes written=28
                HDFS: Number of read operations=10
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=3
                HDFS: Number of bytes read erasure-coded=0
        Map-Reduce Framework
                Combine input records=0
                Combine output records=0
                Reduce input groups=4
                Reduce shuffle bytes=50
                Reduce input records=4
                Reduce output records=4
                Spilled Records=4
                Shuffled Maps =1
                Failed Shuffles=0
                Merged Map outputs=1
                GC time elapsed (ms)=3
                Total committed heap usage (bytes)=151887872
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Output Format Counters
                Bytes Written=28
2022-09-29 06:17:48,280 INFO mapred.LocalJobRunner: Finishing task: attempt_local1123466837_0001_r_000000_0
2022-09-29 06:17:48,280 INFO mapred.LocalJobRunner: reduce task executor complete.
2022-09-29 06:17:48,613 INFO mapreduce.Job: Job job_local1123466837_0001 running in uber mode : false
2022-09-29 06:17:48,614 INFO mapreduce.Job:  map 100% reduce 100%
2022-09-29 06:17:48,615 INFO mapreduce.Job: Job job_local1123466837_0001 completed successfully
2022-09-29 06:17:48,625 INFO mapreduce.Job: Counters: 36
        File System Counters
                FILE: Number of bytes read=562456
                FILE: Number of bytes written=1845668
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=88
                HDFS: Number of bytes written=28
                HDFS: Number of read operations=15
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=4
                HDFS: Number of bytes read erasure-coded=0
        Map-Reduce Framework
                Map input records=3
                Map output records=9
                Map output bytes=80
                Map output materialized bytes=50
                Input split bytes=113
                Combine input records=9
                Combine output records=4
                Reduce input groups=4
                Reduce shuffle bytes=50
                Reduce input records=4
                Reduce output records=4
                Spilled Records=8
                Shuffled Maps =1
                Failed Shuffles=0
                Merged Map outputs=1
                GC time elapsed (ms)=28
                Total committed heap usage (bytes)=303775744
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters
                Bytes Read=44
        File Output Format Counters
                Bytes Written=28

Result

# Check the result
hdfs dfs -cat hdfs://localhost:9000/user/hadoop/output/part-r-00000
Bear    2
Car     3
Deer    2
River   2
