大数跨境

在 Apache DolphinScheduler 中测试 Kerberos 认证的 HIVE 连接

在 Apache DolphinScheduler 中测试 Kerberos 认证的 HIVE 连接 海豚调度
2022-01-12
0
导读:希望本文能帮你成功扫雷~

点击上方 蓝字关注我们




                        

作者 | 小钻风

编辑 | 邱忠标


✎ 编 者 按 

Kerberos 是一种计算机网络授权协议,用来在非安全网络中,对个人通信以安全的手段进行身份认证。

本文将手把手指导从零开始在 DolphinScheduler 中测试 Kerberos 认证的HIVE 连接,其中也包括实际环境中所遇到问题的一些解决方法。希望该篇文章能够有幸帮助到您成功扫雷~

本文为邱忠标编辑排版,感谢贡献



01


测试背景


花香入梦。

巡山不辞辛苦,大王一高兴,直接赐下水酒,心情自然是酣畅淋漓。想我是个名副其实的古灵精怪,却也偶尔自诩算是小半个喜好登高望远的文人骚客,此刻借着酒意,也想吟诗一首哇:

啊……
得嘞!

拍案声起,咱们书接上回《一文给你整明白租户在 Apache DolphinScheduler 中的作用》。看完这篇文章后,相信大家已经对租户有了明确认识,让我们再回顾一下

来,read after me:ziwu租,fuwu户……


额……咱们言归正传。

集群开启了 Kerberos 认证,刚接触 DolphinScheduler 而还不知 Kerberos 的小伙伴们,不禁虎躯一震,只觉是脑门上放鞭炮(大难临头),嘶吼一句:配个 SQL 任务,就想连个 HIVE,show 一下 tables,就报任务执行失败,这也太难咯……带着义愤,还请咱们继续往下看。


02


Kerberos


度娘这次是不请自来,直呼:Kerberos 是一种计算机网络授权协议,用来在非安全网络中,对个人通信以安全的手段进行身份认证。


进一步解释是:通过密钥系统为客户机 / 服务器应用程序提供强大的认证服务。该认证过程的实现不依赖于主机操作系统的认证,无需基于主机地址的信任,不要求网络上所有主机的物理安全,并假定网络上传送的数据包可以被任意地读取、修改和插入数据。在以上情况下, Kerberos 作为一种可信任的第三方认证服务,是通过传统的密码技术(如:共享密钥)执行认证服务的。


简而言之,Kerberos 是一种被广泛运用在大数据生态中的身份认证协议,甚至可以说是大数据身份认证的事实标准。一句话来说,Kerberos 是一种基于加密 Ticket 的身份认证协议。Kerberos 主要由 Key Distribution Center (即KDC)、Client 和 Service 三部分组成,客户端会先访问两次 KDC,然后再访问目标 Service,如 HTTP服务。

咱们就先盯盯 DolphinScheduler 中,与 Kerberos 相关的配置即可:
resource.storage.type                                        开启kerberos 需要指定文件存储格式hadoop.security.authentication.startup.state      是否开启kerberos 认证java.security.krb5.conf.path                                kerberos5配置文件所在路径login.user.keytab.username                                kerberos认证的用户principal名称login.user.keytab.path                                         用户的keytab认证文件所在路径kerberos.expire.time                                           kerberos 认证的过期时间


好,天气冷,屁股凉,话不多说,让咱们带着这些个配置参数进入 DolphinScheduler 源码,摸清如何通过 Kerberos 认证连接。


03


准备测试环境



(Windows 本地测试环境,仅供参考,使用 IDEA 打开)

1

修改 common.propertis 文件


位于 dolphinscheduler-common/src/main/resources 目录下
#......# resource store on HDFS/S3/NONEresource.storage.type=HDFS# whether to startup kerberoshadoop.security.authentication.startup.state=true# java.security.krb5.conf pathjava.security.krb5.conf.path=D:/security/krb5/krb5.conf# login user from keytab usernamelogin.user.keytab.username=hive@BIGDATA# login user from keytab pathlogin.user.keytab.path=D:/security/keytabs/hive.service.keytab#......


