Mapreduce确保每个reducer的输入都是按键排序的。系统执行排序的过程(Map方法之后,Reduce方法之前的数据处理过程)称之为Shuffle。
Partition分区流程处于Mapper数据属于初到环形缓冲区时进行,此时会将通过Partition分区获取到的每一行key-value对应的分区值计入环形缓冲流的左。
要求将统计结果按照条件输出到不同文件中(分区)。比如:将统计结果按照手机归属地不同省份输出到不同文件中(分区)
分区可以实现将Map阶段处理的数据在向环形缓冲区写入的时候是以"分类"的方式写的。"一般情况下",MR程序分区数有多少,ReduceTask的数量就应该有多少,可以实现一个分区的数据一个ReduceTask去处理。ReduceTask处理完成之后都会去生成一个结果文件
以WordCount为例,设置ReduceTask的数量
job.setNumReduceTasks(2);
默认使用的是HashPartitioner分区机制
@Public
@Stable
public class HashPartitioner
public HashPartitioner() {
}
public int getPartition(K key, V value, int numReduceTasks) {
return (key.hashCode() & 2147483647) % numReduceTasks; // 2147483647是int类型的最大值,numReduceTask即是上面设置的ReduceTask的值
}
}
这种分区机制是不可控的,因为它是根据Map阶段获取的key的hashCode值和numReduceTask取余得来的,但是key的hashCode值不确定,所以把key-value数据分到哪一个区我们是不确定的。比如会出现不同的key的HashCode值一致,导致结果输出的不可控制。因此我们在去定义分区的时候我们最常用的方法就是:自定义分区机制
自定义Partition步骤:
要求:
a) 返回的分区数字最好是连续的,比如返回了 0 2 3 4 ,数字不连续,不可行
b) 一般情况下,有几个分区,就在Driver中指定numReduceTasks就有几个,不能多写也不能少写
public int getPartition(Text key, Text value, int numReduceTask)方法有三个参数,返回一个int类型的值,它们代表的含义分别是:
@param key:Map阶段输出的key值
@param value:Map阶段输出的value值
@param numReduceTask:定义的ReduceTask的任务数,默认是1
@return 数字,代表的是我要将这条key-value数据输送到哪个分区
3. 在Driver类中指定分区所在的类与分区数量
// 定义不使用默认的HashPartitioner分区,而是使用自定义的分区
job.setPartitionerClass(PhoneDataPartition.class);
// 指定你的ReduceTask必须是5
job.setNumReduceTasks(5);
这里要求ReduceTask的数量必须和自定义的Partition类中设置的分区数量保持一致,原因就是在MR程序中一般默认情况下是一个分区要有一个ReduceTask专门去处理。但是在有些情况下,ReduceTask可能少写或者多写,这样会出一些奇怪的问题。
java.lang.Exception: java.io.IOException: Illegal partition for 138 (2)
(这里可以利用打工人来举例,假设有五个人一起去打工,但是此时岗位只有一个,那么这五个人可以一起做着一份工作。但是如果设置了两个岗位,分配时将会出现矛盾状况,比如两个工人都想做一份工作,或者两个工人都不想做这一个工作,就会产生冲突)
3. 假设分区有5个,ReduceTask的数量也有5个,那么百分之百可以正常运行,这也是最佳状态/最理想状态。(因为每个人都分配到了自己想做的工作)
4. 假设分区有5个,ReduceTask的数量多于5个,程序也是百分之百可以正常运行,但是会多出一个空白结果文件
【注意】以后在工作中写的ReduceTask的数量最好和分区的数量保持一致,这样的话才能保证处理出的MR程序处于最佳状态
需求:将统计结果按照手机归属地不同省份输出到不同文件中(分区)
分区:
136----分区1
137----分区2
138----分区3
139----分区4
其他---分区5
默认分区机制:5个分区,需要设置5个ReduceTask,同时默认分区机制是按照key的HashCode值分配的
代码:
PhoneDataMapper.java
public class PhoneDataMapper extends Mapper
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] fields = line.split("\t");
String phone = fields[1];
// 拿到手机号的前三位
String phoneThree = phone.substring(0, 3);
context.write(new Text(phoneThree), value);
}
}
PhoneDatePartition.java
public class PhoneDataPartition extends Partitioner
@Override
public int getPartition(Text key, Text value, int numReduceTask) {
String s = key.toString();
switch (s) {
case "136":
return 0;
case "137":
return 1;
case "138":
return 2;
case "139":
return 3;
default:
return 4;
}
}
}
PhoneDataReducer.java
public class PhoneDataReducer extends Reducer
@Override
protected void reduce(Text key, Iterable
for (Text value : values) {
context.write(NullWritable.get(), new Text(value));
}
}
}
PhoneDateDriver.java
public class PhoneDataReducer extends Reducer
@Override
protected void reduce(Text key, Iterable
for (Text value : values) {
context.write(NullWritable.get(), new Text(value));
}
}
}
运行截图
需求:按照单词首字母的ASCII码进行奇偶分区(Partitioner)
程序分析
源代码
WordCountPartition.java
public class WordCountPartition extends Partitioner
@Override
public int getPartition(Text text, LongWritable longWritable, int i) {
String word = text.toString();
char first = word.charAt(0);
if (first % 2 == 0 ) {
return 0;
} else {
return 1;
}
}
}
运行结果
举一反三
单词计数案例:要求根据单词首字母的大小写分区,如果单词首字母是大写,那么单词输出在一个文件中,如果单词的首字母是小写,那么单词输出在另一个分区中
源代码:
public class OtherPartition extends Partitioner
@Override
public int getPartition(Text text, LongWritable longWritable, int i) {
String word = text.toString();
char first = word.charAt(0);
if (Character.isUpperCase(first)) {
return 1;
} else {
return 0;
}
}
}
运行结果:
【问题】在一次MR程序中需要进行几次排序?在什么时候进行排序?
【解答】会进行三次排序。第一次是在环形缓冲区使用率达到一定阈值(80%)时,会对缓冲区中的数据进行一次快速排序;第二次是当数据处理完毕时,会对磁盘上的所有文件进行归并排序;第三次是ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序
部分排序与全排序的区别:全排序是一个结果文件,部分排序是多个结果文件。在实际编程中也就是添加了自定义排序和设置NumReduceTask()
要想实现使自己的数据充当key值进行排序,必须实现WritableComparable接口的方法。
需求:
将单词计数后的结果按照数量升序排序
源代码:
WordCountBean.java
public class WordCountBean implements WritableComparable
private String word;
private int count;
public String getWord() {
return word;
}
public void setWord(String word) {
this.word = word;
}
public int getCount() {
return count;
}
public void setCount(int count) {
this.count = count;
}
@Override
public String toString() {
return word + " " + count;
}
/\*\*
\* 比较器默认大于或者小于时返回一个非零的数,如果等于则返回0,代表两个对象一模一样
\* MR程序比较大小的时候,千万不能返回0.如果返回0,代表两个对象一样,那么相当于认为key是一样的,此时其余的相等数据将显示不出来
\* @param o
\* @return
\*/
@Override
public int compareTo(WordCountBean o) {
if (this.count > o.getCount()) {
return 1;
} else {
return -1;
}
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(this.word);
dataOutput.writeInt(this.count);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
this.word = dataInput.readUTF();
this.count = dataInput.readInt();
}
}
运行截图
部分排序只是相对于全排序多了自定义分区以及设置ReduceTask的数量
需求
根据手机号码前三位进行分区,前三位为"135",则位于第一分区;前三位为"136",则位于第二分区;前三位为"137",则位于第三分区,其余数据在第四分区,并且按照总流量数升序排列
源代码
FlowBean.java
public class FlowBean implements WritableComparable
private String phone;
private int upFlow;
private int downFlow;
private int sumFlow;
public String getPhone() {
return phone;
}
public void setPhone(String phone) {
this.phone = phone;
}
public int getUpFlow() {
return upFlow;
}
public void setUpFlow(int upFlow) {
this.upFlow = upFlow;
}
public int getDownFlow() {
return downFlow;
}
public void setDownFlow(int downFlow) {
this.downFlow = downFlow;
}
public int getSumFlow() {
return sumFlow;
}
public void setSumFlow(int sumFlow) {
this.sumFlow = sumFlow;
}
@Override
public String toString() {
return phone + " " + upFlow + " " + downFlow + " " + sumFlow;
}
/\*\*
\* 序列化方法:将Java对象的属性值怎么序列化写出
\* @param dataOutput
\* @throws IOException
\*/
@Override
public void write(DataOutput dataOutput) throws IOException {
// 将一个String类型的属性序列化写出成二进制数据
dataOutput.writeUTF(this.phone);
// 将一个String类型的属性序列化写出成二进制数据
dataOutput.writeInt(upFlow);
dataOutput.writeInt(downFlow);
dataOutput.writeInt(sumFlow);
}
/\*\*
\* 反序列化方法:怎么将二进制代码转成JavaBean对象属性的值
\* 反序列化的时候,读取二进制数据时,不能随便读
\* 序列化写出时先写出哪个属性的值,就先读哪个属性值
\* @param dataInput
\* @throws IOException
\*/
@Override
public void readFields(DataInput dataInput) throws IOException {
this.phone = dataInput.readUTF();
this.upFlow = dataInput.readInt();
this.downFlow = dataInput.readInt();
this.sumFlow = dataInput.readInt();
}
/\*\*
\* 传进来一个对象,我们需要在这个方法中定义我和传进来的这个对象谁大谁小的逻辑
\* 一般情况下,JavaBean对象去比较大小都是比较属性值的大小
\* 如果返回的是1,那么代表大于传进来的对象,升序排序
\* 如果返回的是0.那么代表相等
\* 返回的是-1,代表小于传进来的对象,降序排序
\*
\* @param o
\* @return
\*/
@Override
public int compareTo(FlowBean o) {
return Integer.compare(this.sumFlow, o.getSumFlow());
}
}
FlowPartition.java
public class FlowPartition extends Partitioner
@Override
public int getPartition(FlowBean flowBean, NullWritable nullWritable, int i) {
String phone = flowBean.getPhone();
String three = phone.substring(0, 3);
switch (three) {
case "135":
return 0;
case "136":
return 1;
case "137":
return 2;
default:
return 3;
}
}
}
3. 运行截图
需求
学生成绩排序:先根据语文成绩降序排序,如果语文成绩相同,再根据英语成绩降序排序
源代码
RecordBean.java
public class RecordBean implements WritableComparable
private String name;
private int china;
private int maths;
private int english;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getChina() {
return china;
}
public void setChina(int china) {
this.china = china;
}
public int getMaths() {
return maths;
}
public void setMaths(int maths) {
this.maths = maths;
}
public int getEnglish() {
return english;
}
public void setEnglish(int english) {
this.english = english;
}
@Override
public String toString() {
return "name='" + name + '\\'' +
", china=" + china +
", maths=" + maths +
", english=" + english;
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(this.name);
dataOutput.writeInt(china);
dataOutput.writeInt(maths);
dataOutput.writeInt(english);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
this.name = dataInput.readUTF();
this.china = dataInput.readInt();
this.maths = dataInput.readInt();
this.english = dataInput.readInt();
}
@Override
public int compareTo(RecordBean o) {
if (this.china > o.getChina()) {
return -1;
}else if(this.china == o.getChina()) {
if (this.english > o.getEnglish()) {
return -1;
} else {
return 1;
}
} else {
return 1;
}
}
}
运行截图
Combiner合并其实是MapReduce程序中的一个调优策略。
如果多个 MapTask对应一个ReduceTask任务时,ReduceTask的压力会很大,这时可以将ReduceTask的数量增加,这样每个ReduceTask只需要处理一个MapTask,压力会小很多。
还有一种方式就是在Mapper阶段设置一个Combiner合并操作,每一个Combiner对应一个MapTask任务,它会将MapTask任务先进行一次合并,之后再交由ReduceTask执行。
Combiner是可选操作,他只是MR程序中的一个调优策略。但是分区和排序是必选的
Combiner处于Map阶段执行之后,Reduce阶段执行之前,其中Combiner的输入是Mapper阶段的输出Key-value,输出时Reduce阶段输入的key-value
自定义一个combiner继承Reducer,重写reduce方法
public class WordCountCombiner extends Reducer
@Override
protected void reduce(Text key, Iterable
Iterator
long num = 0L;
while (iterator.hasNext()) {
LongWritable next = iterator.next();
long l = next.get();
num += l;
}
context.write(key, new LongWritable(num));
}
}
在job驱动类中设置:
job.**setCombinerClass**_(_WordCountCombiner.class_)_;
将reduce当作Combiner组件使用
通过上述代码可以发现,combiner类写的逻辑方法与Reduce阶段的相同(一般情况下两个的处理逻辑都一样),所以可以直接在Driver中设置Combiner类为Reduce类。注意:虽然二者代码一样,但是执行的逻辑不一样。
在job驱动类中设置
job.setCombinerClass(WordCountReduce.class);
这个分组排序又叫辅助排序,也是可选的。但是在有些业务条件下,分组排序必须存在。
主要作用:如果我们定义的key是JavaBean对象,我们可以在分组中将部分字段相同的bean对象当做同一个key处理。
map阶段数据分区排序合并结束,reduce已经将map阶段的输出数据读取到内存中,即将执行Reducer方法之前
* Text类型判断的就是值,就是相等的key;
LongWritable类型也是判断值相等,就是相等的key;
对于自定义的JavaBean对象,如何判断相等呢?是判断地址相等吗?
【补充】
例如:Student s1 = new Student("zs", "8374772648327498");
Student s2 = new Student("ls", "8374772648327498");
这两个对象相等吗?
从代码角度而言,这两个对象不相等,因为他们的地址不相等
但是Java是面向对象编程的语言,面向对象就是拿现实的逻辑去理解代码,所以从现实的角度来看,这两个对象是同一个对象,因为他们的身份证号一样
所以需要主要一个问题,在Java中,比较基本数据类型是否相等,应该用 == 去判断;但是如果需要比较两个引用数据是否相等,需要通过两个对象的属性值是否相等来判断两个对象是否为同一个对象,一般使用equals方法和compare方法
有了以上基础,再看上面的问题。对于JavaBean对象是否相等的判断,不应该用地址判断,我们需要判断对象中的某些属性是否相等。所以在MR程序中,如果你想根据我们自己的逻辑将不同的JavaBean对象当做同一个key处理,我们可以通过分组辅助排序实现。当然我们的WritableComparable也可以去实现,但是在业务逻辑上这个接口主要是做排序使用的。如果想在Reducer阶段让不同的Java对象成为一组相同的key,那么你就需要定义一个分组排序规则,在里面定义一下哪个对象是相同的key
案例:订单排序
需求
有如下订单数据
订单id
商品id
成交金额
0000001
Pdt_01
222.8
0000001
Pdt_05
25.8
0000002
Pdt_03
522.8
0000002
Pdt_04
122.4
0000002
Pdt_05
722.4
0000003
Pdt_01
222.8
0000003
Pdt_02
33.8
现在需要求出每一个订单中最贵的商品。
思路
【注意】这里的参数都是WritableComparable类型,实际上是Mapper阶段输出的key的类型。这里可以使用多态的方式对参数进行强转
job.**setGroupingComparatorClass**_(_OrderGroup.class_)_;
源代码
public class OrderGroupComparable extends WritableComparator {
public OrderGroupComparable() {
super(Order.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
Order order1 = (Order) a;
Order order2 = (Order) b;
if (order1.getId() > order2.getId()) {
return -1;
} else if (order1.getId() == order2.getId()) {
return 0;
} else {
return -1;
}
}
}
运行截图
手机扫一扫
移动阅读更方便
你可能感兴趣的文章