大数跨境

【技术解析】Dolphinscheduler实现MapReduce任务的高效管理

【技术解析】Dolphinscheduler实现MapReduce任务的高效管理 海豚调度
2024-11-11
0
导读:本文详细介绍了Dolphinscheduler在MapReduce任务中的应用。

点击蓝字,关注我们

MapReduce是一种编程模型,用于处理和生成大数据集,主要用于大规模数据集(TB级数据规模)的并行运算。本文详细介绍了Dolphinscheduler在MapReduce任务中的应用,包括GenericOptionsParser与args的区别、hadoop jar命令参数的完整解释、MapReduce实例代码,以及如何在Dolphinscheduler中配置和运行MapReduce任务。



1



GenericOptionsParser vs args区别


GenericOptionsParser 如下:
 
 
 
GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
String[] remainingArgs = optionParser.getRemainingArgs();
查看 GenericOptionsParser 源码做了什么?
 
 
 
1、构造方法
public GenericOptionsParser(Configuration conf, String[] args)
throws IOException {
this(conf, new Options(), args);
}

2、点击 this
public GenericOptionsParser(Configuration conf,
Options options, String[] args) throws IOException {
this.conf = conf;
parseSuccessful = parseGeneralOptions(options, args);
}

3、查看 parseGeneralOptions
private boolean parseGeneralOptions(Options opts, String[] args)
throws IOException {
opts = buildGeneralOptions(opts);
CommandLineParser parser = new GnuParser();
boolean parsed = false;
try {
commandLine = parser.parse(opts, preProcessForWindows(args), true);
processGeneralOptions(commandLine);
parsed = true;
} catch(ParseException e) {
LOG.warn("options parsing failed: "+e.getMessage());

HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("general options are: ", opts);
}
return parsed;
}

4、看 GnuParser
package org.apache.commons.cli;

import java.util.ArrayList;
import java.util.List;

@Deprecated
public class GnuParser extends Parser {
.......
}

org.apache.commons.cli Parser,是不是有点熟悉?对,请参考 https://segmentfault.com/a/1190000045394541 这篇文章吧

5、看 processGeneralOptions 方法
private void processGeneralOptions(CommandLine line) throws IOException {
if (line.hasOption("fs")) {
FileSystem.setDefaultUri(conf, line.getOptionValue("fs"));
}

if (line.hasOption("jt")) {
String optionValue = line.getOptionValue("jt");
if (optionValue.equalsIgnoreCase("local")) {
conf.set("mapreduce.framework.name", optionValue);
}

conf.set("yarn.resourcemanager.address", optionValue,
"from -jt command line option");
}
if (line.hasOption("conf")) {
String[] values = line.getOptionValues("conf");
for(String value : values) {
conf.addResource(new Path(value));
}
}

if (line.hasOption('D')) {
String[] property = line.getOptionValues('D');
for(String prop : property) {
String[] keyval = prop.split("=", 2);
if (keyval.length == 2) {
conf.set(keyval[0], keyval[1], "from command line");
}
}
}

if (line.hasOption("libjars")) {
// for libjars, we allow expansion of wildcards
conf.set("tmpjars",
validateFiles(line.getOptionValue("libjars"), true),
"from -libjars command line option");
//setting libjars in client classpath
URL[] libjars = getLibJars(conf);
if(libjars!=null && libjars.length>0) {
conf.setClassLoader(new URLClassLoader(libjars, conf.getClassLoader()));
Thread.currentThread().setContextClassLoader(
new URLClassLoader(libjars,
Thread.currentThread().getContextClassLoader()));
}
}
if (line.hasOption("files")) {
conf.set("tmpfiles",
validateFiles(line.getOptionValue("files")),
"from -files command line option");
}
if (line.hasOption("archives")) {
conf.set("tmparchives",
validateFiles(line.getOptionValue("archives")),
"from -archives command line option");
}
conf.setBoolean("mapreduce.client.genericoptionsparser.used", true);

// tokensFile
if(line.hasOption("tokenCacheFile")) {
String fileName = line.getOptionValue("tokenCacheFile");
// check if the local file exists
FileSystem localFs = FileSystem.getLocal(conf);
Path p = localFs.makeQualified(new Path(fileName));
localFs.getFileStatus(p);
if(LOG.isDebugEnabled()) {
LOG.debug("setting conf tokensFile: " + fileName);
}
UserGroupInformation.getCurrentUser().addCredentials(
Credentials.readTokenStorageFile(p, conf));
conf.set("mapreduce.job.credentials.binary", p.toString(),
"from -tokenCacheFile command line option");

}
}

