基于文件系统(及MySQL)使用Java实现MapReduce
阅读原文时间:2023年07月15日阅读:1

实现这个代码的原因是:

  • 我会MapReduce,但之前都是在AWS EMR上使用;自己也搭过伪分布式环境,但感觉运维起来比较困难;
  • 我就MySQL会一点(本来想用mongoDB的但是不太会啊)
  • 数据量不是很大,至少对我来说。
  • 希望不要出什么问题,这方面文件系统还是可以信任的。

设计思路如下:

  • init阶段:将所需的文件添加到一个列表文件input_file_list.txt中。

  • Map阶段:读取input_file_list.txt中的每一个文件的每一行,并将其映射成一个key-value对。

    考虑到key可能包含特殊字符(不能直接用作文件名),所以这里使用MySQL存储一个id到key的对应关系;每个key对应的value则逐行追加到以该id命名的临时文件(tmp/<id>.txt)中。

  • Reduce阶段:针对每一个key,读取其对应的临时文件,最终生成一个name-value列表,该name-value列表对应一个json对象,如:{ "name": "zifeiy", "age": 88 },将所有的json对象逐行存储到一个结果文件reduce.txt(代码中的reduceResultFile)中。

  • 处理结果阶段:将reduce.txt文件进行解析,最终生成结果的CSV文件或者Excel文件。

主要代码:

package com.zifeiy.snowflake.tools.mapreduce.v1;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.google.gson.Gson;
import com.zifeiy.snowflake.assist.CsvOneLineParser;
import com.zifeiy.snowflake.assist.FileHelper;

import jxl.Workbook;
import jxl.write.Label;
import jxl.write.WritableSheet;
import jxl.write.WritableWorkbook;

public abstract class MapReduceBaseVersion1 {

    /** Query-string options appended to the JDBC URL. */
    private static final String APPENDED_DB_INFO =
            "?useUnicode=true&characterEncoding=UTF8"
            + "&rewriteBatchedStatements=true"
            + "&useLegacyDatetimeCode=false"
            + "&serverTimezone=Asia/Shanghai"
            + "&useSSL=false";

    /** JDBC driver class loaded by {@link #mapreduce()} before connecting. */
    private static final String classname = "com.mysql.cj.jdbc.Driver";

    /** JDBC URL of the database that holds the task table and the per-task key tables. */
    private static final String url = "jdbc:mysql://localhost:3306/snowflake" + APPENDED_DB_INFO;

    /** Database login used by {@link #mapreduce()}. */
    private static final String username = "root";

    // NOTE(review): credentials are hard-coded in source; consider loading them from configuration.
    private static final String password = "password";

    /** Directory under which every task gets its own sub-directory named after the task id. */
    public static final String taskRootPath = "D:\\snowflake\\task";

    /** Connection opened by {@link #mapreduce()} and used by {@link #setKeyValuePair(String, String)}. */
    private Connection connection;

    /** List file that receives one absolute input path per line. */
    private File inputListFile;

    /** File that receives one JSON object per line during the reduce phase. */
    private File reduceResultFile;

    /** Final CSV or Excel file produced by {@link #generateFile(String[], String[])}. */
    private File resultFile;

    /** Id of the current task, taken from the generated key of the task table. */
    private int taskId;

    /**
     * Registers an input file for the map phase by appending its absolute path,
     * terminated with CRLF, to the input list file.
     *
     * @param file the input file to register
     * @throws IOException if the list file cannot be appended to
     */
    public void addInputPath(File file) throws IOException {
        String entry = file.getAbsolutePath() + "\r\n";
        FileHelper.appendFile(inputListFile, entry);
    }

    /**
     * Emits one key-value pair from the map phase.
     *
     * <p>The key is looked up in (or inserted into) the per-task table {@code tmp<taskId>}
     * to obtain a numeric id, and the value is appended as one CRLF-terminated line to
     * {@code <taskRootPath>/<taskId>/tmp/<id>.txt}.
     *
     * <p>All SQL uses bound parameters, so keys containing quotes or backslashes are
     * stored verbatim. Only the table name is concatenated, and it is built from the
     * integer task id.
     *
     * @param key   the grouping key
     * @param value the value to append to the key's temp file
     * @throws Exception if no id can be obtained for the key, or on database / file errors
     */
    public void setKeyValuePair(String key, String value) throws Exception {
        int id = findKeyId(key);
        if (id == -1) {
            String insertSql = "insert into tmp" + taskId + " (kname) values (?)";
            try (PreparedStatement insert = connection.prepareStatement(insertSql)) {
                insert.setString(1, key);
                insert.executeUpdate();
            }
            // Look the key up again so the id is the one later lookups of the same key will find.
            id = findKeyId(key);
        }
        if (id == -1) {
            throw new Exception("set key value pair failed: key = " + key + ", value = " + value);
        }

        File tmpFile = new File(taskRootPath + File.separator + taskId + File.separator
                + "tmp" + File.separator + id + ".txt");
        if (!tmpFile.exists()) {
            tmpFile.createNewFile();
        }
        FileHelper.appendFile(tmpFile, value + "\r\n");
    }