2

配置mysql


位于 dolphinscheduler-dao/src/main/resources 目录下

spring:  datasource:    driver-class-name: com.mysql.jdbc.Driver    url: jdbc:mysql://192.168.242.40:3306/dolphinscheduler_2.0.1_test?useUnicode=true&characterEncoding=UTF-8&useSSL=false    username: bigdata    password: bigdata@xiaozuanfeng123    hikari:      connection-test-query: select 1      minimum-idle: 5      auto-commit: true      validation-timeout: 3000      pool-name: DolphinScheduler      maximum-pool-size: 50      connection-timeout: 30000      idle-timeout: 600000      leak-detection-threshold: 0      initialization-fail-timeout: 1


3

准备zookeeper 并启动


1、下载 zookeeper

D:\software\java>crul https://dlcdn.apache.org/zookeeper/zookeeper-3.6.3/apache-zookeeper-3.6.3-bin.tar.gz --output apache-zookeeper-3.6.3-bin.tar.gz
2、解压后修改配置
D:\software\java>tar -xvf apache-zookeeper-3.6.3-bin.tar.gzD:\software\java\apache-zookeeper-3.6.3-bin>cd conf D:\software\java\apache-zookeeper-3.6.3-bin\conf>copy zoo_sample.cfg zoo.cfg
3、启动zookeeper
D:\software\java\apache-zookeeper-3.6.3-bin\bin>./zkServer.cmd

4、修改 registry.properties
在 dolphinscheduler-service/src/main/resources 目录下
registry.plugin.name=zookeeperregistry.servers=127.0.0.1:2181

5、启动必要测试组件
1、master-server
启动需指定
-Dspring.profiles.active=mysqldolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java

2、worker-server
启动需指定
 -Dspring.profiles.active=mysqldolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

3、api-server
启动需指定
 -Dspring.profiles.active=mysqldolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java

4、web 启动(确保 node 环境)
进入dolphinscheduler-ui 目录下
执行 npm install --registry=https://registry.npm.taobao.org
执行 npm run dev

04


调试


1

创建数据源




2

选择 HIVE/IMPALA 数据源




3

点击测试




好,测试成功,完美……


(啥?没成功?那还请您继续看下去)



05


源码分析



1

测试连接流程




2

校验数据源参数



在  dolphinscheduler-datasource-plugin/dolphinscheduler-datasourceapi/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource 目录下的 AbstractDataSourceProcessor 类中

1、校验 IP 地址是否合规

// 主要就是要支持多个 IP 或者多个主机名
private static final Pattern IPV4_PATTERN = Pattern.compile("^[a-zA-Z0-9\\_\\-\\.\\,]+$");private static final Pattern IPV6_PATTERN = Pattern.compile("^[a-zA-Z0-9\\_\\-\\.\\:\\[\\]\\,]+$");protected void checkHost(String host) {    if (!IPV4_PATTERN.matcher(host).matches() || !IPV6_PATTERN.matcher(host).matches()) {        throw new IllegalArgumentException("datasource host illegal");    }}

2、校验数据库名称是否合规
private static final Pattern DATABASE_PATTER = Pattern.compile("^[a-zA-Z0-9\\_\\-\\.]+$");protected void checkDatasourcePatter(String database) {    if (!DATABASE_PATTER.matcher(database).matches()) {        throw new IllegalArgumentException("datasource name illegal");    }}

3、校验其他参数是否合规
private static final Pattern PARAMS_PATTER = Pattern.compile("^[a-zA-Z0-9\\-\\_\\/\\@\\.]+$");protected void checkOther(Map<String, String> other) {    if (MapUtils.isEmpty(other)) {        return;    }    boolean paramsCheck = other.entrySet().stream().allMatch(p -> PARAMS_PATTER.matcher(p.getValue()).matches());    if (!paramsCheck) {        throw new IllegalArgumentException("datasource other params illegal");    }}

