Nutch 2.3.1 Source Code Analysis: InjectorJob

InjectorJob reads the seed sites from the seed file(s) and writes the number of injected sites and the URLs themselves back to the Mapper's Context instance. To speed up reads and writes, each URL is stored in the database under a reversed key of the form reversed-domain:protocol:port/path, which keeps pages from the same host adjacent in the store.
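
As a quick illustration of this reversed-key convention, the hedged sketch below calls org.apache.nutch.util.TableUtil.reverseUrl, the same helper the map function uses later; the exact output format shown in the comment is an assumption based on the convention just described:

import org.apache.nutch.util.TableUtil;

public class ReverseUrlDemo {
  public static void main(String[] args) throws Exception {
    // Host labels are reversed so that URLs of the same domain sort together.
    String key = TableUtil.reverseUrl("http://www.example.com:8080/docs/index.html");
    // Expected form (assumed): com.example.www:http:8080/docs/index.html
    System.out.println(key);
  }
}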

The execution flow of the InjectorJob class is as follows:

public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(NutchConfiguration.create(), new InjectorJob(),
        args);
    System.exit(res);
  }

This main method is the program's entry point. It delegates to Hadoop's ToolRunner, which first loads the Nutch configuration (by default the two files nutch-default.xml and nutch-site.xml), then creates an InjectorJob instance and runs it with the command-line arguments args.
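
Conceptually, NutchConfiguration.create() amounts to little more than the following sketch (a simplification assuming only the two resource files named above are involved; later resources override earlier ones):

import org.apache.hadoop.conf.Configuration;

public class NutchConfSketch {
  // A minimal sketch of what NutchConfiguration.create() does.
  public static Configuration create() {
    Configuration conf = new Configuration();
    conf.addResource("nutch-default.xml"); // shipped defaults
    conf.addResource("nutch-site.xml");    // site overrides, loaded last so they win
    return conf;
  }
}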

Next, the program checks whether the command-line arguments are valid:

public int run(String[] args) throws Exception {
    if (args.length < 1) {
      System.err.println("Usage: InjectorJob <url_dir> [-crawlId <id>]");
      return -1;
    }
    for (int i = 1; i < args.length; i++) {
      if ("-crawlId".equals(args[i])) {
        getConf().set(Nutch.CRAWL_ID_KEY, args[i + 1]); // store the crawl id in the configuration; it is used to keep the storage of different crawls separate
        i++;
      } else {
        System.err.println("Unrecognized arg " + args[i]);
        return -1;
      }
    }

    try {
      inject(new Path(args[0]));
      return 0;
    } catch (Exception e) {
      LOG.error("InjectorJob: " + StringUtils.stringifyException(e));
      return -1;
    }
  }

If no argument is supplied, the program prints the correct usage and exits; otherwise it passes the seed directory on to inject(new Path(args[0])) for the next step.
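
For reference, a typical invocation matching the usage string above might look like the following (the directory urls/ and the crawl id are placeholders, assuming the standard bin/nutch launcher):

bin/nutch inject urls/ -crawlId myCrawl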

 public void inject(Path urlDir) throws Exception {
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    long start = System.currentTimeMillis();
    LOG.info("InjectorJob: starting at " + sdf.format(start));
    LOG.info("InjectorJob: Injecting urlDir: " + urlDir);
    run(ToolUtil.toArgMap(Nutch.ARG_SEEDDIR, urlDir));
    long end = System.currentTimeMillis();
    LOG.info("Injector: finished at " + sdf.format(end) + ", elapsed: "
        + TimingUtil.elapsedTime(start, end));
  }

After logging the basic start-up information, this method calls run(ToolUtil.toArgMap(Nutch.ARG_SEEDDIR, urlDir)), which performs the core injection work.
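
ToolUtil.toArgMap packs its varargs into a map; below is a hedged sketch of the behaviour assumed here, pairing consecutive arguments as key and value:

import java.util.HashMap;
import java.util.Map;

public class ToArgMapSketch {
  // Sketch of ToolUtil.toArgMap: args[0]/args[1] become the first key/value pair, and so on.
  public static Map<String, Object> toArgMap(Object... args) {
    Map<String, Object> map = new HashMap<>();
    for (int i = 0; i + 1 < args.length; i += 2) {
      map.put(String.valueOf(args[i]), args[i + 1]);
    }
    return map;
  }
}

So the call above simply produces a map with the single entry Nutch.ARG_SEEDDIR -> urlDir.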

public Map<String, Object> run(Map<String, Object> args) throws Exception {
    getConf().setLong("injector.current.time", System.currentTimeMillis()); // record the injection time in the configuration so UrlMapper.setup() can read it later
    Path input;
    Object path = args.get(Nutch.ARG_SEEDDIR);
    if (path instanceof Path) {
      input = (Path) path;
    } else {
      input = new Path(path.toString());
    }
    numJobs = 1;
    currentJobNum = 0;
    currentJob = NutchJob.getInstance(getConf(), "inject " + input);
    FileInputFormat.addInputPath(currentJob, input); // add the seed directory to the job's list of input paths
    // set the Mapper for the job
    currentJob.setMapperClass(UrlMapper.class);

    // set the key class for the map output data; it may differ from the final output key class
    currentJob.setMapOutputKeyClass(String.class);

    //set the value class for the map output data
    currentJob.setMapOutputValueClass(WebPage.class);

    // set the job's output format: records are persisted through Gora
    currentJob.setOutputFormatClass(GoraOutputFormat.class);

    DataStore<String, WebPage> store = StorageUtils.createWebStore(
        currentJob.getConfiguration(), String.class, WebPage.class);
    GoraOutputFormat.setOutput(currentJob, store, true);

    // NUTCH-1471 Make explicit which datastore class we use
    Class<? extends DataStore<Object, Persistent>> dataStoreClass = StorageUtils
        .getDataStoreClass(currentJob.getConfiguration());
    LOG.info("InjectorJob: Using " + dataStoreClass
        + " as the Gora storage class.");

    //set reducer for the job
    currentJob.setReducerClass(Reducer.class);

    //set the number of reduce tasks
    currentJob.setNumReduceTasks(0);

    currentJob.waitForCompletion(true); // debugging shows that this call triggers the inner UrlMapper's setup and map functions
    ToolUtil.recordJobStatus(null, currentJob, results);

    // NUTCH-1370 Make explicit #URLs injected @runtime
    long urlsInjected = currentJob.getCounters()
        .findCounter("injector", "urls_injected").getValue();
    long urlsFiltered = currentJob.getCounters()
        .findCounter("injector", "urls_filtered").getValue();
    LOG.info("InjectorJob: total number of urls rejected by filters: "
        + urlsFiltered);
    LOG.info("InjectorJob: total number of urls injected after normalization and filtering: "
        + urlsInjected);

    return results;
  }

When currentJob.waitForCompletion(true) executes, the framework invokes the inner UrlMapper class's setup and map functions. Since setNumReduceTasks(0) was set above, there is no reduce phase: the map output goes straight to the Gora datastore through GoraOutputFormat.

The UrlMapper class handles the basic per-page bookkeeping: URL normalization (urlNormalizers), the fetch interval, the injected page score, URL filtering (filters), score assignment (scfilters), and the current time.

The setup(Context context) method initializes the class's fields from the job configuration, playing a role similar to a constructor;

map(LongWritable key, Text value, Context context) does the following:

1. Reads the URL from value, one URL per line; if the line is empty or starts with "#", it returns immediately;

2. Stores the metaname/metavalue pairs found on the line in the metadata map; two metanames are handled specially, nutchScoreMDName and nutchFetchIntervalMDName (see the example seed line after this list);

3. Normalizes and filters the URL, then assigns the newly injected URL an initial score. Score injection goes through the ScoringFilters class in the org.apache.nutch.scoring package, which in turn dispatches to the ScoringFilter extension point; depending on which scoring plugin the user enables (opic, tld, etc.), the corresponding implementation injects the score. To change the scoring method, modify the value of plugin.includes in conf/nutch-default.xml;

4. Records the page's fetch time for this injection and its regular interval between two fetches.
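
As an illustration of the format step 2 expects, a seed file line could look like the one below (fields are tab-separated; the metadata names assume the usual constant values nutchScoreMDName = "nutch.score" and nutchFetchIntervalMDName = "nutch.fetchInterval"):

http://www.example.com/	nutch.score=2.5	nutch.fetchInterval=86400	category=news

Here nutch.score overrides db.score.injected, nutch.fetchInterval overrides db.fetch.interval.default (in seconds), and any other pair (category=news) is stored verbatim in the page's metadata.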

The source code of the UrlMapper class is shown below:

public static class UrlMapper extends
      Mapper<LongWritable, Text, String, WebPage> {
    private URLNormalizers urlNormalizers; // URL normalization
    private int interval;                  // fetch interval, 30 days by default
    private float scoreInjected;           // default score assigned to injected pages
    private URLFilters filters;            // URL filtering
    private ScoringFilters scfilters;      // score assignment
    private long curTime;                  // current time

    @Override
    protected void setup(Context context) throws IOException,
        InterruptedException { 
      urlNormalizers = new URLNormalizers(context.getConfiguration(), 
          URLNormalizers.SCOPE_INJECT); 
      interval = context.getConfiguration().getInt("db.fetch.interval.default", 
          2592000);
      filters = new URLFilters(context.getConfiguration()); 
      scfilters = new ScoringFilters(context.getConfiguration()); 
      scoreInjected = context.getConfiguration().getFloat("db.score.injected", 
          1.0f);
      curTime = context.getConfiguration().getLong("injector.current.time", 
          System.currentTimeMillis()); 
    } 

    protected void map(LongWritable key, Text value, Context context) 
        throws IOException, InterruptedException { 
      String url = value.toString().trim(); // value is one line of text; trim() returns a copy with leading and trailing whitespace removed

      System.out.println("输入的种子站点为:"+url); 

      // skip the line if it is empty or starts with "#"
      if (url != null && (url.length() == 0 || url.startsWith("#"))) {
        /* Ignore lines that start with # */
        return; 
      } 

      // if tabs : metadata that could be stored 
      // must be name=value and separated by \t 
      float customScore = -1f; 
      int customInterval = interval;
      Map<String, String> metadata = new TreeMap<String, String>();
      if (url.indexOf("\t") != -1) {  
        String[] splits = url.split("\t"); 
        url = splits[0]; 
        for (int s = 1; s < splits.length; s++) { 
          // find separation between name and value 
          int indexEquals = splits[s].indexOf("="); 
          if (indexEquals == -1) { 
            // skip anything without a '='
            continue;
          }
          String metaname = splits[s].substring(0, indexEquals); 
          String metavalue = splits[s].substring(indexEquals + 1); 

          //System.out.println("metaname:" + metaname +"  metavalue:"+metavalue); 

          if (metaname.equals(nutchScoreMDName)) { 
            try { 
              customScore = Float.parseFloat(metavalue); 
            } catch (NumberFormatException nfe) {
              // malformed score value: fall back to the default
            }
          } else if (metaname.equals(nutchFetchIntervalMDName)) { 
            try { 
              customInterval = Integer.parseInt(metavalue); 
            } catch (NumberFormatException nfe) {
              // malformed interval value: fall back to the default
            }
          } else 
            metadata.put(metaname, metavalue); 
        } 
      } 

      try { 
        url = urlNormalizers.normalize(url, URLNormalizers.SCOPE_INJECT); 
        url = filters.filter(url); // filter the url 
      } catch (Exception e) { 
        LOG.warn("Skipping " + url + ":" + e);
        url = null; 
      }
      if (url == null) { 
        context.getCounter("injector", "urls_filtered").increment(1);
        return; 
      } else { // if it passes 
        String reversedUrl = TableUtil.reverseUrl(url); // collect it 
        WebPage row = WebPage.newBuilder().build(); 
        row.setFetchTime(curTime); 
        row.setFetchInterval(customInterval); 

        // now add the metadata 
        Iterator<String> keysIter = metadata.keySet().iterator(); 
        while (keysIter.hasNext()) { 
          String keymd = keysIter.next(); 
          String valuemd = metadata.get(keymd); 
          row.getMetadata().put(new Utf8(keymd), 
              ByteBuffer.wrap(valuemd.getBytes())); 
        } 

        //System.out.println("customScore:"+customScore); 

        if (customScore != -1){ 
          //System.out.println("customScore:"+customScore); 
          row.setScore(customScore); 
        } 
        else 
          row.setScore(scoreInjected); 

        //System.out.println("scoreInjected:" + scoreInjected); 

        try { 
          scfilters.injectedScore(url, row); 
          //System.out.println("网页内容为"+row.getContent()+"的分数值是:" + row.getScore()); 
        } catch (ScoringFilterException e) { 
          if (LOG.isWarnEnabled()) { 
            LOG.warn("Cannot filter injected score for url " + url
                + ", using default (" + e.getMessage() + ")"); 
          } 
        } 
        context.getCounter("injector", "urls_injected").increment(1);
        row.getMarkers() 
            .put(DbUpdaterJob.DISTANCE, new Utf8(String.valueOf(0))); 
        Mark.INJECT_MARK.putMark(row, YES_STRING); 
        context.write(reversedUrl, row); 
      } 
    } 
  }

In UrlMapper's map function, the parameters are a key/value pair plus the Context context, matching Hadoop's map/reduce model. After carrying out the steps above, map writes the number of injected pages (via the counters) and the processed URL, keyed in reversed form, back to context.
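
To make the output concrete: for a seed line containing only http://www.example.com/ under default configuration, the emitted record would look roughly like the sketch below. The key layout follows the reversed-URL convention described earlier; the internal marker names (dist, _injmrk_) are assumptions based on the DbUpdaterJob.DISTANCE and Mark.INJECT_MARK constants used in the code:

// key:   "com.example.www:http/"
// value: WebPage {
//   fetchTime:     <injector.current.time>
//   fetchInterval: 2592000           // db.fetch.interval.default
//   score:         1.0               // db.score.injected
//   markers:       { "dist" -> "0", "_injmrk_" -> "y" }  // assumed marker names
// }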

Execution then returns to the run(Map<String, Object>) method shown above, which after waitForCompletion reads the urls_injected and urls_filtered counters from the finished job and logs how many URLs were rejected by the filters and how many were injected.