    /** Returns the id stored for {@code key} in the per-task key table, or -1 if the key is absent. */
    private int findKeyId(String key) throws Exception {
        String selectSql = "select id from tmp" + taskId + " where kname=?";
        try (PreparedStatement select = connection.prepareStatement(selectSql)) {
            select.setString(1, key);
            try (ResultSet rows = select.executeQuery()) {
                return rows.next() ? rows.getInt(1) : -1;
            }
        }
    }

    /**
     * Appends reduce output to the reduce result file, one JSON object per line.
     *
     * <p>Each map in {@code paramList} is serialised with Gson and terminated with CRLF.
     * The lines are collected in a {@link StringBuilder} (instead of repeated string
     * concatenation) and written with a single append call.
     *
     * @param paramList the name-value maps to serialise, in output order
     * @throws Exception if the reduce result file cannot be appended to
     */
    public void addParamList(List<Map<String, String>> paramList) throws Exception {
        Gson gson = new Gson();
        StringBuilder content = new StringBuilder();
        for (Map<String, String> params : paramList) {
            content.append(gson.toJson(params)).append("\r\n");
        }
        FileHelper.appendFile(reduceResultFile, content.toString());
    }

    /**
     * Converts the reduce result file (one JSON object per line) into the final result file.
     *
     * <p>If the reduce result file is larger than 1 MB the output is
     * {@code <taskRootPath>/<taskId>/result.csv}; otherwise it is {@code result.xls}
     * in the same directory. The first output row holds {@code nameColumns}; every
     * following row holds, for each entry of {@code columns}, the value stored under
     * that name in the JSON object (empty when the name is missing).
     *
     * @param columns     JSON member names to export, in column order
     * @param nameColumns header captions, in column order
     * @throws Exception if the reduce result file is missing, a line cannot be parsed
     *                   into a JSON object, or reading / writing fails
     */
    public void generateFile(String[] columns, String[] nameColumns) throws Exception {
        if (reduceResultFile == null || !reduceResultFile.exists()) {
            throw new Exception("[mapreduce.v1] in generateFile function: reduceResultFile do not exist!");
        }
        String taskDir = taskRootPath + File.separator + taskId + File.separator;
        if (reduceResultFile.length() > 1 * 1024 * 1024) {  // larger than 1 MB: export as CSV
            resultFile = new File(taskDir + "result.csv");
            writeCsvResult(columns, nameColumns);
        } else {    // 1 MB or smaller: export as an Excel workbook
            resultFile = new File(taskDir + "result.xls");
            writeExcelResult(columns, nameColumns);
        }
    }

    /** Writes the header and one CSV row per JSON line of the reduce result file to {@code resultFile}. */
    private void writeCsvResult(String[] columns, String[] nameColumns) throws Exception {
        Gson gson = new Gson();
        try (FileInputStream fis = new FileInputStream(reduceResultFile);
             BufferedReader br = new BufferedReader(new InputStreamReader(fis, "UTF-8"));
             FileOutputStream fos = new FileOutputStream(resultFile);
             OutputStreamWriter osw = new OutputStreamWriter(fos, "UTF-8")) {

            StringBuilder header = new StringBuilder();
            for (int i = 0; i < nameColumns.length; i++) {
                if (i > 0) {
                    header.append(',');
                }
                header.append(quoteCsvField(nameColumns[i]));
            }
            osw.write(header.append("\r\n").toString());

            String line;
            while ((line = br.readLine()) != null) {
                Map<?, ?> map = gson.fromJson(line, Map.class);
                if (map == null) {
                    throw new Exception("map is null by parsing line: " + line);
                }
                StringBuilder row = new StringBuilder();
                for (int i = 0; i < columns.length; i++) {
                    if (i > 0) {
                        row.append(',');
                    }
                    Object v = map.get(columns[i]);
                    if (v != null) {    // a missing value yields an empty, unquoted field
                        row.append(quoteCsvField(v.toString()));
                    }
                }
                osw.write(row.append("\r\n").toString());
            }
            // Nothing is written after the loop: every row has already been emitted once.
        }
    }

