我是靠谱客的博主 慈祥香水,这篇文章主要介绍Oozie分布式工作流——从理论和实践分析使用节点间的参数传递,现在分享给大家,希望可以做个参考。

Oozie支持Java Action,因此可以自定义很多的功能。本篇就从理论和实践两方面介绍下Java Action的妙用,另外还涉及到oozie中action之间的参数传递。

本文大致分为以下几个部分:

  • Java Action教程文档
  • 自定义Java Action实践
  • 从源码的角度讲解Java Action与Shell Action的参数传递。

如果你即将或者想要使用oozie,那么本篇的文章将会为你提供很多参考的价值。

Java Action文档

java action会自动执行提供的java classpublic static void main方法, 并且会在hadoop集群启动一个单独的map-reduce的map任务来执行的。因此,如果你自定义了一个java程序,它会提交到集群的某一个节点执行,不会每个节点都执行一遍。

workflow任务会等待java程序执行完继续执行下一个action。当java类正确执行退出后,将会进入ok控制流;当发生异常时,将会进入error控制流。Java程序绝对不能使用System.exit(int n)将会导致action进入error控制流。

在action的配置中,也支持EL表达式。并且使用<capture-output>也可以把数据输出出来,然后后面的action就可以基于EL表达式使用了。

语法规则

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1"> ... <action name="[NODE-NAME]"> <java> <job-tracker>[JOB-TRACKER]</job-tracker> <name-node>[NAME-NODE]</name-node> <prepare> <delete path="[PATH]"/> ... <mkdir path="[PATH]"/> ... </prepare> <job-xml>[JOB-XML]</job-xml> <configuration> <property> <name>[PROPERTY-NAME]</name> <value>[PROPERTY-VALUE]</value> </property> ... </configuration> <main-class>[MAIN-CLASS]</main-class> <java-opts>[JAVA-STARTUP-OPTS]</java-opts> <arg>ARGUMENT</arg> ... <file>[FILE-PATH]</file> ... <archive>[FILE-PATH]</archive> ... <capture-output /> </java> <ok to="[NODE-NAME]"/> <error to="[NODE-NAME]"/> </action> ... </workflow-app>

prepare元素,支持创建或者删除指定的文件内容。在delete时,支持通配的方式指定特定的路径。java-opts以及java-opt参数提供了执行java应用时分配的JVM。

举个例子:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.1"> ... <action name="myfirstjavajob"> <java> <job-tracker>foo:8021</job-tracker> <name-node>bar:8020</name-node> <prepare> <delete path="${jobOutput}"/> </prepare> <configuration> <property> <name>mapred.queue.name</name> <value>default</value> </property> </configuration> <main-class>org.apache.oozie.MyFirstMainClass</main-class> <java-opts>-Dblah</java-opts> <arg>argument1</arg> <arg>argument2</arg> </java> <ok to="myotherjob"/> <error to="errorcleanup"/> </action> ... </workflow-app>

覆盖Main方法

oozie中的很多action都支持这个功能,在configure中指定classpath下的一个类方法,它会覆盖当前action的main方法。这在不想重新编译jar包,而想替换程序时,非常有用。

自定义Java action程序以及部署

Java程序可以任意定义,比如写一个最简单的hellword,然后打包成lib。

然后需要定义oozie脚本:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<action name="java-7cbb"> <java> <job-tracker>${jobTracker}</job-tracker> <name-node>${nameNode}</name-node> <configuration> <property> <name>mapred.job.queue.name</name> <value>default</value> </property> </configuration> <main-class>a.b.c.Main</main-class> <arg>arg1</arg> <arg>arg2</arg> <file>/oozie/lib/ojdbc7.jar#ojdbc7.jar</file> <capture-output/> </java> <ok to="end"/> <error to="Kill"/> </action>

其中几个比较重要的属性,千万不能拉下:

  • 1 需要指定Map-reduce的队列:mapred.job.queue.name
  • 2 指定Main class<main-class>
  • 3 如果依赖其他的jar,需要添加<file>
  • 4 如果想要捕获输出,需要设置<capture-output>

如果使用HUE图形化配置,就比较简单了:
449064-20170304151154563-1145830649.png

点击右上角的齿轮,配置其他的属性信息:
449064-20170304151159485-393802980.png

基于源码分析参数传递

先从表象来说一下shell action如何传递参数:

你只需要定义一个普通的shell,在里面使用echo把属性输出出来即可,后面的action自动就可以基于EL表达式使用。

复制代码
1
2
test='test123' echo "test=$test"

这样后面的action就可以直接使用了:

复制代码
1
${wf:actionData('action-name').test}或者${wf:actionData('action-name')['test']}

很简单是吧!

在Java里面就没这么容易了:

复制代码
1
无论是 System.out.println() 还是 logger.info/error,都无法捕获到数据

上网找了一篇文章,备受启发

从中抄了一段代码:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
private static final String OOZIE_ACTION_OUTPUT_PROPERTIES = "oozie.action.output.properties"; ... String oozieProp = System.getProperty(OOZIE_ACTION_OUTPUT_PROPERTIES); if (oozieProp != null) { File propFile = new File(oozieProp); Properties props = new Properties(); props.setProperty(propKey0, propVal0); props.setProperty(propKey1, propVal1); OutputStream os = new FileOutputStream(propFile); props.store(os, ""); os.close(); } else throw new RuntimeException(OOZIE_ACTION_OUTPUT_PROPERTIES + " System property not defined");

