目录
MapReduce 框架原理
1.InputFormat可以对Mapper的输入进行控制
2.Reducer阶段会主动拉取Mapper阶段处理完的数据
3.Shuffle可以对数据进行排序、分区、压缩、合并,核心部分。
4.OutPutFomat可以对Reducer的输出进行控制
Map方法之后,Reduce方法之前的数据处理(洗排)过程称为Shuffle。、
如果把ReduceTask设置成0,不进行ruduce,因为shuffle阶段也会消失
MapTask进程对每一个<K,V>
调用一次map()方法
ReduceTask进程对每一组相同k的<k,v>
组调用一次reduce()方法。
问题
要求统计结果按照条件输出到不同的文件中,比如:统计结果135开头的输入到一个文件、136开头的输入一个文件。
下面代码是分区大于1的情况下默认的分区类,自定义的分区实际上替换的是这个。
public class HashPartitioner<K,V> extends Partitioner<K,V>{
public int getPartition(K key,V value,int numReduceTask){
//默认分区是根据key的hashCode对ReduceTasks个数取模得到的,用户没法控制key存储到哪个分区。
//key.hasCode() & Integer.MAX.VALUE 用于控制key最大不超过Integer.MAX.VALUE
retrun(key.hasCode() & Integer.MAX.VALUE)%numReduceTask;
}
}
1.自定义类继承Partitioner,重写getPartition()方法
public class CustomPartitioner extends Partitioner<K,V>{
@Override
public int getPartition(K key,V value,int numReduceTask){
//控制分区代码逻辑
return partition;
}
}
2.在Job驱动中,设置分区类为自定义的Partitioner
job.setPartitionerClass(CustomPartitioner.class);
3.在Job驱动中,要根据自定义Partitioner的逻辑设置相应数量的ReduceTask
如果setNumReduceTasks=1,直接走else。
job.setNumReduceTasks(2);
将统计结果按照手机号开头输出到不同文件(分区)中
输入数据:D:\hadoop_data\input\inputpartition
文件
期望输出数据:手机号136、137、138、139开头的分别放入一个文件中,剩下的放入一个文件中
输入数据
输出数据
文件1 136开头的数据
文件2 137开头的数据
文件3 138开头的数据
文件4 139开头的数据
文件5 其他
自定义分区
分区0 136
分区1 137
分区2 138
分区3 139
分区4 其他
设置使用自定义分区,指定ReduceTasks的数量为5
1.使用之前序列化的代码
2.新增ProvicePartitioner类
ProvicePatitioner.class
package ranan.mapreduce.partition;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class ProvincePartitioner extends Partitioner<Text,FlowBean> {
@Override
public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
//text是手机号
String phone = text.toString().substring(0,3);
int partition;
//防止空指针异常,常量写在前面
if("136".equals(phone)){
partition=0;
}else if("137".equals(phone)){
partition=1;
}else if("138".equals(phone)){
partition=2;
}else if("139".equals(phone)){
partition=3;
}else {
partition=4;
}
return partition;
}
}
小技巧:字符串比较相等时,把常量写在前面可以防止空指针异常。
FlowDriver.class 新增一下代码
//设置使用自定义类
job.setPartitionerClass(ProvincePartitioner.class);
//设置ReduceTasks的个数
job.setNumReduceTasks(5); //有5个分区,可以设置5到5以上
//修改输入路径
1.ReduceTask的数量>getPartition结果的数,则会产生几个空的输出文件。
2.1<ReduceTask的数量<getPartition结果的数,有一部分分区数据没有地方放,报错。
3.ReduceTask的数量=1,最终只会产生一个输出文件。
4.分区号必须从0开始累加
MapTask和ReduceTask均会对数据按照key进行排序,该操作属于Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要。
默认排序是按照字典顺序排序,默认排序方法是快速排序。
对于MapTask,它会将处理的结果暂时放到环形缓冲区,当环形缓存区使用率达到一定阈值后(80%),再对缓冲区中的数据进行一次快排。并将这些有序数据溢写到磁盘上。当所有数据处理完毕后,它会对磁盘上所有文件进行归并排序。
对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则合并后溢写磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大文件。所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。
bean对象做为key传输,需要实现WritableComparable接口重写compareTo方法,就可
以实现排序。WritableComparable是继承了Writable接口的
public interface WritableComparable<T> extends Writable, Comparable<T> {
}
public class FlowBean implements WritableComparable <FlowBean> {
@Override
public int compareTo(FlowBean bean) {
int result;
//按照总流量大小,倒序排列
if (this.sumFlow >bean.getSumFlow()) {
result =-1;
}else if (this.sumFlow < bean.getSumFlow()) {
result = 1;
}else {
result = 0;
}
return result;
}
}
根据序列化案例产生的结果文件再此对总流量进行倒序排序
输入数据D:\hadoop_data\input\inputpartition
文件
第一次处理后的数据 part-r-00000,在这个文件的基础上,按总流量进行倒序排序。
期望输出数据
一般需要进行两次MapReduce,因为是按照key排序,第一次MapReduce的key是电话号码,算出总流量,第二次MapReduce的key是FlowBean对象里的总流量。
需求:根据手机的总流量进行倒序排序
输入数据
输出数据
FlowBean 类
package ranan.mapreduce.writableComparable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* 1、定义类实现writable接口
* 2、重写序列化和反序列化方法
* 3、重写空参构造
* 4、toString方法
*/
public class FlowBean implements WritableComparable<FlowBean> {
private long upFlow; // 上行流量
private long downFlow; // 下行流量
private long sumFlow; // 总流量
// 空参构造
public FlowBean() {
}
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getDownFlow() {
return downFlow;
}
public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}
public long getSumFlow() {
return sumFlow;
}
public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}
public void setSumFlow() {
this.sumFlow = this.upFlow + this.downFlow;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}
@Override
public void readFields(DataInput in) throws IOException {
this.upFlow = in.readLong();
this.downFlow = in.readLong();
this.sumFlow = in.readLong();
}
@Override
public String toString() {
return upFlow + "\t" + downFlow + "\t" + sumFlow;
}
@Override
public int compareTo(FlowBean bean) {
int result;
//按照总流量大小,倒序排列
if (this.sumFlow >bean.getSumFlow()) {
result =-1;
}else if (this.sumFlow < bean.getSumFlow()) {
result = 1;
}else {
result = 0;
}
return result;
}
}
FlowMapper 类
package ranan.mapreduce.writableComparable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class FlowMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
private FlowBean outK = new FlowBean();
private Text outV = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//获取一行
String line = value.toString();
//进行切割
String [] str = line.split("\t");
//封装
outV.set(str[0]);
outK.setUpFlow(Long.parseLong(str[1]));
outK.setDownFlow(Long.parseLong(str[2]));
outK.setSumFlow();
context.write(outK,outV);
}
}
FlowReducer 类
package ranan.mapreduce.partition;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class FlowReducer extends Reducer<FlowBean,Text,Text,FlowBean> {
@Override
protected void reduce(FlowBean key, Iterable<Text> values, Reducer<FlowBean, Text, Text ,FlowBean>.Context context) throws IOException, InterruptedException {
for(Text value:values){
//这里是不需要合并key的,输出的key是手机号,输出的value是FlowBean
context.write(value,key);
}
}
}
FlowDriver 类
package ranan.mapreduce.partition;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class FlowDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 1 获取job
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2 设置jar
job.setJarByClass(FlowDriver.class);
// 3 关联mapper 和Reducer
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
// 4 设置mapper 输出的key和value类型
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(Text.class);
// 5 设置最终数据输出的key和value类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
// 6 设置数据的输入路径和输出路径
FileInputFormat.setInputPaths(job, new Path("D:\\hadoop_data\\input\\inputpartition"));
FileOutputFormat.setOutputPath(job, new Path("D:\\hadoop_data\\output"));
// 7 提交job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
需求:如果总流量相同,按上行流量从小到大排序
//写法1
@Override
public int compareTo(FlowBean bean) {
int result;
//按照总流量大小,倒序排列
if (this.sumFlow >bean.sumFlow) {
return -1;
}else if (this.sumFlow < bean.sumFlow) {
return 1;
}else {
if(this.upFlow>bean.upFlow){
return 1;
}else if(this.upFlow<bean.upFlow){
return -1;
}
else {
return 0;
}
}
}
//写法2
@Override
public int compareTo(FlowBean bean) {
int result;
//按照总流量大小,倒序排列
if (this.sumFlow == bean.sumFlow) {
//如果相同按上行流量从小到大排序
return (int)(this.upFlow - bean.upFlow);
}else {
return (int)(bean.sumFlow-this.sumFlow);
}
}
需求:136 137 138 139 其他 分5个区,每个区按总流量降序排,相同按上行流量从小到大排序
用上面的代码其余不变,增加类ProvincePartitioner.class
package ranan.mapreduce.writableComparable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
import ranan.mapreduce.partition.FlowBean;
public class ProvincePartitioner extends Partitioner<FlowBean,Text> {
@Override
public int getPartition(FlowBean flowBean,Text text,int numPartitions) {
//text是手机号
String phone = text.toString().substring(0,3);
int partition;
//防止空指针异常,常量写在前面
if("136".equals(phone)){
partition=0;
}else if("137".equals(phone)){
partition=1;
}else if("138".equals(phone)){
partition=2;
}else if("139".equals(phone)){
partition=3;
}else {
partition=4;
}
return partition;
}
}
在FlowDriver挂载分区
//挂载分区
job.setPartitionerClass(ProvincePartitioner.class);
job.setNumReduceTasks(5);
提前对(a,1)(a,1) 进行合并成(a,2),帮助MapReduce先处理一点,提高效率。
说明:
1.Combiner是MR程序中Mapper和Reducer之外的一种组件
2.Combiner组件的父类是Reducer
3.和Reducer的区别在于运行的位置,Combiner是在每一个MapTask所在节点运行,Reduce是接受全局所有Mapper的输出结果。
4.Combiner的意义是对每一个MapTask的输出进行局部汇总,以减少网络传输量
5.Combiner能够应用的前提是不能影响最终的业务逻辑,并且Combiner输出的KV应该跟Reducer输入的kv类型对应起来。
下图是不可以使用的场景
基于WordCount案例,统计过程中对每一个MapTask的输出进行局部汇总,以减小网络传输量,采用Combiner功能
数据输入:D:\hadoop_data\input\combiner
//combiner.txt
xx rr
dd rr
yy rr
xx dd
期望结果
在Map阶段处理数据
<rr,3>
<xx,2>
<yy,1>
<dd,2>
1.增加一个WordCountCombiner类继承Reducer
2.在WordCountCombiner中 单词汇总、将统计结果输出
1.增加一个WordCountCombiner类继承Reducer
package ranan.mapreduce.combiner;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/*
1.继承Reducer在Combiner在Map阶段,输入的数据是Map传递过来的(Map的输出),输出给Reduce(Reduce的输入)
*/
public class WordCountCombiner extends Reducer <Text,IntWritable,Text, IntWritable> {
//2.重写reduce方法,遇见不同的key执行一次reduce
private IntWritable outV = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
int sum =0;
for (IntWritable num:values) {
sum += num.get(); //num转化成int类型
}
outV.set(sum);
context.write(key,outV);
}
}
2.在job中进行配置,增加如下代码
job.setCombinerClass(WordCountCombiner.class);
3.执行结果,符合预期
4.补充
如果把ReduceTask设置成0,不进行ruduce,shuffle阶段也会消失
我们发现WordCountReducer和WordCountCombiner实现的逻辑是一样的,所以将WordCountReducer作为Combiner,在WordCountDriver驱动类中指定。
job.setCombinerClass(WordCountReducer.class);
手机扫一扫
移动阅读更方便
你可能感兴趣的文章