HDFS-Suffle
阅读原文时间:2023年07月12日阅读:1

  1、官网图

  2、MR确保每个Reducer的输入都是按照key排序的。系统执行排序的过程(即将Mapper输出作为输入传给Reducer)成为Shuffle

  

  1、默认分区HashPartitioner

public class HashPartitioner extends Partitioner {
/** Use {@link Object#hashCode()} to partition. */
public int getPartition(K key, V value,
int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}

  默认分区是根据key的hashcode对ReduceTeasks个数取模得到的。用户不能控制key存储的分区

  2、自定义分区

  (1)自定义类继承Partitioner,重写getPartition()方法

public class CustomPartitioner extends Partitioner {
@Override
public int getPartition(Text key, FlowBean value, int numPartitions) {

        //TODO
return 分区;
}
}

  (2)在job驱动中,设置自定义partitioner

job.setPartitionerClass(CustomPartitioner.class)

  (3)自定义partition后,要根据自定义partitioner的逻辑设置相应数量的reduceTask

job.setNumReduceTasks(5)

  3、分区总结

  (1)如果reduceTask的数量 > getPartition的结果数,则会多产生几个空的输出文件part-r-000xx;

  (2)如果reduceTask的数量 < getpartition的结果数,则由一部分分区数据无处安放,会exception

  (3)如果reduceTask的数量=1,则不管MapTask端输出多少个分区文件,最终结果都交给这个reduceTask,最终也就只会产生一个结果文件part-r-0000

  4、分区案例

  假设自定义分区数为5 

job.setNumReduceTasks(1) #正常运行,产生一个输出文件
job.setNumReduceTasks(2) #报错
job.setNumReduceTasks(6) #大于5,正常运行,会产生一个空文件

  1、概述

  (1)排序是MapReduce框架中最重要的操作之一

  (2)Map Task和Reduce Task均会对数据(按照key)进行排序。

  (3)该操作属于Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要。

  (4)默认排序时按照字典顺序排序,且实现该排序的方法是快速排序 

  (5)对于Map Task,它会将处理的结果暂时放到一个缓冲区中,当缓冲区使用率达到一定阈值(默认100M)后,再对缓冲区中的数据进行一次局部(区内)排序,排序完成后,并将这些有序数据写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行一次合并(Combiner如果有),以将这些文件合并成一个大的有序文件。

  (6)对于Reduce Task,它从每个Map Task上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则放到磁盘上,否则放到内存中。如果磁盘上文件数目达到一定阈值,则进行一次合并以生成一个更大文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据写到磁盘上。当所有数据拷贝完毕后,Reduce Task统一对内存和磁盘上的所有数据进行一次合并。  

  2、排序分类

  (1)部分排序:MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部排序。

  (2)全排序:

    最终输出结果只有一个文件,且文件内部有序。

    但该方法在处理大型文件时效率极低,因为一台机器必须处理所有输出文件,从而完全丧失了MapReduce所提供的并行架构。

    解决:首先创建一系列排好序的文件;其次,串联这些文件;最后,生成一个全局排序的文件。主要思路是使用一个分区来描述输出的全局排序。例如:可以为上述文件创建3个分区,在第一分区中,记录的单词首字母a-g,第二分区记录单词首字母h-n, 第三分区记录单词首字母o-z。

  (3)辅助排序:(GroupingComparator分组):

    Mapreduce框架在记录到达reducer之前按键对记录排序,但键所对应的值并没有被排序。甚至在不同的执行轮次中,这些值的排序也不固定,因为它们来自不同的map任务且这些map任务在不同轮次中完成时间各不相同。一般来说,大多数MapReduce程序会避免让reduce函数依赖于值的排序。但是,有时也需要通过特定的方法对键进行排序和分组等以实现对值的排序。

  (4)二次排序:在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序。

  3、排序实现

  (1)bean对象实现WritableComparable接口重写compareTo方法,就可以实现排序 

@Override
public int compareTo(FlowBean o) {
// 倒序排列,从大到小
return this.sumFlow > o.getSumFlow() ? -1 : 1;
}

  (2)自定义排序器,实现WritableComparable接口重写compare方法

public class MyRawComparator extends WritableComparator{

@Override
public int compare(byte[] b1, int s1, int l1,
byte[] b2, int s2, int l2) {
long thisValue = readLong(b1, s1);
long thatValue = readLong(b2, s2);
return (thisValue<thatValue ? 1 : (thisValue==thatValue ? 0 : -1));
}

}

    在Driver里面设置自定义排序器 

conf.set("mapreduce.job.output.key.comparator.class", "com.mr.sort.MyRawComparator");

  (3)自定义排序器,实现RawComparator接口重写compare方法 

private final DataInputBuffer buffer;
private final F key1;
private final Fkey2;

public FCompartor() {

 buffer=new DataInputBuffer();  
 key1=new F();  
 key2=new F();

}
public int compare(F o1, F o2) {
return -o1.getXXX().compareTo(o2.getSumXXX());
}

@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
try {
buffer.reset(b1, s1, l1); // parse key1
key1.readFields(buffer);

      buffer.reset(b2, s2, l2);                   // parse key2  
      key2.readFields(buffer);

      buffer.reset(null, 0, 0);                   // clean up reference  
    } catch (IOException e) {  
      throw new RuntimeException(e);  
    }

    return compare(key1, key2);  

}

    在Driver里面设置自定义排序器  

conf.set("mapreduce.job.output.key.comparator.class", "com.mr.sort.FCompartor");

  (1)combiner是MR程序中Mapper和Reducer之外的一种组件

  (2)combiner组件的父类就是Reducer

  (3)combiner和reducer的区别在于运行的位置:

    Combiner是在每一个maptask所在的节点运行

    Reducer是接收全局所有Mapper的输出结果

  (4)combiner的意义就是对每一个maptask的输出进行局部汇总,以减小网络传输量

  (5)combiner能够应用的前提是不能影响最终的业务逻辑

  (6)combiner的输出kv应该跟reducer的输入kv类型要对应起来

  (7)使用场景:加减计算适用,乘除不适用

  (8)运行时机:

 ①在溢写之前运行(如果设置了Combinner,MapTask的shuffle中,一定会运行)

      在溢写前,对需要溢写的KV进行合并,合并后可以将相同Key的KV合并为1个KV

    例如:  (hadoop,1),(hive,1),(spark,1),(hadoop,1),(hive,1),(spark,1),(hadoop,1),(hive,1),(spark,1)

    启动:   (hadoop,3),(hive,3),(spark,3)

    ②在最终的merge阶段,当溢写的片段 >= 3时,才会再次运行Combiner

    split0:   (hadoop,3),(hive,3),(spark,3)
          split1:     (hadoop,3),(hive,3)
            。。。N

    split.final :(hadoop,3),(hive,3),(spark,3),(hadoop,3),(hive,3)

  2、自定义Combiner

  (1)自定义一个combiner继承Reducer,重写reduce方法

public class WordcountCombiner extends Reducer{
@Override
protected void reduce(Text key, Iterable values,
Context context) throws IOException, InterruptedException {

 int count = 0;

 for(IntWritable v :values){  
     count = v.get();  
 }  
 context.write(key, new IntWritable(count));  

}
}

  (2)在job驱动类中设置

    job.setCombinerClass(WordcountCombiner.class);

  对reduce阶段的数据根据某一个或几个字段进行分组。

  使用案例:

  如求每个订单中最贵的商品

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器