原理是把 fs、jt、D、libjars、files、archives、tokenCacheFile 相关参数放入到 Hadoop的 Configuration中了,终于清楚 GenericOptionsParser是干什么的了
args呢?如果要使用args,以上这种 fs、jt、D、libjars、files、archives、tokenCacheFile 是需要自己解析的。




2



Hadoop jar完整参数解释


 
 
 
hadoop jar wordcount.jar org.myorg.WordCount \
-fs hdfs://namenode.example.com:8020 \
-jt resourcemanager.example.com:8032 \
-D mapreduce.job.queuename=default \
-libjars /path/to/dependency1.jar,/path/to/dependency2.jar \
-files /path/to/file1.txt,/path/to/file2.txt \
-archives /path/to/archive1.zip,/path/to/archive2.tar.gz \
-tokenCacheFile /path/to/credential.file \
/input /output
这条命令会:
  1. 将作业提交到 hdfs://namenode.example.com:8020 文件系统
  2. 使用 resourcemanager.example.com:8032 作为 YARN ResourceManager
  3. 提交到 default 队列
  4. 使用 /path/to/dependency1.jar 和 /path/to/dependency2.jar 作为依赖
  5. 分发本地文件 /path/to/file1.txt 和 /path/to/file2.txt,注意 : 是本地文件哦
  6. 解压并分发 /path/to/archive1.zip 和 /path/to/archive2.tar.gz
  7. 分发凭证文件 /path/to/credential.file




3



MR实例


WordCount经典示例

 
 
 
public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

private IntWritable one = new IntWritable(1);
private Text word = new Text();

@Override
protected void map(LongWritable key,
Text value,
Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
String[] fields = value.toString().split("\\s+");
for (String field : fields) {
word.set(field);
context.write(word, one);
}
}
}

public class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

private IntWritable result = new IntWritable();

