Hadoop TextInputFormat
阅读原文时间:2023年07月16日阅读:1

1. TextInputFortmat

TextInputFormat是默认的InputFormat。每条记录是一行输入。Key是LongWritable类型,存储该行在整个文件中的字节偏移量(不是行数),值是这行的内容,为一个Text对象。

例如输入文件为:

grunt> cat test2

12,e21,ddwq,dqw,dwqw

sfd,cda,cdsz,cdwq,qwe

12,cds,fwa,feacd,cadfa

21ede,cdsf,ca,fa,dcac

caewf,ea,cdadc,acds,acsd

12e,afs,afesd,caefd,cawc

cax,cafe,caefe,fea,ceaef

在使用默认的 Map处理后输出:

grunt> cat out

0   12,e21,ddwq,dqw,dwqw

21  sfd,cda,cdsz,cdwq,qwe

43  12,cds,fwa,feacd,cadfa

66  21ede,cdsf,ca,fa,dcac

88  caewf,ea,cdadc,acds,acsd

113 12e,afs,afesd,caefd,cawc

138  cax,cafe,caefe,fea,ceaef

可以看到Key的值并不是行数,而是字节在文件中的偏移量。一般情况下,很难获取到文件的行号,因为文件是按字节切分为分片,而不是按行切分。

在按行读文本的情况下,可能会存在超长行的情况。超长行会导致内存溢出,可以通过设置 mapreduce.input.linerecordreader.line.maxlength,指定一个最长行的字节数(在内存范围内),可以确保 recordreader 跳过超长行。

2. KeyValueTextInputFormat

TextInputFormat 将文件中的行作为Key,每行对应的文本作为Value。但是对于某些文件内容已经是 Key-Value 形式的话,使用 TextInputFormat 会显得多次一举。在这种情况下,我们可以使用KeyValueTextInputFormat,它以某个分隔符进行分割(默认为制表符):

public KeyValueLineRecordReader(Configuration conf) throws IOException {
    String sepStr = conf.get("mapreduce.input.keyvaluelinerecordreader.key.value.separator", "\t");
    this.separator = (byte)sepStr.charAt(0);
}

一个范例如下,使用逗号为分隔符:

grunt> cat test2

12,e21,ddwq,dqw,dwqw

sfd,cda,cdsz,cdwq,qwe

12,cds,fwa,feacd,cadfa

21ede,cdsf,ca,fa,dcac

输出为:

grunt> cat out

12  cds,fwa,feacd,cadfa

12  e21,ddwq,dqw,dwqw

12e afs,afesd,caefd,cawc

21ede   cdsf,ca,fa,dcac

在任务设置中需要做的配置如下:

Configuration conf = new Configuration();
conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");

job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat.class);

3. NLineInputFormat

在使用TextInputFormat和KeyValueInputFormat 时,每个mapper 收到的行数取决于输入的分片大小以及行的长度。如果希望 mapper 收到固定行的输入,则需要使用 NLineInputFormat。与 TextInputFormat一样,key是文件中的字节偏移量,值是行本身。

N是每个mapper收到的输入行数。N设置为1时(默认),每个mapper正好收到一行输入。同样使用之前的一共7行输入,使用NLineInputFormat:

job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.NLineInputFormat.class);

grunt> cat out

0   12,e21,ddwq,dqw,dwqw

21  sfd,cda,cdsz,cdwq,qwe

43   12,cds,fwa,feacd,cadfa

查看此任务的相关指标,可以看到:

Job Counters

Launched map tasks=7

Launched reduce tasks=1

Other local map tasks=7

Mapper数一共有7个,也就是每行均生成了一个Map。可以通过设置以下参数指定NLine为多少行:

mapreduce.input.lineinputformat.linespermap

References: Hadoop权威指南第四版

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器

你可能感兴趣的文章