DataX二次开发——新增HiveReader插件
阅读原文时间:2023年07月08日阅读:10

DataX官方开源的版本支持HDFS文件的读写,并没有支持基于JDBC的Hive数据读写,很多时候一些数据同步不太方便,比如在读取Hive之前先执行一些sql、读取一些Hive的视图数据、或者在数据同步时执行一段固定的SQL,将SQL执行结果写入下游等各种场景,实际上还是需要Hive插件来支持。而在实际工作中,我们也遇到了类似的一些情况需要二次开发DataX以支持此类场景。本插件已在生产环境稳定运行一年有余,现分享给大家,如有问题也可联系我(qq:1821088755)。

hivereader插件比较简单,共有三个类,两个配置文件。其中:

  • HiveReader:实现DataX框架核心方法,是具体逻辑。
  • HiveReaderErrorCode:继承了DataX框架的ErrorCode类,是用于统一异常处理DataXException类中调用,具体是新增了一个枚举值。
  • HiveConnByKerberos:是在检测到Hive具备Kerberos认证要求时,进行认证的工具类。
  • plugin.json:DataX插件固定的配置文件,用于指定插件的入口类。
  • plugin_job_template.json:二次开发插件,一般需要提供一下具体的使用方式,此json文件即为HiveReader插件的配置方式说明。

2.1 HiveReader类

首先是HiveReader类,需要注意的是一些常量或枚举值,需要自行添加,其中DataBaseType枚举类中,需要新增Hive枚举项并添加Hive的驱动类全路径,具体见注释,另外就是Kerberos认证相关的几个配置,一个是keytab的路径,一个是krb5.conf的路径,另外一个是principle的值。

package com.alibaba.datax.plugin.reader.hivereader;

import com.alibaba.datax.common.base.Key;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.spi.Reader;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.rdbms.reader.CommonRdbmsReader;
import com.alibaba.datax.rdbms.util.DataBaseType;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.security.authentication.util.KerberosName;

import java.lang.reflect.Field;
import java.util.List;

import static com.alibaba.datax.common.base.Constant.DEFAULT_FETCH_SIZE;//2048,可根据条件自己取值
import static com.alibaba.datax.common.base.Key.FETCH_SIZE; // 参数名:"fetchSize"

@Slf4j
public class HiveReader
extends Reader
{

//此处需现在com.sinosig.plumber.rdbms.util.DataBaseType枚举类中添加Hive类型,内容为:Hive("hive2", "org.apache.hive.jdbc.HiveDriver"),  
private static final DataBaseType DATABASE\_TYPE = DataBaseType.Hive;

public static class Job  
        extends Reader.Job  
{

    private Configuration originalConfig = null;  
    private CommonRdbmsReader.Job commonRdbmsReaderJob;

    @Override  
    public void init()  
    {  
        this.originalConfig = getPluginJobConf();

        Boolean haveKerberos = this.originalConfig.getBool(Key.HAVE\_KERBEROS, false);  
        if (haveKerberos) {  
            log.info("检测到kerberos认证,正在进行认证");  
            org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();  
            String kerberosKeytabFilePath =  this.originalConfig.getString(Key.KERBEROS\_KEYTAB\_FILE\_PATH);  
            String kerberosPrincipal =  this.originalConfig.getString(Key.KERBEROS\_PRINCIPAL);  
            String krb5Path =  this.originalConfig.getString(Key.KRB5\_CONF\_FILE\_PATH);

            hadoopConf.set("hadoop.security.authentication", "kerberos");  
            hadoopConf.set("hive.security.authentication", "kerberos");  
            hadoopConf.set("hadoop.security.authorization", "true");  
            System.setProperty("java.security.krb5.conf",krb5Path);  
            refreshConfig();  
            HiveConnByKerberos.kerberosAuthentication(kerberosPrincipal, kerberosKeytabFilePath, hadoopConf,krb5Path);  
        }  
        this.commonRdbmsReaderJob = new CommonRdbmsReader.Job(DATABASE\_TYPE);  
        this.originalConfig = commonRdbmsReaderJob.init(originalConfig);  
    }

    @Override  
    public void preCheck()  
    {  
        this.commonRdbmsReaderJob.preCheck(originalConfig, DATABASE\_TYPE);  
    }

    @Override  
    public List<Configuration> split(int adviceNumber)  
    {  
        return this.commonRdbmsReaderJob.split(originalConfig, adviceNumber);  
    }

    @Override  
    public void post()  
    {  
        this.commonRdbmsReaderJob.post(originalConfig);  
    }

    @Override  
    public void destroy()  
    {  
        this.commonRdbmsReaderJob.destroy(originalConfig);  
    }

}

public static class Task  
        extends Reader.Task  
{

    private Configuration readerSliceConfig;  
    private CommonRdbmsReader.Task commonRdbmsReaderTask;

    @Override  
    public void init()  
    {  
        this.readerSliceConfig = getPluginJobConf();  
        this.commonRdbmsReaderTask = new CommonRdbmsReader.Task(DATABASE\_TYPE, getTaskGroupId(), getTaskId());  
        this.commonRdbmsReaderTask.init(this.readerSliceConfig);  
    }

    @Override  
    public void startRead(RecordSender recordSender)  
    {  
        int fetchSize = this.readerSliceConfig.getInt(FETCH\_SIZE, DEFAULT\_FETCH\_SIZE);

        this.commonRdbmsReaderTask.startRead(readerSliceConfig, recordSender, getTaskPluginCollector(), fetchSize);  
    }

    @Override  
    public void post()  
    {  
        this.commonRdbmsReaderTask.post(readerSliceConfig);  
    }

    @Override  
    public void destroy()  
    {  
        this.commonRdbmsReaderTask.destroy(readerSliceConfig);  
    }  
}  
/\*\* 刷新krb内容信息 \*/  
public static void refreshConfig() {  
    try {  
        sun.security.krb5.Config.refresh();  
        Field defaultRealmField = KerberosName.class.getDeclaredField("defaultRealm");  
        defaultRealmField.setAccessible(true);  
        defaultRealmField.set(  
                null,  
                org.apache.hadoop.security.authentication.util.KerberosUtil.getDefaultRealm());  
        // reload java.security.auth.login.config  
        javax.security.auth.login.Configuration.setConfiguration(null);  
    } catch (Exception e) {  
        log.warn(  
                "resetting default realm failed, current default realm will still be used.", e);  
    }  
}  

}