    /** Writes the header and one sheet row per JSON line of the reduce result file to {@code resultFile}. */
    private void writeExcelResult(String[] columns, String[] nameColumns) throws Exception {
        Gson gson = new Gson();
        WritableWorkbook workbook = Workbook.createWorkbook(resultFile);
        try {
            WritableSheet sheet = workbook.createSheet("sheet1", 0);
            for (int i = 0; i < nameColumns.length; i++) {
                sheet.addCell(new Label(i, 0, nameColumns[i]));
            }

            try (FileInputStream fis = new FileInputStream(reduceResultFile);
                 BufferedReader br = new BufferedReader(new InputStreamReader(fis, "UTF-8"))) {
                int rowId = 1;  // row 0 is the header
                String line;
                while ((line = br.readLine()) != null) {
                    Map<?, ?> map = gson.fromJson(line, Map.class);
                    if (map == null) {
                        throw new Exception("map is null by parsing line: " + line);
                    }
                    for (int i = 0; i < columns.length; i++) {
                        Object v = map.get(columns[i]);
                        sheet.addCell(new Label(i, rowId, v == null ? "" : v.toString()));
                    }
                    rowId++;
                }
            }
            workbook.write();
        } finally {
            // Close the workbook even when a row fails, so the output file handle is released.
            workbook.close();
        }
    }

    /** Wraps a value in double quotes for CSV output, doubling any embedded double quote. */
    private static String quoteCsvField(String value) {
        return "\"" + String.valueOf(value).replace("\"", "\"\"") + "\"";
    }

    /**
     * Phase 1 hook: called once by {@link #mapreduce()} after the task directory and the
     * input list file have been created. Implementations register their input files
     * here via {@link #addInputPath(File)}.
     */
    public abstract void init() throws Exception;

    /**
     * Phase 2 hook: called by {@link #mapreduce()} once for every line of every registered
     * input file. Implementations emit pairs via {@link #setKeyValuePair(String, String)}.
     *
     * @param line one line read from an input file
     */
    public abstract void map(String line) throws Exception;

    /**
     * Phase 3 hook: called by {@link #mapreduce()} once for every key recorded in the
     * per-task key table.
     *
     * @param key          the key as stored in the key table
     * @param reduceReader reader constructed over the temp file that holds this key's values
     */
    public abstract void reduce(String key, ReduceReader reduceReader) throws Exception;

    /**
     * Phase 4 hook: called once by {@link #mapreduce()} after all keys have been reduced,
     * to produce the final result file.
     */
    public abstract void generate() throws Exception;