@Override
protected void reduce(Text key,
Iterable<IntWritable> values,
Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {

int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}


public class WCJob {

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();

// TODO 如果要是本地访问远程的hdfs,需要指定hdfs的根路径,否则只能访问本地的文件系统
// conf.set("fs.defaultFS", "hdfs://xx.xx.xx.xx:8020");

GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
String[] remainingArgs = optionParser.getRemainingArgs();

for (String arg : args) {
System.out.println("arg :" + arg);
}

for (String remainingArg : remainingArgs) {
System.out.println("remainingArg :" + remainingArg);
}

if (remainingArgs.length < 2) {
throw new RuntimeException("input and output path must set.");
}

Path outputPath = new Path(remainingArgs[1]);
FileSystem fileSystem = FileSystem.get(conf);
boolean exists = fileSystem.exists(outputPath);
// 如果目标目录存在,则删除
if (exists) {
fileSystem.delete(outputPath, true);
}

Job job = Job.getInstance(conf, "MRWordCount");
job.setJarByClass(WCJob.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(WCMapper.class);
job.setReducerClass(WCReducer.class);
FileInputFormat.addInputPath(job, new Path(remainingArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(remainingArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

文件分发

 
 
 
public class ConfigMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
private List<String> whiteList = new ArrayList<>();
private Text text = new Text();

@Override
protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
// 获取作业提交时传递的文件
URI[] files = context.getCacheFiles();

if (files != null && files.length > 0) {
// 读取文件内容
File configFile = new File("white.txt"); // 文件名要与传递的文件名保持一致
try (BufferedReader reader = new BufferedReader(new FileReader(configFile))){
String line = null;
while ((line = reader.readLine()) != null) {
whiteList.add(line);
}
}
}
}

@Override
protected void map(LongWritable key,
Text value,
Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {

String line = value.toString();
String[] datas = line.split("\\s+");

List<String> whiteDatas = Arrays.stream(datas).filter(data -> whiteList.contains(data)).collect(Collectors.toList());

for (String data : whiteDatas) {
text.set(data);
context.write(text , NullWritable.get());
}
}
}


public class ConfigJob {

public static void main(String[] args) throws Exception {

// 设置用户名
System.setProperty("HADOOP_USER_NAME", "root");

Configuration conf = new Configuration();

conf.set("fs.defaultFS", "hdfs://xx.xx.xx.xx:8020");

GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
String[] remainingArgs = optionParser.getRemainingArgs();

if (remainingArgs.length < 2) {
throw new RuntimeException("input and output path must set.");
}

Path outputPath = new Path(remainingArgs[1]);
FileSystem fileSystem = FileSystem.get(conf);
boolean exists = fileSystem.exists(outputPath);
// 如果目标目录存在,则删除
if (exists) {
fileSystem.delete(outputPath, true);
}

Job job = Job.getInstance(conf, "MRConfig");
job.setJarByClass(ConfigJob.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
job.setMapperClass(ConfigMapper.class);
FileInputFormat.addInputPath(job, new Path(remainingArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(remainingArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}




4



Dolphinscheduler MR使用


Yarn test队列设置

YARN 的配置目录中找到 capacity-scheduler.xml 文件。通常位于 $HADOOP_HOME/etc/hadoop/ 目录下。
修改 capacity-scheduler.xml
 
 
 
<property>
<name>yarn.scheduler.capacity.root.queues</name>
<value>default, test</value>
</property>

<property>
<name>yarn.scheduler.capacity.root.test.capacity</name>
<value>30</value>
</property>

<property>
<name>yarn.scheduler.capacity.root.test.maximum-capacity</name>
<value>50</value>
</property>

<property>
<name>yarn.scheduler.capacity.root.test.user-limit-factor</name>
<value>1</value>
</property>
刷新队列配置
yarn rmadmin -refreshQueues

流程定义设置

执行结果

离线任务实例

YARN作业展示

源码分析

org.apache.dolphinscheduler.plugin.task.mr.MapReduceArgsUtils#buildArgs
 
 
 
String others = param.getOthers();
// TODO 这里其实就是想说,没有通过 -D mapreduce.job.queuename 形式指定队列,是用页面上直接指定队列名称的,页面上 Yarn队列 输入框
if (StringUtils.isEmpty(others) || !others.contains(MR_YARN_QUEUE)) {
String yarnQueue = param.getYarnQueue();
if (StringUtils.isNotEmpty(yarnQueue)) {
args.add(String.format("%s%s=%s", D, MR_YARN_QUEUE, yarnQueue));
}
}

// TODO 这里就是页面上,选项参数 输入框
// -conf -archives -files -libjars -D
if (StringUtils.isNotEmpty(others)) {
args.add(others);
}
转载自Journey
原文链接:https://segmentfault.com/a/1190000045403915

<🐬🐬 >

推荐阅读

用户实践案例
奇富科技  蜀海供应链 联通数科 拈花云科
蔚来汽车 长城汽车 集度 长安汽车
思科网讯 生鲜电商 联通医疗 联想
新网银行 消费金融  腾讯音乐 自如
有赞 伊利 当贝大数据
联想 传智教育 Bigo
通信行业  作业帮 太美医疗
某新能源 中电信翼康
迁移实践
Azkaban   Ooize   
Airflow (有赞案例) Air2phin(迁移工具)
Airflow迁移实践
Apache DolphinScheduler 3.0.0 升级到 3.1.8 教程
Apache DolphinScheduler 1.3.4升级至3.1.2版本解决方案合集

新手入门
选择Apache DolphinScheduler的10个理由
Apache DolphinScheduler 3.1.8 保姆级教程【安装、介绍、项目运用、邮箱预警设置】轻松拿捏!
Apache DolphinScheduler 如何实现自动化打包+单机/集群部署?
Apache DolphinScheduler-3.1.3 版本安装部署详细教程
Apache DolphinScheduler 在大数据环境中的应用与调优

< 🐬🐬 >
参与社区

参与Apache DolphinScheduler 社区有非常多的参与贡献的方式,包括:


贡献第一个PR(文档、代码) 我们也希望是简单的,第一个PR用于熟悉提交的流程和社区协作以及感受社区的友好度。

社区汇总了以下适合新手的问题列表:https://github.com/apache/dolphinscheduler/issues/5689

非新手问题列表:https://github.com/apache/dolphinscheduler/issues?
q=is%3Aopen+is%3Aissue+label%3A%22volunteer+wanted%22

如何参与贡献链接:https://dolphinscheduler.apache.org/zh-cn/community/development/contribute.html

来吧,DolphinScheduler开源社区需要您的参与,为中国开源崛起添砖加瓦吧,哪怕只是小小的一块瓦,汇聚起来的力量也是巨大的!

【声明】内容源于网络
0
0
海豚调度
Apache DolphinScheduler是一个分布式、去中心化、易扩展的可视化DAG工作流任务调度系统,其致力于解决数据处理流程中错综复杂的依赖关系,使调度系统在数据处理流程中开箱即用。
内容 667
粉丝 0
海豚调度 Apache DolphinScheduler是一个分布式、去中心化、易扩展的可视化DAG工作流任务调度系统,其致力于解决数据处理流程中错综复杂的依赖关系,使调度系统在数据处理流程中开箱即用。
总阅读167
粉丝0
内容667