果然就好用了....

为了理解其中的缘由,我们来看看代码。首先在shell action中发现一句话:

复制代码
1
2
3
<<< Invocation of Main class completed <<< Oozie Launcher, capturing output data: =======================

于是全局搜索,果然找到对应的代码,在org.apache.oozie.action.hadoop.LuancherMapper.java中,line275开始:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
if (errorMessage == null) { handleActionData(); if (actionData.get(ACTION_DATA_OUTPUT_PROPS) != null) { System.out.println(); System.out.println("Oozie Launcher, capturing output data:"); System.out.println("======================="); System.out.println(actionData.get(ACTION_DATA_OUTPUT_PROPS)); System.out.println(); System.out.println("======================="); System.out.println(); } 。。。 }

这里的actionData其实就是个普通的MAP

复制代码
1
2
3
4
private Map<String,String> actionData; public LauncherMapper() { actionData = new HashMap<String,String>(); }

Map里面保存了很多属性值,其中就包括我们想要捕获的输出内容:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
static final String ACTION_PREFIX = "oozie.action."; static final String ACTION_DATA_OUTPUT_PROPS = "output.properties"; ... String outputProp = System.getProperty(ACTION_PREFIX + ACTION_DATA_OUTPUT_PROPS); if (outputProp != null) { File actionOutputData = new File(outputProp); if (actionOutputData.exists()) { int maxOutputData = getJobConf().getInt(CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, 2 * 1024); actionData.put(ACTION_DATA_OUTPUT_PROPS, getLocalFileContentStr(actionOutputData, "Output", maxOutputData)); } } .... public static String getLocalFileContentStr(File file, String type, int maxLen) throws LauncherException, IOException { StringBuffer sb = new StringBuffer(); FileReader reader = new FileReader(file); char[] buffer = new char[2048]; int read; int count = 0; while ((read = reader.read(buffer)) > -1) { count += read; if (maxLen > -1 && count > maxLen) { throw new LauncherException(type + " data exceeds its limit ["+ maxLen + "]"); } sb.append(buffer, 0, read); } reader.close(); return sb.toString(); }

可以看到其实就是从oozie.action.output.properties指定的目录里面去读内容,然后输出出来,后面的action就可以用了。这就是为什么上面抄的那段代码可以使用的原因。

那么问题是,shell为什么直接echo就行,java里面却要这么费劲?

别急,先来看看java action的启动逻辑:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static void main(String[] args) throws Exception { run(JavaMain.class, args); } @Override protected void run(String[] args) throws Exception { ... Class<?> klass = actionConf.getClass(JAVA_MAIN_CLASS, Object.class); ... Method mainMethod = klass.getMethod("main", String[].class); try { mainMethod.invoke(null, (Object) args); } catch(InvocationTargetException ex) { // Get rid of the InvocationTargetException and wrap the Throwable throw new JavaMainException(ex.getCause()); } }

它什么也没做,就是启动了目标类的main方法而已。

再来看看shell:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
private int execute(Configuration actionConf) throws Exception { ... //判断是否要捕获输出 boolean captureOutput = actionConf.getBoolean(CONF_OOZIE_SHELL_CAPTURE_OUTPUT, false); //执行命令 Process p = builder.start(); //处理进程 Thread[] thrArray = handleShellOutput(p, captureOutput); ... return exitValue; } protected Thread[] handleShellOutput(Process p, boolean captureOutput) throws IOException { BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream())); BufferedReader error = new BufferedReader(new InputStreamReader(p.getErrorStream())); // 捕获标准输出 OutputWriteThread thrStdout = new OutputWriteThread(input, true, captureOutput); thrStdout.setDaemon(true); thrStdout.start(); OutputWriteThread thrStderr = new OutputWriteThread(error, false, false); thrStderr.setDaemon(true); thrStderr.start(); return new Thread[]{ thrStdout, thrStderr }; } class OutputWriteThread extends Thread { ... @Override public void run() { String line; BufferedWriter os = null; //读取数据保存在目标文件中 try { if (needCaptured) { File file = new File(System.getProperty(LauncherMapper.ACTION_PREFIX + LauncherMapper.ACTION_DATA_OUTPUT_PROPS)); os = new BufferedWriter(new FileWriter(file)); } while ((line = reader.readLine()) != null) { if (isStdout) { // For stdout // 1. Writing to LM STDOUT System.out.println("Stdoutput " + line); // 2. Writing for capture output if (os != null) { if (Shell.WINDOWS) { line = line.replace("\u", "\\u"); } os.write(line); os.newLine(); } } else { System.err.println(line); // 1. Writing to LM STDERR } } } catch (IOException e) { ... }finally { ... } } }

这样就很清晰了,shell自动帮我们把输出的内容写入了oozie.action.output.properties文件中。而在java中则需要用户自己来定义写入的过程。

后续将会介绍一下oozie中比较高级的用法——EL表达式

最后

以上就是慈祥香水最近收集整理的关于Oozie分布式工作流——从理论和实践分析使用节点间的参数传递的全部内容,更多相关Oozie分布式工作流——从理论和实践分析使用节点间内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(78)

评论列表共有 0 条评论

立即
投稿
返回
顶部