    /**
     * Runs the whole job: allocates a task id, prepares the task directory, then executes
     * the init, map, reduce and generate phases in that order.
     *
     * <p>Any failure is printed to standard error and not rethrown. The database
     * connection is closed whether or not the job succeeds.
     *
     * @return the absolute path of the result file, or {@code null} if no result file
     *         was set during this run
     */
    public String mapreduce() {
        // Forget the result of any earlier run so a failed run cannot return a stale path.
        resultFile = null;
        try {
            Class.forName(classname);
            connection = DriverManager.getConnection(url, username, password);
            try {
                runTask();
            } finally {
                connection.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

        if (resultFile == null) {
            return null;
        }
        return resultFile.getAbsolutePath();
    }

    /** Executes all phases of one task on the already opened connection. */
    private void runTask() throws Exception {
        taskId = createTaskId();

        // Prepare <taskRootPath>/<taskId>/ and its tmp/ sub-directory.
        String taskPath = taskRootPath + File.separator + taskId;
        File taskPathDir = new File(taskPath);
        if (taskPathDir.exists()) {
            throw new Exception("[mapreduce.v1] Exception: task directory already exists");
        }
        String tmpDirPath = taskPath + File.separator + "tmp";
        File tmpDir = new File(tmpDirPath);
        // mkdirs() also creates the missing parent task directory.
        if (!tmpDir.mkdirs()) {
            throw new IOException("[mapreduce.v1] Exception: can not create directory " + tmpDirPath);
        }
        this.inputListFile = new File(taskPath + File.separator + "input_file_list.txt");
        inputListFile.createNewFile();

        // Phase 1: init() registers the input files through addInputPath.
        init();

        try (Statement statement = connection.createStatement()) {
            // The temporary table maps each key to the numeric id used as its temp file name.
            statement.execute("create temporary table tmp" + taskId
                    + " ( id int not null auto_increment primary key, kname varchar(200) )");

            // Phase 2: map(line) for every line of every input file.
            runMapPhase();

            // Phase 3: reduce(key, reader) for every recorded key.
            reduceResultFile = new File(taskPath + File.separator + "reduce.txt");
            if (reduceResultFile.exists()) {
                throw new Exception("[mapreduce.v1] reduce file already exists!");
            }
            reduceResultFile.createNewFile();
            runReducePhase(statement, tmpDirPath);
        }

        // Phase 4: generate the result file.
        generate();
    }

    /** Inserts a row into the task table and returns its generated key as the task id. */
    private int createTaskId() throws Exception {
        try (PreparedStatement insert = connection.prepareStatement(
                "insert into task () values ()", Statement.RETURN_GENERATED_KEYS)) {
            insert.executeUpdate();
            try (ResultSet keys = insert.getGeneratedKeys()) {
                if (keys.next()) {
                    return keys.getInt(1);
                }
            }
        }
        throw new Exception("[mapreduce.v1] Exception: can not generate taskId");
    }

    /** Feeds every line of every file named in the input list file to {@link #map(String)}. */
    private void runMapPhase() throws Exception {
        try (FileInputStream listStream = new FileInputStream(inputListFile);
             BufferedReader listReader = new BufferedReader(new InputStreamReader(listStream, "UTF-8"))) {
            String inputFilename;
            while ((inputFilename = listReader.readLine()) != null) {
                File inputFile = new File(inputFilename);
                if (!inputFile.exists()) {
                    throw new Exception("[mapreduce.v1] Exception: input file " + inputFilename + " do not exists!");
                }
                // Input files are decoded as GBK; each reader is closed before the next file is opened.
                try (FileInputStream inputStream = new FileInputStream(inputFile);
                     BufferedReader lineReader = new BufferedReader(new InputStreamReader(inputStream, "GBK"))) {
                    String line;
                    while ((line = lineReader.readLine()) != null) {
                        map(line);
                    }
                }
            }
        }
    }

    /** Calls {@link #reduce(String, ReduceReader)} once per row of the per-task key table. */
    private void runReducePhase(Statement statement, String tmpDirPath) throws Exception {
        try (ResultSet keys = statement.executeQuery("select * from tmp" + taskId)) {
            while (keys.next()) {
                int id = keys.getInt(1);
                String key = keys.getString(2);
                File reduceFile = new File(tmpDirPath + File.separator + id + ".txt");
                if (!reduceFile.exists()) {
                    throw new Exception("[mapreduce.v1] Exception: reduce file " + reduceFile.getName() + " not exists!");
                }
                // NOTE(review): ReduceReader is never closed here; confirm whether it holds an open file handle.
                ReduceReader reduceReader = new ReduceReader(reduceFile);
                reduce(key, reduceReader);
            }
        }
    }

    /**
     * Manual smoke test: runs a job over {@code D:\test\test.del}, keyed by characters
     * 1-2 of each line, and prints the path of the generated result file.
     */
    public static void main(String[] args) {
        MapReduceBaseVersion1 job = new MapReduceBaseVersion1() {

            @Override
            public void init() throws Exception {
                File input = new File("D:\\test\\test.del");
                addInputPath(input);
            }

            @Override
            public void map(String line) throws Exception {
                String key = line.substring(1, 3);
                setKeyValuePair(key, line);
            }

            @Override
            public void reduce(String key, ReduceReader reduceReader) throws Exception {
                List<Map<String, String>> rows = new ArrayList<Map<String, String>>();

                for (String line = reduceReader.next(); line != null; line = reduceReader.next()) {
                    List<String> cells = CsvOneLineParser.parseLine(line);
                    // Name each cell by its 1-based position: "1", "2", ...
                    Map<String, String> row = new HashMap<String, String>();
                    for (int i = 0; i < cells.size(); i++) {
                        row.put(String.valueOf(i + 1), cells.get(i));
                    }
                    rows.add(row);
                }
                addParamList(rows);
            }

            @Override
            public void generate() throws Exception {
                String[] columns = { "1", "2", "3", "4", "5", "6" };
                String[] captions = { "一", "二", "三", "四", "五", "六" };
                generateFile(columns, captions);
            }
        };
        String resultPath = job.mapreduce();
        System.out.println(resultPath);
    }
}