对于小伙伴们遇到的具体连接参数中可能出现的特殊字符,需要结合实际自行修改呢!


3

构建数据源 JDBC 连接参数



1、根据不同的数据库类型,构建不同的 JDBC 连接参数。


2、其中 HiveDatasourceProcessor 实现类就是咱们所关注的。
@Overridepublic BaseConnectionParam createConnectionParams(BaseDataSourceParamDTO datasourceParam) {    HiveDataSourceParamDTO hiveParam = (HiveDataSourceParamDTO) datasourceParam;    // 用来组装 如 jdbc:hive2://ip:port/db 的 jdbc url     StringBuilder address = new StringBuilder();    address.append(Constants.JDBC_HIVE_2);    for (String zkHost : hiveParam.getHost().split(",")) {        address.append(String.format("%s:%s,", zkHost, hiveParam.getPort()));    }    address.deleteCharAt(address.length() - 1);    String jdbcUrl = address.toString() + "/" + hiveParam.getDatabase();    // 设置 hive 连接参数,见名知意    HiveConnectionParam hiveConnectionParam = new HiveConnectionParam();    hiveConnectionParam.setDatabase(hiveParam.getDatabase());    hiveConnectionParam.setAddress(address.toString());    hiveConnectionParam.setJdbcUrl(jdbcUrl);    hiveConnectionParam.setUser(hiveParam.getUserName());    hiveConnectionParam.setPassword(PasswordUtils.encodePassword(hiveParam.getPassword()));    hiveConnectionParam.setDriverClassName(getDatasourceDriver());    hiveConnectionParam.setValidationQuery(getValidationQuery());    // 当文件存储系统为 HDFS 并且 开启 kerberos 认证时    // 设置 kerberos 相关参数    if (CommonUtils.getKerberosStartupState()) {        hiveConnectionParam.setPrincipal(hiveParam.getPrincipal());        hiveConnectionParam.setJavaSecurityKrb5Conf(hiveParam.getJavaSecurityKrb5Conf());        hiveConnectionParam.setLoginUserKeytabPath(hiveParam.getLoginUserKeytabPath());        hiveConnectionParam.setLoginUserKeytabUsername(hiveParam.getLoginUserKeytabUsername());    }    // 稍微重构了一下 transformOther 方法,提取到父类中    hiveConnectionParam.setOther(transformOther(getDbType(),hiveParam.getOther()));    hiveConnectionParam.setProps(hiveParam.getOther());    // 返回     return hiveConnectionParam;}

4

测试连接


1、测试连接
@Overridepublic Result<Object> checkConnection(DbType type, ConnectionParam connectionParam) {    Result<Object> result = new Result<>();    // 测试连接    // 获取连接    try (Connection connection = DataSourceClientProvider.getInstance().getConnection(type, connectionParam)) {        if (connection == null) {            putMsg(result, Status.CONNECTION_TEST_FAILURE);            return result;        }        putMsg(result, Status.SUCCESS);        return result;    } catch (Exception e) {        logger.error("datasource test connection error, dbType:{}, connectionParam:{}, message:{}.", type, connectionParam, e.getMessage());        return new Result<>(Status.CONNECTION_TEST_FAILURE.getCode(), e.getMessage());    }}

2、获取连接
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin
目录下的 DataSourceClientProvider 类中
public Connection getConnection(DbType dbType, ConnectionParam connectionParam) {    BaseConnectionParam baseConnectionParam = (BaseConnectionParam) connectionParam;    String datasourceUniqueId = DataSourceUtil.getDatasourceUniqueId(baseConnectionParam, dbType);    logger.info("getConnection datasourceUniqueId {}", datasourceUniqueId);    // 有就直接获取    // 没有就创建    DataSourceClient dataSourceClient = uniqueId2dataSourceClientMap.computeIfAbsent(datasourceUniqueId, $ -> {        Map<String, DataSourceChannel> dataSourceChannelMap = dataSourcePluginManager.getDataSourceChannelMap();        DataSourceChannel dataSourceChannel = dataSourceChannelMap.get(dbType.getDescp());        if (null == dataSourceChannel) {            throw new RuntimeException(String.format("datasource plugin '%s' is not found", dbType.getDescp()));        }        return dataSourceChannel.createDataSourceClient(baseConnectionParam, dbType);    });    return dataSourceClient.getConnection();}

3、根据不同的数据库类型,创建不同的数据库连接


4、其中 HiveDataSourceClient 实现类就是我们所关注的。

其父类 CommonDataSourceClient 构造分为 4 个过程,也是最为关键的四个过程,具体看 HiveDataSourceClient 中的实现。
public CommonDataSourceClient(BaseConnectionParam baseConnectionParam, DbType dbType) {    this.baseConnectionParam = baseConnectionParam;    // 预初始化    preInit();    // 校验必要参数    checkEnv(baseConnectionParam);    // 初始化连接    initClient(baseConnectionParam, dbType);    // 检查连接连通性    checkClient();}

5、预初始化
@Overrideprotected void preInit() {    logger.info("PreInit in {}", getClass().getName());    // kerberos 连接池    this.kerberosRenewalService = Executors.newSingleThreadScheduledExecutor();}

6、校验必要参数
@Overrideprotected void checkEnv(BaseConnectionParam baseConnectionParam) {    // 父类检验必要参数,在此忽略    super.checkEnv(baseConnectionParam);    // 校验 kerberos 必要参数    checkKerberosEnv();}
private void checkKerberosEnv() { // kerberos5配置文件所在路径 String krb5File = PropertyUtils.getString(JAVA_SECURITY_KRB5_CONF_PATH); if (StringUtils.isNotBlank(krb5File)) { // 设置系统环境 System.setProperty(JAVA_SECURITY_KRB5_CONF, krb5File); try { Config.refresh(); Class<?> kerberosName = Class.forName("org.apache.hadoop.security.authentication.util.KerberosName"); Field field = kerberosName.getDeclaredField("defaultRealm"); field.setAccessible(true); field.set(null, Config.getInstance().getDefaultRealm()); } catch (Exception e) { throw new RuntimeException("Update Kerberos environment failed.", e); } }}

7、初始化连接

@Overrideprotected void initClient(BaseConnectionParam baseConnectionParam, DbType dbType) {    logger.info("Create Configuration for hive configuration.");    // 可人为忽略    this.hadoopConf = createHadoopConf();    logger.info("Create Configuration success.");
logger.info("Create UserGroupInformation."); // 创建 ugi this.ugi = createUserGroupInformation(baseConnectionParam.getUser()); logger.info("Create ugi success."); // 父类初始化必要的测试连接 super.initClient(baseConnectionParam, dbType); // 获取 hive session 连接 this.oneSessionDataSource = JdbcDataSourceProvider.createOneSessionJdbcDataSource(dbType,baseConnectionParam); logger.info("Init {} success.", getClass().getName());}创建 UGIprivate UserGroupInformation createUserGroupInformation(String username) { // kerberos5 配置文件所在路径 String krb5File = PropertyUtils.getString(Constants.JAVA_SECURITY_KRB5_CONF_PATH); // 用户的 keytab 认证文件所在路径 String keytab = PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_PATH); // kerberos 认证的用户 principal 名称 String principal = PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_USERNAME);
try { // 创建 UGI UserGroupInformation ugi = CommonUtil.createUGI(getHadoopConf(), principal, keytab, krb5File, username); try { Field isKeytabField = ugi.getClass().getDeclaredField("isKeytab"); isKeytabField.setAccessible(true); isKeytabField.set(ugi, true); } catch (NoSuchFieldException | IllegalAccessException e) { logger.warn(e.getMessage()); }
kerberosRenewalService.scheduleWithFixedDelay(() -> { try { ugi.checkTGTAndReloginFromKeytab(); } catch (IOException e) { logger.error("Check TGT and Renewal from Keytab error", e); } }, 5, 5, TimeUnit.MINUTES); return ugi; } catch (IOException e) { throw new RuntimeException("createUserGroupInformation fail. ", e); }}咱们继续跟进。public static synchronized UserGroupInformation createUGI(Configuration configuration, String principal, String keyTab, String krb5File, String username) throws IOException { // 文件系统 是否为 HDFS 以及 是否开启kerberos 认证 if (getKerberosStartupState()) { Objects.requireNonNull(keyTab); if (StringUtils.isNotBlank(krb5File)) { // 设置系统环境 System.setProperty(JAVA_SECURITY_KRB5_CONF, krb5File); } // 进行登录 return loginKerberos(configuration, principal, keyTab); } return UserGroupInformation.createRemoteUser(username);}public static synchronized UserGroupInformation loginKerberos(final Configuration config, final String principal, final String keyTab) throws IOException { // 配置 hadoop 认证模式为 kerberos config.set(Constants.HADOOP_SECURITY_AUTHENTICATION, Constants.KERBEROS); UserGroupInformation.setConfiguration(config); // 使用 principal 和对应的 keytab 登录 UserGroupInformation.loginUserFromKeytab(principal.trim(), keyTab.trim()); return UserGroupInformation.getCurrentUser();}创建 UGI 后获取 hive session 连接,在 JdbcDataSourceProvider 类中。(划重点,划重点,所有需要通过 JDBC 获取数据库连接的功能,都由该类向外提供)public static HikariDataSource createOneSessionJdbcDataSource(DbType dbType,BaseConnectionParam properties) { logger.info("Creating OneSession HikariDataSource pool for maxActive:{}", PropertyUtils.getInt(Constants.SPRING_DATASOURCE_MAX_ACTIVE, 50)); HikariDataSource dataSource = new HikariDataSource(); dataSource.setDriverClassName(properties.getDriverClassName()); // 由于我的环境中需要 other 中的一些特征参数,所以需要将 other 参数拼接进 jdbc url 中 // 而这么做,基本上可以解决许多特殊的或者定制化的连接 // 其中该类中 createJdbcDataSource 方法同样可以这么处理 dataSource.setJdbcUrl(DataSourceUtil.getJdbcUrl(dbType,properties)); dataSource.setUsername(properties.getUser()); dataSource.setPassword(PasswordUtils.decodePassword(properties.getPassword())); // 前些天与小伙伴聊到相关,刚好看到相关 pr,便集成进来 Boolean isOneSession = PropertyUtils.getBoolean(Constants.SUPPORT_HIVE_ONE_SESSION, false); dataSource.setMinimumIdle(isOneSession ? 1 : PropertyUtils.getInt(Constants.SPRING_DATASOURCE_MIN_IDLE, 5)); dataSource.setMaximumPoolSize(isOneSession ? 1 : PropertyUtils.getInt(Constants.SPRING_DATASOURCE_MAX_ACTIVE, 50)); dataSource.setConnectionTestQuery(properties.getValidationQuery());
if (properties.getProps() != null) { properties.getProps().forEach(dataSource::addDataSourceProperty); }
logger.info("Creating OneSession HikariDataSource pool success."); return dataSource;}

8、检查连接连通性
在其父类 CommonDataSourceClient 中使用创建的连接。//Checking data source clientStopwatch stopwatch = Stopwatch.createStarted();try {    // 执行 select 1 测试是否成功,若本地测试,需要在 dolphinscheduler-datasource-api 下的 pom.xml 文件中添加 hive-jdbc 依赖,直接拷贝现成即可    this.jdbcTemplate.execute(this.baseConnectionParam.getValidationQuery());} catch (Exception e) {    throw new RuntimeException("JDBC connect failed", e);} finally {    logger.info("Time to execute check jdbc client with sql {} for {} ms ", this.baseConnectionParam.getValidationQuery(), stopwatch.elapsed(TimeUnit.MILLISECONDS));}

到这儿,咱们终于一览了开启 Kerberos 的 Hive 连接过程,其中也包括实际环境中所遇到问题的一些解决方法。



06


结尾



本次和小伙伴们分享了在 DolphinScheduler 中测试的 HIVE 连接,希望本文能够有幸帮助到您成功扫雷。好,话再不多说……嗝~好酒!


07


参与贡献



随着国内开源的迅猛崛起,Apache DolphinScheduler 社区迎来蓬勃发展,为了做更好用、易用的调度,真诚欢迎热爱开源的伙伴加入到开源社区中来,为中国开源崛起献上一份自己的力量,让本土开源走向全球。


参与 DolphinScheduler 社区有非常多的参与贡献的方式,包括:


贡献第一个PR(文档、代码) 我们也希望是简单的,第一个PR用于熟悉提交的流程和社区协作以及感受社区的友好度。


社区汇总了以下适合新手的问题列表:https://github.com/apache/dolphinscheduler/issues/5689


非新手问题列表:https://github.com/apache/dolphinscheduler/issues?q=is%3Aopen+is%3Aissue+label%3A%22volunteer+wanted%22


如何参与贡献链接:https://dolphinscheduler.apache.org/zh-cn/docs/development/contribute.html


来吧,DolphinScheduler开源社区需要您的参与,为中国开源崛起添砖加瓦吧,哪怕只是小小的一块瓦,汇聚起来的力量也是巨大的。


参与开源可以近距离与各路高手切磋,迅速提升自己的技能,如果您想参与贡献,我们有个贡献者种子孵化群,可以添加社区小助手

微信(Leonard-ds) 手把手教会您( 贡献者不分水平高低,有问必答,关键是有一颗愿意贡献的心 )。添加小助手微信时请说明想参与贡献。


来吧,开源社区非常期待您的参与。







社区官网
https://dolphinscheduler.apache.org/

代码仓地址https://github.com/apache/dolphinscheduler


您的 Star,是 Apache DolphinScheduler 为爱发电的动力❤️ 


投稿请添加社区小助手微信

(Leonard-ds)





WorkflowAsCode 来了,Apache DolphinScheduler 2.0.2 惊喜发布!

☞恭喜 Apache DolphinScheduler 入选可信开源社区共同体预备成员!

☞Apache DolphinScheduler 获评 2021 OSC 最受欢迎项目,白鲸开源获优秀中国开源原生创企奖项!

☞看看又是谁在悄悄做贡献?

☞感谢有你!所有贡献者来领礼物了

☞一文给你整明白多租户在 Apache DolphinScheduler 中的作用

☞开源并不是大牛的专属,普通人也能有属于自己的一亩三分地

在 Apache  DolphinScheduler 上调试 LDAP 登录,亲测有效!

☞4 亿用户,7W+ 作业调度难题,Bigo 基于 Apache DolphinScheduler 巧化解



点击阅读原文,加入开源!




点个在看你最好看

【声明】内容源于网络
0
0
海豚调度
Apache DolphinScheduler是一个分布式、去中心化、易扩展的可视化DAG工作流任务调度系统,其致力于解决数据处理流程中错综复杂的依赖关系,使调度系统在数据处理流程中开箱即用。
内容 0
粉丝 0
海豚调度 Apache DolphinScheduler是一个分布式、去中心化、易扩展的可视化DAG工作流任务调度系统,其致力于解决数据处理流程中错综复杂的依赖关系,使调度系统在数据处理流程中开箱即用。
总阅读0
粉丝0
内容0