MapReduce08 数据清洗(ETL)和压缩
阅读原文时间:2023年07月09日阅读:2

目录

数据清洗(ETL)

ETL(Extract抽取-Transform转换-Load加载)用来描述数据从来源端经过抽取、转换、加载至目的端的过程。一般用于数据仓库,但其对象并不限于数据仓库

在运行核心业务MapReduce程序之前,往往需要对数据进行清洗,清理掉不符合用户要求的数据,清理的过程往往只需要运行Mapper程序,不需要运行Reduce程序。

需求

去除日志中字段(通过空格切割)个数小于等于11的日志

输入数据D:\hadoop\hadoop_data\inputlog

期望输出数据:每行字段长度都大于11

需求分析

需要在Map阶段对输入的数据根据规则进行过滤清洗。

实现代码

编写WebLogMapper类

package com.ranan.mapreduce.etl;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author ranan
 * @create 2021-09-03 10:39
 */
class WebLogMapper extends Mapper<LongWritable, Text,Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //1.获取一行
        String line = value.toString();
        //2.ETL 符合条件就写出到上下文,不符合条件就直接判断下一行
        boolean result = parseLog(line,context);
        if (!result){
            return;
        }

        //3.写出
        context.write(value,NullWritable.get());

    }

    private boolean parseLog(String line, Context context) {
        String[] fields = line.split(" ");
        if(fields.length >11){
            return true;
        }
        return false;
    }
}

编写WebLogDriver类

package com.ranan.mapreduce.etl;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author ranan
 * @create 2021-09-03 10:47
 */
public class WebLogDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(new Configuration());

        job.setJarByClass(WebLogDriver.class);
        job.setMapperClass(WebLogMapper.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        job.setNumReduceTasks(0);
        //通过命令行控制,方便上次打包到集群运行
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

打包到集群运行

如何打包到集群运行

压缩

压缩的优点:以减少磁盘IO、减少磁盘存储空间。

压缩的缺点:增加CPU开销。

压缩原则

1.运算密集型的Job,少用压缩

2.IO密集型的Job,多用压缩

压缩算法对比

压缩性能比较

压缩方式选择

压缩方式选择器时需要考虑:压缩/解压缩速度、压缩后的大小、压缩后是否可以支持切片。

类型

优点

缺点

Gzip

压缩率比较高

不支持切片;压缩/解压速度一般

Bzip2

压缩率高;支持切片

压缩/解压速度慢

Lzo

压缩/解压速度比较块;支持切片

压缩率一般;支持切片需要额外创建索引

Snappy

压缩/解压速度块

不支持切片;压缩率一般

压缩位置选择

压缩可以再MapReduce作用的任意阶段启用。

压缩参数配置

1.为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器。

2.要在Hadoop中启用压缩,可以配置如下参数

参数

默认值

阶段

建议

io.compression.codecs(在core-site.xml中配置)

无,这个需要在命令行输入hadoopchecknative查看

输入压缩

Hadoop使用文件扩展名判断是否支持某种编解码器

mapreduce.map.output.compress(在mapred-site.xml中配置)

false

mapper输出

这个参数设为true启用压缩

mapreduce.map.output.compress.codec(在mapred-site.xml中配置)

org.apache.hadoop.io.compress.DefaultCodec

mapper输出

企业多使用LZO或Snappy编解码器在此阶段压缩数据

mapreduce.output.fileoutputformat.compress(在mapred-site.xml中配置)

false

reducer输出

这个参数设为true启用压缩

mapreduce.output.fileoutputformat.compress.codec(在mapred-site.xml中配置)

org.apache.hadoop.io.compress.DefaultCodec

reducer输出

使用标准工具或者编解码器,如gzip和bzip2

hadoop checknative 查看默认支持的压缩方式

注意:snappy和hadoop的版本需要配对才能适用。

Map输出端采用压缩

即使MapReduce的输入输出文件都是未压缩的文件,仍然可以对Map任务的中间结果输出做压缩,因为它要写在硬盘并且通过网络传输到Reduce节点,对其压缩可以提高很多性能,这些工作只要设置两个属性即可,我们来看下代码怎么设置。

Driver类

这里用WordCount案例,其余部分保持不变,只修改Driver类。

Map输出压缩文件,Reduce端解压,最终输出的文件格式不变。

//本机Hadoop版本支持的压缩格式有BZip2Codec、DefaultCodec
package com.atguigu.mapreduce.compress;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
    public static void main(String[] args) throws IOException,ClassNotFoundException, InterruptedException {
    Configuration conf = new Configuration();

    //开启map端输出压缩
    conf.setBoolean("mapreduce.map.output.compress", true);
    //设置map端输出压缩方式
    conf.setClass("mapreduce.map.output.compress.codec",BZip2Codec.class,CompressionCodec.class);

    Job job = Job.getInstance(conf);
    job.setJarByClass(WordCountDriver.class);
    job.setMapperClass(WoradCountMapper.class);job.setReducerClass(WordCountReducer.class);
    job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
    FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));
    boolean result = job.waitForCompletion(true);
    System.exit(result ? 0 : 1);
}
}

Reduce输出端采用压缩

Driver类

//设置reduce端输出压缩开启
FileOutputFormat.setCompressOutput(job, true);
//设置压缩的方式
FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
//FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
//FileOutputFormat.setOutputCompressorClass(job,DefaultCodec.class);