Hadoop【MR的分区、排序、分组】
阅读原文时间:2023年07月09日阅读:2

[toc]

一.分区

问题:按照条件将结果输出到不同文件中

自定义分区步骤

1.自定义继承Partitioner类,重写getPartition()方法

2.在job驱动Driver中设置自定义的Partitioner

3.在Driver中根据分区数设置reducetask数

分区数和reducetask关系

案例实操

将统计结果按照手机归属地不同省份输出到不同文件中(分区),手机号136、137、138、139开头都分别放到一个独立的4个文件中,其他开头的放到一个文件中

(1)自定义分区类

MyPartitioner.class

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class MyPartioner extends Partitioner<Text, FlowBean> {
    @Override
    public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
        String phone = text.toString();
        if (phone.startsWith("136")) {
            return 0;
        } else if (phone.startsWith("137")) {
            return 1;
        } else if (phone.startsWith("138")) {
            return 2;
        }else if (phone.startsWith("139")){
            return 3;
        }else {
            return 4;
        }
    }
}
(2)在Driver类设置分区和reducetask数
//设置自定义partitioner
job.setPartitionerClass(MyPartioner.class);
//设置reducetask数量
job.setNumReduceTasks(5);

二.全排序、分区排序、分组

当自定义的对象作为key,按照指定条件进行排序

实现排序的2种方式

1.对象实现WritableComparable接口

实现WritableComparable接口,重写compareTo方法,就可以实现排序(二次排序)

public class OrderBean implements WritableComparable<OrderBean> { 

    //自定义排序,先按pid升序,再按pname降序
    @Override
    public int compareTo(OrderBean o) {
        int compare = this.pid.compareTo(o.pid);
        if (compare == 0) {
            return -this.pname.compareTo(o.pname);
        }
        return compare;
    }
}
2.继承WritableComparator类

自定义比较器继承WritableComparator类,父类构造方法增加需要比较的Bean对象,

//继承WritableComparator类
public class MyGroupCompartor extends WritableComparator {

    public MyGroupCompartor(){
        //增加Bean对象
        super(OrderBean.class,true);
    }
    // 对Bean的排序方法
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean oa = (OrderBean) a;
        OrderBean ob = (OrderBean) b;
        return oa.getPid().compareTo(ob.getPid());
    }
}

全排序

不分区,只有一个reducetask,针对Key进行排序

分区排序

针对key全排序,然后针对key进行分区

辅助排序【自定义分组】

分析:已经对key进行排序,比如key对象为OrderBean的排序是id,pname的二次排序

,在进入reduce()的分组希望是id相同的进入一组,那么就需要自定义分组针对id进行分组

OrderBean
id        pname  amount
1        小米
1                 2400
1            1500
2        华为
2                2400
2           3400
自定义分组比较器

MyGroupCompartor.class

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class MyGroupCompartor extends WritableComparator {

    public MyGroupCompartor(){
        super(OrderBean.class,true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean oa = (OrderBean) a;
        OrderBean ob = (OrderBean) b;
        return oa.getPid().compareTo(ob.getPid());
    }
}
在Driver类中声明自定义分组
job.setGroupingComparatorClass(MyGroupCompartor.class);