Overview
SQL --> AST
Turning SQL into an AST goes through the parse method of org.apache.hadoop.hive.ql.parse.ParseDriver.
The entry point is the line ASTNode tree = pd.parse(command, this.ctx); inside the compile method of org.apache.hadoop.hive.ql.Driver.
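Before reading the method body, it can help to see the parser in isolation. The following is a minimal sketch, assuming a Hive 1.x/2.x hive-exec jar on the classpath; it relies on the single-argument parse(String) overload, which simply delegates to the three-argument method shown below with a null Context. ParseDemo is a hypothetical class name for this illustration.

import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.ParseDriver;
import org.apache.hadoop.hive.ql.parse.ParseException;

public class ParseDemo {
    public static void main(String[] args) throws ParseException {
        ParseDriver pd = new ParseDriver();
        // Lex + parse a statement and get back the root of the AST.
        ASTNode ast = pd.parse("SELECT id FROM t WHERE id > 10");
        // toStringTree() prints the tree in a LISP-like nested form.
        System.out.println(ast.toStringTree());
    }
}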
org.apache.hadoop.hive.ql.parse.ParseDriver
----------
public ASTNode parse(String command, Context ctx, boolean setTokenRewriteStream) throws ParseException {
    if (LOG.isDebugEnabled()) {
        LOG.debug("Parsing command: " + command);
    }
    // Lexical analysis: HiveLexerX is the lexer class.
    ParseDriver.HiveLexerX lexer = new ParseDriver.HiveLexerX(new ParseDriver.ANTLRNoCaseStringStream(command));
    // Wrap the lexer output into a token stream; the tokens are no longer plain strings
    // but wrappers carrying grammatical meaning, and the result itself is a stream.
    TokenRewriteStream tokens = new TokenRewriteStream(lexer);
    if (ctx != null) {
        if (setTokenRewriteStream) {
            ctx.setTokenRewriteStream(tokens);
        }
        lexer.setHiveConf(ctx.getConf());
    }
    // HiveParser is the syntax parser class.
    HiveParser parser = new HiveParser(tokens);
    if (ctx != null) {
        parser.setHiveConf(ctx.getConf());
    }
    parser.setTreeAdaptor(adaptor);
    statement_return r = null;
    try {
        r = parser.statement();
    } catch (RecognitionException var9) {
        var9.printStackTrace();
        throw new ParseException(parser.errors);
    }
    if (lexer.getErrors().size() == 0 && parser.errors.size() == 0) {
        LOG.debug("Parse Completed");
        // Build the AST and return it.
        ASTNode tree = (ASTNode) r.getTree();
        tree.setUnknownTokenBoundaries();
        return tree;
    } else if (lexer.getErrors().size() != 0) {
        throw new ParseException(lexer.getErrors());
    } else {
        throw new ParseException(parser.errors);
    }
}
- HiveLexerX: takes the HiveQL text as input and produces a TokenRewriteStream (a stream of Tokens).
- HiveParser: turns the Token stream into the abstract syntax tree (AST Tree).

This part will be covered in more detail in a separate post; a quick illustration follows.
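For intuition, the AST for a simple statement such as SELECT id FROM t WHERE id > 10 has roughly the following shape (indented here for readability; toStringTree() prints the same structure as nested parentheses, and the exact token names can vary slightly between Hive versions):

TOK_QUERY
  TOK_FROM
    TOK_TABREF
      TOK_TABNAME
        t
  TOK_INSERT
    TOK_DESTINATION
      TOK_DIR
        TOK_TMP_FILE
    TOK_SELECT
      TOK_SELEXPR
        TOK_TABLE_OR_COL
          id
    TOK_WHERE
      >
        TOK_TABLE_OR_COL
          id
        10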
AST --> Task
org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer
----------
public void analyze(ASTNode ast, Context ctx) throws SemanticException {
    // Initialize the context.
    this.initCtx(ctx);
    this.init(true);
    // analyzeInternal is an abstract method implemented by the various subclasses and is fairly complex.
    // Roughly speaking, it turns the ASTNode into Tasks, including any applicable optimizations.
    this.analyzeInternal(ast);
}
One implementation of analyzeInternal, SemanticAnalyzer (the query analyzer), runs to more than 10,000 lines of code; all of Hive's optimization secrets live in there. It will be covered separately later.
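For context, Driver.compile picks the concrete analyzer through a factory based on the root token of the AST and then drives it. A rough sketch of that call site (the factory's first argument is a HiveConf in older releases and a QueryState in newer ones):

// Inside Driver.compile, roughly:
BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(conf, tree); // e.g. SemanticAnalyzer, DDLSemanticAnalyzer, ...
sem.analyze(tree, ctx);   // AST --> Tasks, as shown above
sem.validate();           // sanity-check the generated plan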
Task --> QueryPlan
org.apache.hadoop.hive.ql.QueryPlan
----------
public QueryPlan(String queryString, BaseSemanticAnalyzer sem, Long startTime, String queryId,
        HiveOperation operation, Schema resultSchema) {
    this.queryString = queryString;
    rootTasks = new ArrayList<Task<? extends Serializable>>(sem.getAllRootTasks());
    reducerTimeStatsPerJobList = new ArrayList<ReducerTimeStatsPerJob>();
    fetchTask = sem.getFetchTask();
    // All inputs.
    inputs = sem.getAllInputs();
    // All outputs.
    outputs = sem.getAllOutputs();
    // Lineage information.
    linfo = sem.getLineageInfo();
    // Table access statistics.
    tableAccessInfo = sem.getTableAccessInfo();
    // Column access statistics.
    columnAccessInfo = sem.getColumnAccessInfo();
    idToTableNameMap = new HashMap<String, String>(sem.getIdToTableNameMap());
    this.queryId = queryId == null ? makeQueryId() : queryId;
    query = new org.apache.hadoop.hive.ql.plan.api.Query();
    query.setQueryId(this.queryId);
    query.putToQueryAttributes("queryString", this.queryString);
    // Query properties.
    queryProperties = sem.getQueryProperties();
    queryStartTime = startTime;
    this.operation = operation;
    // Auto-commit setting.
    this.autoCommitValue = sem.getAutoCommitValue();
    // Schema of the result set.
    this.resultSchema = resultSchema;
}
There is not much to say about this step: by the time AST --> Task has finished, everything is already decided, and this step merely nails it down. In short, it copies the results out of the BaseSemanticAnalyzer into a QueryPlan, which is then handed back as plan in the compile method of Driver.
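The corresponding call site in Driver.compile looks roughly like the line below (the exact argument list varies slightly across Hive versions, and in newer releases the operation comes from the QueryState rather than the SessionState):

// Inside Driver.compile, roughly:
plan = new QueryPlan(queryStr, sem, perfLogger.getStartTime(PerfLogger.DRIVER_RUN),
        queryId, SessionState.get().getHiveOperation(), schema);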
QueryPlan --> Job
Finally, the plan is turned into Jobs and submitted to YARN to run. The starting point is execute, the method that oversees the whole QueryPlan --> Job phase.
public int execute(boolean deferClose) throws CommandNeedRetryException {
    ......
    // Put the root tasks of the query plan into the runnable queue of the DriverContext.
    for (Task<? extends Serializable> tsk : plan.getRootTasks()) {
        assert tsk.getParentTasks() == null || tsk.getParentTasks().isEmpty();
        driverCxt.addToRunnable(tsk);
    }
    ......
    // Keep pulling tasks off the head of the runnable queue and launching them.
    while (driverCxt.isRunning()) {
        Task task;
        TaskRunner tskRun;
        while ((task = driverCxt.getRunnable(this.maxthreads)) != null) {
            tskRun = this.launchTask(task, queryId, noName, jobname, jobs, driverCxt);
            if (!tskRun.isRunning()) {
                break;
            }
        }
        ......
        // Only after a task has finished may its child tasks be added to the runnable queue.
        if (tsk.getChildTasks() != null) {
            for (Task<? extends Serializable> child : tsk.getChildTasks()) {
                if (DriverContext.isLaunchable(child)) {
                    driverCxt.addToRunnable(child);
                }
            }
        }
        ......
    }
    // Collect resource usage and the total execution time.
    double duration = perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.DRIVER_EXECUTE) / 1000.00;
    ImmutableMap<String, Long> executionHMSTimings = dumpMetaCallTimingWithoutEx("execution");
    queryDisplay.setHmsTimings(QueryDisplay.Phase.EXECUTION, executionHMSTimings);
    Map<String, MapRedStats> stats = SessionState.get().getMapRedStats();
    if (stats != null && !stats.isEmpty()) {
        long totalCpu = 0;
        console.printInfo("MapReduce Jobs Launched: ");
        for (Map.Entry<String, MapRedStats> entry : stats.entrySet()) {
            console.printInfo("Stage-" + entry.getKey() + ": " + entry.getValue());
            totalCpu += entry.getValue().getCpuMSec();
        }
        console.printInfo("Total MapReduce CPU Time Spent: " + Utilities.formatMsecToStr(totalCpu));
    }
    ......
    // Print "OK" on success.
    if (console != null) {
        console.printInfo("OK");
    }
    ......
But the method that actually gets a task running is launchTask.
private TaskRunner launchTask(Task<? extends Serializable> tsk, String queryId, boolean noName,
        String jobname, int jobs, DriverContext cxt) throws HiveException {
    // HiveHistory records the task history.
    if (SessionState.get() != null) {
        SessionState.get().getHiveHistory().startTask(queryId, tsk, tsk.getClass().getName());
    }
    if (tsk.isMapRedTask() && !(tsk instanceof ConditionalTask)) {
        if (noName) {
            conf.setVar(HiveConf.ConfVars.HADOOPJOBNAME, jobname + "(" + tsk.getId() + ")");
        }
        conf.set("mapreduce.workflow.node.name", tsk.getId());
        Utilities.setWorkflowAdjacencies(conf, plan);
        cxt.incCurJobNo(1);
        console.printInfo("Launching Job " + cxt.getCurJobNo() + " out of " + jobs);
    }
    tsk.initialize(conf, plan, cxt);
    TaskResult tskRes = new TaskResult();
    TaskRunner tskRun = new TaskRunner(tsk, tskRes);
    cxt.launching(tskRun);
    // Decide whether to run in parallel: only if parallel execution is enabled and the task
    // is a MapReduce task (or a MoveTask) is TaskRunner.start() used; otherwise
    // tskRun.runSequential() runs the task in the caller's thread.
    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.EXECPARALLEL) && (tsk.isMapRedTask() || (tsk instanceof MoveTask))) {
        if (LOG.isInfoEnabled()) {
            LOG.info("Starting task [" + tsk + "] in parallel");
        }
        // Parallel execution: internally, tskRun.start() still ends up in tskRun.runSequential().
        tskRun.setOperationLog(OperationLog.getCurrentOperationLog());
        tskRun.start();
    } else {
        if (LOG.isInfoEnabled()) {
            LOG.info("Starting task [" + tsk + "] in serial mode");
        }
        // Sequential execution.
        tskRun.runSequential();
    }
    return tskRun;
}
Next, sequential execution; we continue from tskRun.runSequential:
org.apache.hadoop.hive.ql.exec.TaskRunner
----------
public void runSequential() {
    int exitVal = -101;
    try {
        // Directly call Task.executeTask().
        exitVal = this.tsk.executeTask();
    } catch (Throwable var3) {
        if (this.tsk.getException() == null) {
            this.tsk.setException(var3);
        }
        LOG.error("Error in executeTask", var3);
    }
    this.result.setExitVal(exitVal);
    if (this.tsk.getException() != null) {
        this.result.setTaskError(this.tsk.getException());
    }
}
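For the parallel branch, TaskRunner extends Thread, and its run() ends up in the same runSequential(). A simplified sketch of that method (details such as operation-log propagation are omitted):

@Override
public void run() {
    runner = Thread.currentThread();
    try {
        // Attach the Hive session to the newly spawned thread, then run as usual.
        SessionState.start(ss);
        runSequential();
    } finally {
        runner = null;
        result.setRunning(false);
    }
}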
Digging one level deeper:
org.apache.hadoop.hive.ql.exec.Task
----------
public int executeTask() {
    try {
        SessionState ss = SessionState.get();
        this.setStarted();
        if (ss != null) {
            ss.getHiveHistory().logPlanProgress(this.queryPlan);
        }
        // execute here is an abstract method.
        int retval = this.execute(this.driverContext);
        this.setDone();
        if (ss != null) {
            ss.getHiveHistory().logPlanProgress(this.queryPlan);
        }
        return retval;
    } catch (IOException var3) {
        throw new RuntimeException("Unexpected error: " + var3.getMessage(), var3);
    }
}
execute is an abstract method with many subclasses; here we focus on the MapRedTask subclass.
org.apache.hadoop.hive.ql.exec.mr.MapRedTask
----------
public int execute(DriverContext driverContext) {
    ......
    if (!runningViaChild) {
        // Delegate directly to the parent class, i.e. ExecDriver.execute.
        return super.execute(driverContext);
    }
    ......
The next class is the one that does the real work: it submits the job to YARN.
org.apache.hadoop.hive.ql.exec.mr.ExecDriver
----------
protected transient JobConf job;
......
public int execute(DriverContext driverContext) {
    ......
    JobClient jc = null;
    ......
    // Set the OutputFormat.
    job.setOutputFormat(HiveOutputFormatImpl.class);
    // Set the mapper and its output key/value classes.
    job.setMapperClass(ExecMapper.class);
    job.setMapOutputKeyClass(HiveKey.class);
    job.setMapOutputValueClass(BytesWritable.class);
    // Set the partitioner.
    try {
        String partitioner = HiveConf.getVar(job, ConfVars.HIVEPARTITIONER);
        job.setPartitionerClass(JavaUtils.loadClass(partitioner));
    } catch (ClassNotFoundException e) {
        throw new RuntimeException(e.getMessage(), e);
    }
    job.setNumReduceTasks(rWork != null ? rWork.getNumReduceTasks().intValue() : 0);
    job.setReducerClass(ExecReducer.class);
    // Set the input attributes.
    setInputAttributes(job);
    // Whether speculative execution is enabled for reducers.
    boolean useSpeculativeExecReducers = HiveConf.getBoolVar(job, HiveConf.ConfVars.HIVESPECULATIVEEXECREDUCERS);
    job.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, useSpeculativeExecReducers);
    // Set the InputFormat.
    job.setInputFormat(JavaUtils.loadClass(inpFormat));
    // Set the final output key/value classes.
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    // Generate a job name.
    job.set(MRJobConfig.JOB_NAME, "JOB" + Utilities.randGen.nextInt());
    ......
    // Get the map-side work.
    MapWork mWork = work.getMapWork();
    // Get the reduce-side work.
    ReduceWork rWork = work.getReduceWork();
    ......
    if (mWork.getNumMapTasks() != null) {
        // Set the number of map tasks.
        job.setNumMapTasks(mWork.getNumMapTasks().intValue());
    }
    ......
    // Set the number of reduce tasks.
    job.setNumReduceTasks(rWork != null ? rWork.getNumReduceTasks().intValue() : 0);
    // Set the reducer class.
    job.setReducerClass(ExecReducer.class);
    ......
    // Obtain a JobClient.
    jc = new JobClient(job);
    ......
    // Submit the job.
    rj = jc.submitJob(job);
    // Record the job ID.
    this.jobID = rj.getJobID();
    ......
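After submitJob returns, ExecDriver does not simply exit: it keeps polling the RunningJob and maps the final job state to the task's return code (Hive wraps this in HadoopJobExecHelper together with console logging and counter collection). A minimal, hypothetical sketch of the idea using only the plain mapred API:

// Illustrative only -- not Hive's actual monitoring code.
while (!rj.isComplete()) {
    Thread.sleep(1000L);
    System.out.printf("map %.0f%%  reduce %.0f%%%n", rj.mapProgress() * 100, rj.reduceProgress() * 100);
}
int returnVal = rj.isSuccessful() ? 0 : 2; // a non-zero exit code signals failure back to the Driver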
Finally
That wraps up this read through Hive's source for the SQL-to-job path: ParseDriver, BaseSemanticAnalyzer, QueryPlan, and execute. Hopefully it helps when you need to trace how a HiveQL statement becomes running MapReduce jobs.