2.2 HiveConnByKerberos类

HiveConnByKerberos类比较简单,是一个通用的Kerberos认证的接口。

package com.alibaba.datax.plugin.reader.hivereader;

import com.alibaba.datax.common.exception.PlumberException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.security.UserGroupInformation;

@Slf4j
public class HiveConnByKerberos {
public static void kerberosAuthentication(String kerberosPrincipal, String kerberosKeytabFilePath, org.apache.hadoop.conf.Configuration hadoopConf,String krb5conf) {
System.setProperty("java.security.krb5.conf",krb5conf);
if (StringUtils.isNotBlank(kerberosPrincipal) && StringUtils.isNotBlank(kerberosKeytabFilePath)) {
UserGroupInformation.setConfiguration(hadoopConf);
try {
UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytabFilePath);
}
catch (Exception e) {

            log.error("kerberos认证失败");  
            String message = String.format("kerberos认证失败,请检查 " +  
                            "kerberosKeytabFilePath\[%s\] 和 kerberosPrincipal\[%s\]",  
                    kerberosKeytabFilePath, kerberosPrincipal);  
            e.printStackTrace();  
            throw DataXException.asDataXException(HiveReaderErrorCode.KERBEROS\_LOGIN\_ERROR, message, e);  
        }  
    }  
}  

}

2.3 HiveReaderErrorCode类

HiveReaderErrorCode类,主要就是集成ErrorCode类,并添加一个枚举项,这块可直接在ErrorCode类添加,也可使用此类,为固定写法。

package com.alibaba.datax.plugin.reader.hivereader;

import com.alibaba.datax.common.spi.ErrorCode;

public enum HiveReaderErrorCode
implements ErrorCode
{
KERBEROS_LOGIN_ERROR("HiveReader-13", "KERBEROS认证失败");

private final String code;  
private final String description;

HiveReaderErrorCode(String code, String description)  
{  
    this.code = code;  
    this.description = description;  
}

@Override  
public String getCode()  
{  
    return this.code;  
}

@Override  
public String getDescription()  
{  
    return this.description;  
}

@Override  
public String toString()  
{  
    return String.format("Code:\[%s\], Description:\[%s\]. ", this.code, this.description);  
}  

}

2.4 plugin.json文件

{
"name": "hivereader",
"class": "com.alibaba.datax.plugin.reader.hivereader.HiveReader",
"description": "Retrieve data from Hive via jdbc",
"developer": "wxm"
}

2.5 plugin_job_template.json文件

这块需要注意的一个问题是,如果Kerberos认证的Hive连接URL有两种方式,如果是基于zookeeper的方式,则需保证运行DataX服务的节点与zookeeper节点网络是打通的,并且一定不要忘记写上具体的Hive库名。

{
"name": "hivereader",
"parameter": {
"column": [
"*"
],
"username": "hive",
"password": "",
"preSql":"show databases;",
"connection": [
{
"jdbcUrl": [
"jdbc:hive2://localhost:10000/default;principal=hive/_HOST@EXAMPLE.COM"
],
"table": [
"hive_reader"
]
}
],
"where": "logdate='20211013'" ,
"haveKerberos": true,
"kerberosKeytabFilePath": "/etc/security/keytabs/hive.headless.keytab",
"kerberosPrincipal": "hive@EXAMPLE.COM"
}
}