jjzjj

DataX二次开发——新增HiveReader插件

^王晓明^ 2023-04-16 原文

一、研发背景

    DataX官方开源的版本支持HDFS文件的读写,并没有支持基于JDBC的Hive数据读写,很多时候一些数据同步不太方便,比如在读取Hive之前先执行一些sql、读取一些Hive的视图数据、或者在数据同步时执行一段固定的SQL,将SQL执行结果写入下游等各种场景,实际上还是需要Hive插件来支持。而在实际工作中,我们也遇到了类似的一些情况需要二次开发DataX以支持此类场景。本插件已在生产环境稳定运行一年有余,现分享给大家,如有问题也可联系我。

二、HiveReader插件介绍

    hivereader插件比较简单,共有三个类,两个配置文件。其中:

  • HiveReader:实现DataX框架核心方法,是具体逻辑。
  • HiveReaderErrorCode:继承了DataX框架的ErrorCode类,是用于统一异常处理DataXException类中调用,具体是新增了一个枚举值。
  • HiveConnByKerberos:是在检测到Hive具备Kerberos认证要求时,进行认证的工具类。
  • plugin.json:DataX插件固定的配置文件,用于指定插件的入口类。
  • plugin_job_template.json:二次开发插件,一般需要提供一下具体的使用方式,此json文件即为HiveReader插件的配置方式说明。

 

   2.1 HiveReader类

    首先是HiveReader类,需要注意的是一些常量或枚举值,需要自行添加,其中DataBaseType枚举类中,需要新增Hive枚举项并添加Hive的驱动类全路径,具体见注释,另外就是Kerberos认证相关的几个配置,一个是keytab的路径,一个是krb5.conf的路径,另外一个是principle的值。

package com.alibaba.datax.plugin.reader.hivereader;

import com.alibaba.datax.common.base.Key;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.spi.Reader;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.rdbms.reader.CommonRdbmsReader;
import com.alibaba.datax.rdbms.util.DataBaseType;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.security.authentication.util.KerberosName;

import java.lang.reflect.Field;
import java.util.List;

import static com.alibaba.datax.common.base.Constant.DEFAULT_FETCH_SIZE;//2048,可根据条件自己取值
import static com.alibaba.datax.common.base.Key.FETCH_SIZE; // 参数名:"fetchSize"

