
This page summarizes MapReduce.

 

MapReduce Architecture


[Hadoop architecture diagram](파일:Hadoop architecture03.png.md)

 

MapReduce Installation


  • [Installing Hadoop on CentOS](Hadoop.md#CentOS에서 Hadoop 설치.md)

 
 

MapReduce Guide


  • MapReduce

    • Job-Tracker: assigns jobs (has a redundancy problem; it is a single point of failure)

    • Task-Trackers: process the actual work

     
  • Map + Reduce

    • Mapper (map): receives input, processes it, and sorts the results

    • Reducer (reduce): receives the Mapper's output and aggregates it (see the flow sketch below)
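
A word count over the input line "a b a" illustrates the flow (an illustrative sketch, not from this page):

 map:     "a b a"           -> (a,1), (b,1), (a,1)
 shuffle: group/sort by key -> (a,[1,1]), (b,[1])
 reduce:  sum per key       -> (a,2), (b,1)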

     
  • Mapper Sample

 
 //--- Input  : Object (a Long-style key), Text (one line read from the file)
 //---          The format of the input value (Text) can be changed with a custom InputFormat/RecordReader
 //--- Output : Text, IntWritable -> passed to Reduce; the Text is the key
 import java.io.IOException;
 
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
 
 public class WordCountMapper extends Mapper<Object, Text, Text, IntWritable> {
     private final static IntWritable one = new IntWritable(1);
 
     public void map(Object key, Text word, Context context) throws IOException, InterruptedException {
         context.write(word, one);
     }
 }
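
The sample above emits each whole input line as the key. For an actual word count, the line would first be split into tokens; a minimal sketch under that assumption (the class name is illustrative):

 import java.io.IOException;
 import java.util.StringTokenizer;
 
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
 
 //--- Hypothetical variant: tokenize each line and emit (word, 1) per token
 public class TokenizingWordCountMapper extends Mapper<Object, Text, Text, IntWritable> {
     private final static IntWritable one = new IntWritable(1);
     private final Text word = new Text();
 
     public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
         StringTokenizer itr = new StringTokenizer(value.toString());
         while (itr.hasMoreTokens()) {
             word.set(itr.nextToken());
             context.write(word, one);
         }
     }
 }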
 
  • Reducer Sample

 
 //--- Input  : Text, IntWritable <- received from Map; values arrive grouped by the Text key
 //--- Output : Text, IntWritable
 import java.io.IOException;
 
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Reducer;
 
 public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
     private final static IntWritable count = new IntWritable();
 
     public void reduce(Text word, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
         int sum = 0;
         for (IntWritable val : values) {
             sum += val.get();
         }
         count.set(sum);
         context.write(word, count);
     }
 }
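
To actually run the pair, a driver that configures and submits the Job is still needed; a minimal sketch (class names match the samples above, paths come from the command line):

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
 public class WordCountDriver {
     public static void main(String[] args) throws Exception {
         Job job = Job.getInstance(new Configuration(), "word count");
         job.setJarByClass(WordCountDriver.class);
         job.setMapperClass(WordCountMapper.class);
         //--- The reducer can double as a combiner because summing is associative
         job.setCombinerClass(WordCountReducer.class);
         job.setReducerClass(WordCountReducer.class);
         job.setOutputKeyClass(Text.class);
         job.setOutputValueClass(IntWritable.class);
         FileInputFormat.addInputPath(job, new Path(args[0]));
         FileOutputFormat.setOutputPath(job, new Path(args[1]));
         System.exit(job.waitForCompletion(true) ? 0 : 1);
     }
 }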
 

http://songsungkyun.cafe24.com/images/MapReduce.jpg
 
http://4.bp.blogspot.com/_j6mB7TMmJJY/SS0CEJLklnI/AAAAAAAAAGQ/ogPGJ3WYpt4/s400/P4.png

 

MapReduce Development Environment Setup


  • Running MapReduce

 
 hadoop jar ~.jar
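 
A hypothetical full invocation (jar name, driver class, and paths are placeholders, not from this page):
 
 hadoop jar wordcount.jar com.jopenbusiness.mapreduce.WordCountDriver input/data.txt output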
 
  • Mapper

    • Input key (key), input value (value): an input line

    • Output key (outKey), output value (123)

 
 package com.jopenbusiness.mapreduce.mapper;
 
 import java.io.IOException;
 
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
 
 public class zzmapper extends Mapper<LongWritable, Text, Text, IntWritable> {
 	public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
 		context.write(new Text(""), new IntWritable(123));
 	}
 }
 
  • Reducer

    • Input key (key), input values (values)

    • Output key (outKey), output value (567)

 
 package com.jopenbusiness.mapreduce.reducer;
 
 import java.io.IOException;
 
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Reducer;
 
 public class zzreducer extends Reducer<Text, IntWritable, Text, IntWritable> {
 	public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
 		context.write(key, new IntWritable(567));
 	}
 	}
 } 
 
  • Running the MapReduce job

 
 	public boolean process() {
 		Job job = null;
 		
 		try {
 			job = Job.getInstance();
 			job.setJarByClass(zztemp.class);
 			
 			//--- Input path and Mapper
 			FileInputFormat.addInputPath(job, new Path("~"));
 			job.setMapperClass(zzmapper.class);
 			
 			//--- Combiner (optional): runs map-side; here the Reducer class is reused
 			job.setCombinerClass(zzreducer.class);
 
 			//--- Output path, Reducer, and output key/value types
 			FileOutputFormat.setOutputPath(job, new Path("~"));
 			job.setReducerClass(zzreducer.class);
 			job.setOutputKeyClass(Text.class);
 			job.setOutputValueClass(IntWritable.class);
 			return job.waitForCompletion(true);
 		} catch (ClassNotFoundException e) {
 			e.printStackTrace();
 		} catch (IOException e) {
 			e.printStackTrace();
 		} catch (InterruptedException e) {
 			e.printStackTrace();
 		}
 		return false;
 	}
 
 
  • Generic type parameters: Hadoop Writable types (a custom Writable sketch follows this list)

    • Text, ByteWritable, ArrayWritable

    • IntWritable, LongWritable, FloatWritable, DoubleWritable

    • BooleanWritable

    • MapWritable, SortedMapWritable, TwoDArrayWritable

    • VIntWritable, VLongWritable

    • GenericWritable, CompressedWritable, ObjectWritable, VersionedWritable, NullWritable
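
All of these implement Hadoop's Writable serialization interface, and a custom type can implement it directly when none of the built-ins fit; a minimal sketch (type and field names are illustrative):

 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
 import org.apache.hadoop.io.Writable;
 
 //--- Hypothetical custom value type holding a (count, sum) pair
 public class CountSumWritable implements Writable {
     private int count;
     private double sum;
 
     //--- Serialize the fields in a fixed order
     public void write(DataOutput out) throws IOException {
         out.writeInt(count);
         out.writeDouble(sum);
     }
 
     //--- Deserialize the fields in the same order
     public void readFields(DataInput in) throws IOException {
         count = in.readInt();
         sum = in.readDouble();
     }
 }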

     
  • Hadoop Streaming

 
 hadoop jar /appl/hadoop/contrib/streaming/hadoop-*-streaming.jar
        -input input/~.txt -output output
        -mapper ~1.bash
        -combiner ~2.bash
        -reducer ~3.bash
        -file ~1.bash -file ~2.bash -file ~3.bash
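
Each script reads lines from stdin and writes tab-separated key/value pairs to stdout. A word-count mapper/reducer pair, as a minimal sketch (the ~1.bash/~3.bash names above are placeholders):

 #!/bin/bash
 #--- Mapper (~1.bash): emit "word<TAB>1" for each whitespace-separated token
 tr -s '[:space:]' '\n' | sed '/^$/d' | awk '{print $0 "\t1"}'

 #!/bin/bash
 #--- Reducer (~3.bash): sum the counts per key (input arrives sorted by key)
 awk -F'\t' '{c[$1] += $2} END {for (w in c) print w "\t" c[w]}'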
 
 

Standard MapReduce Job


  • StandardMapReduce.java

 
 package com.jopenbusiness.hadoop.sample2; 
 
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
 public class StandardMapReduce extends Configured implements Tool {
 	public static void main(String[] args) throws Exception {
 		System.exit(ToolRunner.run(new StandardMapReduce(), args));
 	}
 
 	public int run(String[] args) throws Exception {
 		Configuration conf = null;
 		Job job = null;
 		FileSystem fs = null;
 		
 		//---	Check the input parameters
 		if (args.length != 2) {
 			System.err.printf("Usage: %s [generic options] <input> <output>\n", getClass().getSimpleName());
 			ToolRunner.printGenericCommandUsage(System.err);
 			return -1;
 		}
 		
 		//---	Configure the MapReduce job
 		conf = getConf();
 		job = Job.getInstance(conf, "Max temperature");
 		job.setJarByClass(StandardMapReduce.class);
 
 		job.setInputFormatClass(TextInputFormat.class);
 		job.setMapperClass(StandardMapReduceMapper.class);
 //		job.setCombinerClass(StandardMapReduceReducer.class);
 //		job.setPartitionerClass(StandardMapReducePartitioner.class);
 //		job.setGroupingComparatorClass(StandardMapReduceGroupComparator.class);
 //		job.setSortComparatorClass(StandardMapReduceSortComparator.class);
 		job.setReducerClass(StandardMapReduceReducer.class);
 		job.setOutputFormatClass(TextOutputFormat.class);
 		
 		//---	Specify the input/output data paths
 		fs = FileSystem.get(conf);
 		fs.delete(new Path(args[1]), true);
 		
 		FileInputFormat.addInputPath(job, new Path(args[0]));
 		FileOutputFormat.setOutputPath(job, new Path(args[1]));
 		job.setOutputKeyClass(Text.class);
 		job.setOutputValueClass(IntWritable.class);
 		
 		//---	Run the job and monitor it until completion
 		return (job.waitForCompletion(true)) ? 0 : 1;
 	}	
 	
 	//--- Inner classes must be static so Hadoop can instantiate them by reflection
 	public static class StandardMapReduceMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
 		public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
 		}
 	}
 
 	public static class StandardMapReduceReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
 		public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
 		}
 	}
 }
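 
Because the class extends Configured and runs through ToolRunner, generic options can be passed ahead of the program arguments; a hypothetical invocation (jar name and paths are placeholders):
 
 hadoop jar sample2.jar com.jopenbusiness.hadoop.sample2.StandardMapReduce -D mapreduce.job.reduces=2 input output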
 


Category: BigData

Last modified: 2024-09-30 12:26:18
