Hadoop编程——Java编写MapReduce:WordCount案例
阅读原文时间:2023年07月09日阅读:5

一、MapReduce简介

MapReduce是一种面向大数据平台的分布式并行计算框架,它允许使用人员在不会分布式并行编程的情况下,将程序运行在分布式系统上。它提供的并行计算框架,能自动完成计算任务的并行处理,自动划分计算数据,在集群节点上自动分配和执行计算任务,自动收集计算结果,使得开发人员只用关心业务的实现逻辑,大大降低开发负担。

二、MapReduce编程

1、MapReduce模型

在编写MapReduce程序时,一般由三部分构成,分别是Map、Reduce和Dirver,其中Map和Reduce部分负责业务逻辑的实现,Driver部分为驱动类,负责调用任务,执行MapReduce程序。在编写MapReduce程序之前,需要先创建一个Maven工程,创建的方法为:https://www.cnblogs.com/ynqwer/p/14540108.html,为了方便管理,可以在该目录下面创建一个包,然后在这个包下写代码来完成MapRecude程序。在这个包下面创建三个类,分别实现Map阶段、Reduce阶段和Driver三部分的程序。

2、Map阶段实现

Map函数默认按行从HDFS读取数据进行处理,即从HDFS一行一行的将数据读取过来,读取过来的格式为<行号,行内容>,然后按一定的分隔符切割,最后按key-value的格式输出,即MapReduce的默认输入为一对Key-value对,输出也是一对key-value对。在编写代码时,需要继承MapReduce的Mapper类,并重写Map方法,代码如下:

package com.qwer.mapreduce.wordcount;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/*
 * WordCount案例Map阶段代码
 * KEYIN, Map阶段输入K的类型:LongWritable
 * VALUEIN, Map阶段输入V的类型:Text
 * KEYOUT, Map阶段输出K的类型:Text
 * VALUEOUT,Map阶段输出K的类型:IntWritable
 */
// Map阶段继承Mapper类
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    // 定义输出kv对的数据类型
    Text outK = new Text();
    IntWritable outV = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        // TODO Auto-generated method stub
        // 将输入的数据转换成String类型
        String line = value.toString();
        // 将数据按空格切分
        String[] words = line.split(" ");
        for (String word : words) {
            outK.set(word);
            context.write(outK, outV);
        }
    }

}
3、Reduce阶段实现

Reduce阶段的输入数据类型和Map阶段的输出数据类型是一样的,再拿到输入数据之后,就可以进行业务逻辑的编写了,比如WordCount案例需要统计单词数量,然后按照key-value的格式输出。Recude阶段需要继承MapRecude的Reducer类,并重写Reduce方法,代码如下:

package com.qwer.mapreduce.wordcount;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/*
 * WordCount案例Reduce阶段代码
 * KEYIN, Reduce阶段输入K的类型:Text
 * VALUEIN, Reduce阶段输入V的类型:IntWritable
 * KEYOUT, Reduce阶段输出K的类型:Text
 * VALUEOUT,Reduce阶段输出K的类型:IntWritable
 */
// Reduce阶段继承Reducer类
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    // 定义Reduce阶段输出值的类型
    IntWritable outV = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        // TODO Auto-generated method stub
        // 统计单词的个数
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        outV.set(sum);
        context.write(key, outV);
    }

}
4、Driver类实现

Driver部分为驱动类,负责调用任务,执行MapReduce程序。其程序大概可以分为八个步骤:

  1. 获取job
  2. 设置jar包路径
  3. 关联Mapper和Reducer
  4. 设置Map输出的KV类型
  5. 设置最终输出的KV类型
  6. 设置输入路径
  7. 设置输出路径
  8. 提交任务

根据上面步骤,写出代码如下:

package com.qwer.mapreduce.wordcount;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/*
 * WordCount案例Driver代码
 * 1、获取job
 * 2、设置jar包路径
 * 3、关联Mapper和Reducer
 * 4、设置Map输出kv类型
 * 5、设置最终输出kv类型
 * 6、设置输入路径
 * 7、设置输出路径
 * 8、提交任务
 */
public class WordCountDriver {
    public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException {

        // 1、获取job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2、设置jar包路径
        job.setJarByClass(WordCountDriver.class);

        // 3、关联Mapper和Reducer
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // 4、设置Map输出kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 5、设置最终输出kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 6、设置输入地址
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // 7、设置输出路径
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 8、提交任务
        boolean result = job.waitForCompletion(true);

        System.exit(result ? 0 : 1);
    }
}

三、打包并执行

三个类都写好之后,就可以打成jar包放到集群中执行了。在eclipse的左侧工程目录里面,鼠标右键点击工程名,然后选择Run AS->Maven clean,完成之后再次鼠标右键点击工程名,然后选择Run AS->Maven install,完成之后会在工程目录中的target目录下出现一个jar包

将jar包放到集群上就可以执行了。在HDFS中创建一个目录/input,往该目录中随便上传一个文件,然后在jar包所在的目录下执行

hadoop jar MapReduceDemo-0.0.1-SNAPSHOT.jar com.qwer.mapreduce.wordcount.WordCountDriver /input /output

(注意:上面的com.qwer.mapreduce.wordcount.WordCountDriver为Driver类的全路径,/output为输出目录,如果在执行程序的时候该目录已经存在,程序会报错)

执行完毕后,会自动在HDFS中创建一个/output目录,该目录下为程序执行的结果。

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器

你可能感兴趣的文章