@Slf4j
public class HiveReader
        extends Reader
{

    //此处需现在com.sinosig.plumber.rdbms.util.DataBaseType枚举类中添加Hive类型,内容为:Hive("hive2", "org.apache.hive.jdbc.HiveDriver"),
    private static final DataBaseType DATABASE_TYPE = DataBaseType.Hive;

    public static class Job
            extends Reader.Job
    {

        private Configuration originalConfig = null;
        private CommonRdbmsReader.Job commonRdbmsReaderJob;

        @Override
        public void init()
        {
            this.originalConfig = getPluginJobConf();


            Boolean haveKerberos = this.originalConfig.getBool(Key.HAVE_KERBEROS, false);
            if (haveKerberos) {
                log.info("检测到kerberos认证,正在进行认证");
                org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
                String kerberosKeytabFilePath =  this.originalConfig.getString(Key.KERBEROS_KEYTAB_FILE_PATH);
                String kerberosPrincipal =  this.originalConfig.getString(Key.KERBEROS_PRINCIPAL);
                String krb5Path =  this.originalConfig.getString(Key.KRB5_CONF_FILE_PATH);

                hadoopConf.set("hadoop.security.authentication", "kerberos");
                hadoopConf.set("hive.security.authentication", "kerberos");
                hadoopConf.set("hadoop.security.authorization", "true");
                System.setProperty("java.security.krb5.conf",krb5Path);
                refreshConfig();
                HiveConnByKerberos.kerberosAuthentication(kerberosPrincipal, kerberosKeytabFilePath, hadoopConf,krb5Path);
            }
            this.commonRdbmsReaderJob = new CommonRdbmsReader.Job(DATABASE_TYPE);
            this.originalConfig = commonRdbmsReaderJob.init(originalConfig);
        }


        @Override
        public void preCheck()
        {
            this.commonRdbmsReaderJob.preCheck(originalConfig, DATABASE_TYPE);
        }

        @Override
        public List<Configuration> split(int adviceNumber)
        {
            return this.commonRdbmsReaderJob.split(originalConfig, adviceNumber);
        }

        @Override
        public void post()
        {
            this.commonRdbmsReaderJob.post(originalConfig);
        }

        @Override
        public void destroy()
        {
            this.commonRdbmsReaderJob.destroy(originalConfig);
        }


    }

    public static class Task
            extends Reader.Task
    {

        private Configuration readerSliceConfig;
        private CommonRdbmsReader.Task commonRdbmsReaderTask;

        @Override
        public void init()
        {
            this.readerSliceConfig = getPluginJobConf();
            this.commonRdbmsReaderTask = new CommonRdbmsReader.Task(DATABASE_TYPE, getTaskGroupId(), getTaskId());
            this.commonRdbmsReaderTask.init(this.readerSliceConfig);
        }

        @Override
        public void startRead(RecordSender recordSender)
        {
            int fetchSize = this.readerSliceConfig.getInt(FETCH_SIZE, DEFAULT_FETCH_SIZE);

            this.commonRdbmsReaderTask.startRead(readerSliceConfig, recordSender, getTaskPluginCollector(), fetchSize);
        }

        @Override
        public void post()
        {
            this.commonRdbmsReaderTask.post(readerSliceConfig);
        }

        @Override
        public void destroy()
        {
            this.commonRdbmsReaderTask.destroy(readerSliceConfig);
        }
    }
    /** 刷新krb内容信息 */
    public static void refreshConfig() {
        try {
            sun.security.krb5.Config.refresh();
            Field defaultRealmField = KerberosName.class.getDeclaredField("defaultRealm");
            defaultRealmField.setAccessible(true);
            defaultRealmField.set(
                    null,
                    org.apache.hadoop.security.authentication.util.KerberosUtil.getDefaultRealm());
            // reload java.security.auth.login.config
            javax.security.auth.login.Configuration.setConfiguration(null);
        } catch (Exception e) {
            log.warn(
                    "resetting default realm failed, current default realm will still be used.", e);
        }
    }
}

 2.2 HiveConnByKerberos类

    HiveConnByKerberos类比较简单,是一个通用的Kerberos认证的接口。

package com.alibaba.datax.plugin.reader.hivereader;

import com.alibaba.datax.common.exception.PlumberException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.security.UserGroupInformation;

@Slf4j
public class HiveConnByKerberos {
    public static void kerberosAuthentication(String kerberosPrincipal, String kerberosKeytabFilePath, org.apache.hadoop.conf.Configuration hadoopConf,String krb5conf) {
        System.setProperty("java.security.krb5.conf",krb5conf);
        if (StringUtils.isNotBlank(kerberosPrincipal) && StringUtils.isNotBlank(kerberosKeytabFilePath)) {
            UserGroupInformation.setConfiguration(hadoopConf);
            try {
                UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytabFilePath);
            }
            catch (Exception e) {

                log.error("kerberos认证失败");
                String message = String.format("kerberos认证失败,请检查 " +
                                "kerberosKeytabFilePath[%s] 和 kerberosPrincipal[%s]",
                        kerberosKeytabFilePath, kerberosPrincipal);
                e.printStackTrace();
                throw DataXException.asDataXException(HiveReaderErrorCode.KERBEROS_LOGIN_ERROR, message, e);
            }
        }
    }
}

 

2.3 HiveReaderErrorCode类

    HiveReaderErrorCode类,主要就是集成ErrorCode类,并添加一个枚举项,这块可直接在ErrorCode类添加,也可使用此类,为固定写法。

package com.alibaba.datax.plugin.reader.hivereader;

import com.alibaba.datax.common.spi.ErrorCode;

public enum HiveReaderErrorCode
        implements ErrorCode
{
    KERBEROS_LOGIN_ERROR("HiveReader-13", "KERBEROS认证失败");

    private final String code;
    private final String description;

    HiveReaderErrorCode(String code, String description)
    {
        this.code = code;
        this.description = description;
    }

    @Override
    public String getCode()
    {
        return this.code;
    }

    @Override
    public String getDescription()
    {
        return this.description;
    }

    @Override
    public String toString()
    {
        return String.format("Code:[%s], Description:[%s]. ", this.code, this.description);
    }
}

2.4 plugin.json文件

{
  "name": "hivereader",
  "class": "com.alibaba.datax.plugin.reader.hivereader.HiveReader",
  "description": "Retrieve data from Hive via jdbc",
  "developer": "wxm"
}

2.5 plugin_job_template.json文件

    这块需要注意的一个问题是,如果Kerberos认证的Hive连接URL有两种方式,如果是基于zookeeper的方式,则需保证运行DataX服务的节点与zookeeper节点网络是打通的,并且一定不要忘记写上具体的Hive库名。

{
  "name": "hivereader",
  "parameter": {
    "column": [
      "*"
    ],
    "username": "hive",
    "password": "",
"preSql":"show databases;", "connection": [ { "jdbcUrl": [ "jdbc:hive2://localhost:10000/default;principal=hive/_HOST@EXAMPLE.COM" ], "table": [ "hive_reader" ] } ], "where": "logdate='20211013'" , "haveKerberos": true, "kerberosKeytabFilePath": "/etc/security/keytabs/hive.headless.keytab", "kerberosPrincipal": "hive@EXAMPLE.COM" } }

 

有关DataX二次开发——新增HiveReader插件的更多相关文章

  1. ruby - 如何每月在 Heroku 运行一次 Scheduler 插件? - 2

    在选择我想要运行操作的频率时,唯一的选项是“每天”、“每小时”和“每10分钟”。谢谢!我想为我的Rails3.1应用程序运行调度程序。 最佳答案 这不是一个优雅的解决方案,但您可以安排它每天运行,并在实际开始工作之前检查日期是否为当月的第一天。 关于ruby-如何每月在Heroku运行一次Scheduler插件?,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/8692687/

  2. ruby-on-rails - 无法使用 Rails 3.2 创建插件? - 2

    我对最新版本的Rails有疑问。我创建了一个新应用程序(railsnewMyProject),但我没有脚本/生成,只有脚本/rails,当我输入ruby./script/railsgeneratepluginmy_plugin"Couldnotfindgeneratorplugin.".你知道如何生成插件模板吗?没有这个命令可以创建插件吗?PS:我正在使用Rails3.2.1和ruby​​1.8.7[universal-darwin11.0] 最佳答案 随着Rails3.2.0的发布,插件生成器已经被移除。查看变更日志here.现在

  3. ruby - 使用 C 扩展开发 ruby​​gem 时,如何使用 Rspec 在本地进行测试? - 2

    我正在编写一个包含C扩展的gem。通常当我写一个gem时,我会遵循TDD的过程,我会写一个失败的规范,然后处理代码直到它通过,等等......在“ext/mygem/mygem.c”中我的C扩展和在gemspec的“扩展”中配置的有效extconf.rb,如何运行我的规范并仍然加载我的C扩展?当我更改C代码时,我需要采取哪些步骤来重新编译代码?这可能是个愚蠢的问题,但是从我的gem的开发源代码树中输入“bundleinstall”不会构建任何native扩展。当我手动运行rubyext/mygem/extconf.rb时,我确实得到了一个Makefile(在整个项目的根目录中),然后当

  4. Ruby Sinatra 配置用于生产和开发 - 2

    我已经在Sinatra上创建了应用程序,它代表了一个简单的API。我想在生产和开发上进行部署。我想在部署时选择,是开发还是生产,一些方法的逻辑应该改变,这取决于部署类型。是否有任何想法,如何完成以及解决此问题的一些示例。例子:我有代码get'/api/test'doreturn"Itisdev"end但是在部署到生产环境之后我想在运行/api/test之后看到ItisPROD如何实现? 最佳答案 根据SinatraDocumentation:EnvironmentscanbesetthroughtheRACK_ENVenvironm

  5. ruby - 是否可以覆盖 gemfile 进行本地开发? - 2

    我们的git存储库中目前有一个Gemfile。但是,有一个gem我只在我的环境中本地使用(我的团队不使用它)。为了使用它,我必须将它添加到我们的Gemfile中,但每次我checkout到我们的master/dev主分支时,由于与跟踪的gemfile冲突,我必须删除它。我想要的是类似Gemfile.local的东西,它将继承从Gemfile导入的gems,但也允许在那里导入新的gems以供使用只有我的机器。此文件将在.gitignore中被忽略。这可能吗? 最佳答案 设置BUNDLE_GEMFILE环境变量:BUNDLE_GEMFI

  6. ruby - 在 Windows 机器上使用 Ruby 进行开发是否会适得其反? - 2

    这似乎非常适得其反,因为太多的gem会在window上破裂。我一直在处理很多mysql和ruby​​-mysqlgem问题(gem本身发生段错误,一个名为UnixSocket的类显然在Windows机器上不能正常工作,等等)。我只是在浪费时间吗?我应该转向不同的脚本语言吗? 最佳答案 我在Windows上使用Ruby的经验很少,但是当我开始使用Ruby时,我是在Windows上,我的总体印象是它不是Windows原生系统。因此,在主要使用Windows多年之后,开始使用Ruby促使我切换回原来的系统Unix,这次是Linux。Rub

  7. ruby-on-rails - 在 Rails 开发环境中为 .ogv 文件设置 Mime 类型 - 2

    我正在玩HTML5视频并且在ERB中有以下片段:mp4视频从在我的开发环境中运行的服务器很好地流式传输到chrome。然而firefox显示带有海报图像的视频播放器,但带有一个大X。问题似乎是mongrel不确定ogv扩展的mime类型,并且只返回text/plain,如curl所示:$curl-Ihttp://0.0.0.0:3000/pr6.ogvHTTP/1.1200OKConnection:closeDate:Mon,19Apr201012:33:50GMTLast-Modified:Sun,18Apr201012:46:07GMTContent-Type:text/plain

  8. 世界前沿3D开发引擎HOOPS全面讲解——集3D数据读取、3D图形渲染、3D数据发布于一体的全新3D应用开发工具 - 2

    无论您是想搭建桌面端、WEB端或者移动端APP应用,HOOPSPlatform组件都可以为您提供弹性的3D集成架构,同时,由工业领域3D技术专家组成的HOOPS技术团队也能为您提供技术支持服务。如果您的客户期望有一种在多个平台(桌面/WEB/APP,而且某些客户端是“瘦”客户端)快速、方便地将数据接入到3D应用系统的解决方案,并且当访问数据时,在各个平台上的性能和用户体验保持一致,HOOPSPlatform将帮助您完成。利用HOOPSPlatform,您可以开发在任何环境下的3D基础应用架构。HOOPSPlatform可以帮您打造3D创新型产品,HOOPSSDK包含的技术有:快速且准确的CAD

  9. 【鸿蒙应用开发系列】- 获取系统设备信息以及版本API兼容调用方式 - 2

    在应用开发中,有时候我们需要获取系统的设备信息,用于数据上报和行为分析。那在鸿蒙系统中,我们应该怎么去获取设备的系统信息呢,比如说获取手机的系统版本号、手机的制造商、手机型号等数据。1、获取方式这里分为两种情况,一种是设备信息的获取,一种是系统信息的获取。1.1、获取设备信息获取设备信息,鸿蒙的SDK包为我们提供了DeviceInfo类,通过该类的一些静态方法,可以获取设备信息,DeviceInfo类的包路径为:ohos.system.DeviceInfo.具体的方法如下:ModifierandTypeMethodDescriptionstatic StringgetAbiList​()Obt

  10. 微信小程序开发入门与实战(Behaviors使用) - 2

    @作者:SYFStrive @博客首页:HomePage📜:微信小程序📌:个人社区(欢迎大佬们加入)👉:社区链接🔗📌:觉得文章不错可以点点关注👉:专栏连接🔗💃:感谢支持,学累了可以先看小段由小胖给大家带来的街舞👉微信小程序(🔥)目录自定义组件-behaviors    1、什么是behaviors    2、behaviors的工作方式    3、创建behavior    4、导入并使用behavior    5、behavior中所有可用的节点    6、同名字段的覆盖和组合规则总结最后自定义组件-behaviors    1、什么是behaviorsbehaviors是小程序中,用于实现

随机推荐