Hive on Spark Source Code Analysis (3): SparkClient and SparkClientImpl (Part 1)

Hive on Spark Source Code Analysis (1): SparkTask
Hive on Spark Source Code Analysis (2): SparkSession and HiveSparkClient
Hive on Spark Source Code Analysis (3): SparkClient and SparkClientImpl (Part 1)
Hive on Spark Source Code Analysis (4): SparkClient and SparkClientImpl (Part 2)
Hive on Spark Source Code Analysis (5): RemoteDriver
Hive on Spark Source Code Analysis (6): RemoteSparkJobMonitor and JobHandle


The SparkClient interface defines the API of the remote Spark client:

  
  
  // Submits a job for asynchronous execution and returns a JobHandle for monitoring it.
  <T extends Serializable> JobHandle<T> submit(Job<T> job);

  // Asks the remote context to run a job immediately. This method bypasses the job queue
  // and is recommended only for tasks that finish quickly.
  // Returns a Future that can be used to monitor the result.
  <T extends Serializable> Future<T> run(Job<T> job);

  /**
   * Stops the remote context.
   *
   * Any pending jobs will be cancelled, and the remote context will be torn down.
   */
  void stop();

  /**
   * Adds a jar file to the running remote context.
   *
   * Note that the URL should be reachable by the Spark driver process. If running the driver
   * in cluster mode, it may reside on a different host, meaning "file:" URLs have to exist
   * on that node (and not on the client machine).
   *
   * @param uri The location of the jar file.
   * @return A future that can be used to monitor the operation.
   */
  Future<?> addJar(URI uri);

  /**
   * Adds a file to the running remote context.
   *
   * Note that the URL should be reachable by the Spark driver process. If running the driver
   * in cluster mode, it may reside on a different host, meaning "file:" URLs have to exist
   * on that node (and not on the client machine).
   *
   * @param uri The location of the file.
   * @return A future that can be used to monitor the operation.
   */
  Future<?> addFile(URI uri);

  /**
   * Get the count of executors.
   */
  Future<Integer> getExecutorCount();

  /**
   * Get default parallelism. For standalone mode, this can be used to get total number of cores.
   */
  Future<Integer> getDefaultParallelism();

  /**
   * Check if remote context is still active.
   */
  boolean isActive();
The Javadoc comments above are clear enough on their own, so no further explanation is needed here.
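To make the API concrete, here is a hypothetical sketch of how a caller might submit a job through this interface and wait on its JobHandle. The DefaultParallelismJob class and the submitAndWait helper are illustrative, not Hive code; in Hive, the real caller is RemoteHiveSparkClient, which obtains its SparkClient through SparkClientFactory.

  import java.util.concurrent.Future;

  import org.apache.hive.spark.client.Job;
  import org.apache.hive.spark.client.JobContext;
  import org.apache.hive.spark.client.JobHandle;
  import org.apache.hive.spark.client.SparkClient;

  // A trivial job that just reports the remote context's default parallelism.
  class DefaultParallelismJob implements Job<Integer> {
    private static final long serialVersionUID = 1L;

    @Override
    public Integer call(JobContext jc) throws Exception {
      // Runs inside the RemoteDriver's JVM; jc.sc() exposes the JavaSparkContext there.
      return jc.sc().defaultParallelism();
    }
  }

  class SubmitExample {
    static void submitAndWait(SparkClient client) throws Exception {
      // submit() is asynchronous; the returned JobHandle (a Future) monitors the job.
      JobHandle<Integer> handle = client.submit(new DefaultParallelismJob());
      Integer parallelism = handle.get();

      // The lightweight metadata calls also return Futures.
      Future<Integer> executors = client.getExecutorCount();
      System.out.println("parallelism=" + parallelism + ", executors=" + executors.get());
    }
  }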

SparkClientImpl is the concrete implementation of the SparkClient interface. Let's look at it in detail.

First comes the constructor, which does quite a lot of work. It starts by assigning the fields:

  
  
  this.conf = conf;
  this.hiveConf = hiveConf;
  this.childIdGenerator = new AtomicInteger();
  this.jobs = Maps.newConcurrentMap();

  String clientId = UUID.randomUUID().toString();
  // Generate the secret used to authenticate the RemoteDriver when it connects back.
  String secret = rpcServer.createSecret();
  // startDriver launches RemoteDriver in a new process and returns a thread that waits for its result.
  this.driverThread = startDriver(rpcServer, clientId, secret);
  // Create the ClientProtocol used for RPC communication: sending messages (submitting jobs)
  // and handling messages coming back from the remote end.
  this.protocol = new ClientProtocol();

Next, the constructor registers the RPC client with the rpcServer and handles any exception that may occur:

  
  
  try {
    // The RPC server will take care of (rpc client connection) timeouts here.
    // Register the RPC client with the remote RpcServer.
    this.driverRpc = rpcServer.registerClient(clientId, secret, protocol).get();
  } catch (Throwable e) {
    if (e.getCause() instanceof TimeoutException) {
      LOG.error("Timed out waiting for client to connect.\nPossible reasons include network " +
          "issues, errors in remote driver or the cluster has no available resources, etc." +
          "\nPlease check YARN or Spark driver's logs for further information.\nReason2 from SparkClientImpl", e);
    } else {
      LOG.error("Error while waiting for client to connect.", e);
    }
    // Interrupt the driverThread.
    driverThread.interrupt();
    try {
      // Wait for driverThread to die.
      driverThread.join();
    } catch (InterruptedException ie) {
      // Give up.
      LOG.debug("Interrupted before driver thread was finished.");
    }
    throw Throwables.propagate(e);
  }

Before going further, let's look at how registering the RPC client is actually implemented. The work happens in another registerClient overload inside RpcServer. It first creates a promise that tracks the state of the registration:
   
   
  @VisibleForTesting
  Future<Rpc> registerClient(final String clientId, String secret,
      RpcDispatcher serverDispatcher, long clientTimeoutMs) {
    final Promise<Rpc> promise = group.next().newPromise();

It then creates a timeout Runnable that is scheduled to run exactly once, clientTimeoutMs later; if the promise has not completed by then, the promise is marked as failed with a TimeoutException:
   
   
  Runnable timeout = new Runnable() {
    @Override
    public void run() {
      promise.setFailure(new TimeoutException("Timed out waiting for client connection."));
    }
  };
  // Schedule timeout to run once, clientTimeoutMs milliseconds from now.
  // Given timeout's run method, this means: if the promise is still pending when the
  // delay elapses, promise.setFailure is called.
  ScheduledFuture<?> timeoutFuture = group.schedule(timeout,
      clientTimeoutMs,
      TimeUnit.MILLISECONDS);
Here clientTimeoutMs comes from the hive.spark.client.server.connect.timeout configuration property.

The client is then added to pendingClients to complete the registration, a listener is attached to the promise, and finally the promise is returned:
   
   
  final ClientInfo client = new ClientInfo(clientId, promise, secret, serverDispatcher,
      timeoutFuture);
  // Make sure this client has not been registered already.
  if (pendingClients.putIfAbsent(clientId, client) != null) {
    throw new IllegalStateException(
        String.format("Client '%s' already registered.", clientId));
  }

  promise.addListener(new GenericFutureListener<Promise<Rpc>>() {
    @Override
    public void operationComplete(Promise<Rpc> p) {
      if (!p.isSuccess()) {
        pendingClients.remove(clientId);
      }
    }
  });

  return promise;
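The promise-plus-scheduled-timeout idiom above is standard Netty. The following self-contained sketch (not Hive code; the class name and the two-second timeout are made up for illustration) shows the same mechanism in isolation: a promise that would normally be completed by a successful connection is failed by a one-shot scheduled task if nothing completes it in time.

  import java.util.concurrent.TimeUnit;
  import java.util.concurrent.TimeoutException;

  import io.netty.channel.nio.NioEventLoopGroup;
  import io.netty.util.concurrent.Promise;
  import io.netty.util.concurrent.ScheduledFuture;

  public class PromiseTimeoutSketch {
    public static void main(String[] args) throws Exception {
      NioEventLoopGroup group = new NioEventLoopGroup(1);
      try {
        // A successful client handshake would complete this promise.
        final Promise<String> promise = group.next().newPromise();

        // One-shot task: if the promise is still pending when it fires, fail it.
        ScheduledFuture<?> timeoutFuture = group.schedule(new Runnable() {
          @Override
          public void run() {
            promise.tryFailure(new TimeoutException("Timed out waiting for client connection."));
          }
        }, 2, TimeUnit.SECONDS);

        // RpcServer would call promise.setSuccess(rpc) and cancel timeoutFuture when the
        // client connects; here nothing does, so we simply observe the timeout.
        promise.await();
        System.out.println("success? " + promise.isSuccess() + ", cause: " + promise.cause());
      } finally {
        group.shutdownGracefully();
      }
    }
  }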

Back in SparkClientImpl, the last step is to add a listener to driverRpc, overriding rpcClosed, which is invoked when the RPC connection closes. Once construction is done, the client is marked as isAlive:

  
  
  driverRpc.addListener(new Rpc.Listener() {
    @Override
    public void rpcClosed(Rpc rpc) {
      // If the RPC channel closes while this SparkClient is still alive, log a warning.
      if (isAlive) {
        LOG.warn("Client RPC channel closed unexpectedly.");
        isAlive = false;
      }
    }
  });
  // Mark the client as alive once it is fully constructed.
  isAlive = true;

Now let's look at the startDriver method. It first obtains the rpcServer's host and port, which will later be passed to RemoteDriver:

  
  
  private Thread startDriver(final RpcServer rpcServer, final String clientId, final String secret)
      throws IOException {
    Runnable runnable;
    final String serverAddress = rpcServer.getAddress();
    final String serverPort = String.valueOf(rpcServer.getPort());

It then checks whether spark.client.do_not_use.run_driver_in_process is set; if so, RemoteDriver is started directly in the current process. This mode is intended for testing only and should not be used in production:
   
   
  1. LOG.warn("!!!! Running remote driver in-process. !!!!");
  2. runnable = new Runnable() {
  3. @Override
  4. public void run() {
  5. List<String> args = Lists.newArrayList();
  6. args.add("--remote-host");
  7. args.add(serverAddress);
  8. args.add("--remote-port");
  9. args.add(serverPort);
  10. args.add("--client-id");
  11. args.add(clientId);
  12. args.add("--secret");
  13. args.add(secret);
  14. for (Map.Entry<String, String> e : conf.entrySet()) {
  15. args.add("--conf");
  16. args.add(String.format("%s=%s", e.getKey(), conf.get(e.getKey())));
  17. }
  18. try {
  19. RemoteDriver.main(args.toArray(new String[args.size()]));
  20. } catch (Exception e) {
  21. LOG.error("Error running driver.", e);
  22. }
  23. }
  24. };
  25. }

Otherwise RemoteDriver is launched in a new child process. First, spark home and the spark log dir are resolved from the configuration:
   
   
  String sparkHome = conf.get(SPARK_HOME_KEY);
  if (sparkHome == null) {
    sparkHome = System.getenv(SPARK_HOME_ENV);
  }
  if (sparkHome == null) {
    sparkHome = System.getProperty(SPARK_HOME_KEY);
  }
  String sparkLogDir = conf.get("hive.spark.log.dir");
  if (sparkLogDir == null) {
    if (sparkHome == null) {
      sparkLogDir = "./target/";
    } else {
      sparkLogDir = sparkHome + "/logs/";
    }
  }

A temporary file is then created to hold all of the configuration that spark-submit will use:
    
    
  File properties = File.createTempFile("spark-submit.", ".properties");
  if (!properties.setReadable(false) || !properties.setReadable(true, true)) {
    throw new IOException("Cannot change permissions of job properties file.");
  }
  properties.deleteOnExit();
  // Holds the configuration that will be written to the file.
  Properties allProps = new Properties();

The properties from spark-defaults.conf are loaded first:
     
     
  try {
    URL sparkDefaultsUrl = Thread.currentThread().getContextClassLoader().getResource("spark-defaults.conf");
    if (sparkDefaultsUrl != null) {
      LOG.info("Loading spark defaults: " + sparkDefaultsUrl);
      allProps.load(new ByteArrayInputStream(Resources.toByteArray(sparkDefaultsUrl)));
    }
  } catch (Exception e) {
    String msg = "Exception trying to load spark-defaults.conf: " + e;
    throw new IOException(msg, e);
  }

Then the configuration held by SparkClientImpl is added on top (the handling of test-only settings is skipped here):
     
     
  for (Map.Entry<String, String> e : conf.entrySet()) {
    allProps.put(e.getKey(), conf.get(e.getKey()));
  }
  allProps.put(SparkClientFactory.CONF_CLIENT_ID, clientId);
  allProps.put(SparkClientFactory.CONF_KEY_SECRET, secret);
  allProps.put(DRIVER_OPTS_KEY, driverJavaOpts);
  allProps.put(EXECUTOR_OPTS_KEY, executorJavaOpts);
  ... ... ...

The properties are then written to the file:
     
     
  Writer writer = new OutputStreamWriter(new FileOutputStream(properties), Charsets.UTF_8);
  try {
    allProps.store(writer, "Spark Context configuration");
  } finally {
    writer.close();
  }

Because RemoteDriver is started in a new child process, the next step is to decide how the configuration options get passed to it. In local or yarn-client mode the options have to be passed explicitly on the command line; in yarn-cluster mode they are left for spark-submit to handle.
First an argument list argv is created (it will eventually be turned into the command line to execute), and arguments are added to it depending on the situation:
      
      
  List<String> argv = Lists.newArrayList();

If Kerberos authentication is enabled, the corresponding kinit arguments are added first:
       
       
  if (hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION).equalsIgnoreCase("kerberos")) {
    argv.add("kinit");
    String principal = SecurityUtil.getServerPrincipal(hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL),
        "0.0.0.0");
    String keyTabFile = hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB);
    argv.add(principal);
    argv.add("-k");
    argv.add("-t");
    argv.add(keyTabFile + ";");
  }
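As an aside, SecurityUtil.getServerPrincipal (a Hadoop utility) is what expands the _HOST placeholder in the configured principal. A minimal sketch of what that call does, using a made-up principal string:

  import org.apache.hadoop.security.SecurityUtil;

  public class PrincipalExample {
    public static void main(String[] args) throws Exception {
      // "_HOST" is replaced with the canonical host name; passing "0.0.0.0"
      // (as SparkClientImpl does above) makes it resolve to the local host.
      String principal = SecurityUtil.getServerPrincipal("hive/_HOST@EXAMPLE.COM", "0.0.0.0");
      System.out.println(principal);  // e.g. hive/hs2-host.example.com@EXAMPLE.COM
    }
  }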

Next, the spark-submit command itself is added. If spark home is configured, ${spark.home}/bin/spark-submit is added to argv directly; otherwise ${java.home}/bin/java is added first:
        
        
  if (sparkHome != null) {
    argv.add(new File(sparkHome, "bin/spark-submit").getAbsolutePath());
  } else {
    LOG.info("No spark.home provided, calling SparkSubmit directly.");
    argv.add(new File(System.getProperty("java.home"), "bin/java").getAbsolutePath());

The spark.master setting is fetched:
         
         
  String master = conf.get("spark.master");
  Preconditions.checkArgument(master != null, "spark.master is not defined.");

If the master is local, mesos, yarn-client or standalone, then spark.driver.memory, spark.driver.extraClassPath, spark.driver.extraLibPath and so on need to be turned into JVM options (this happens inside the else branch shown above, where java is invoked directly):
         
         
  // Still inside the else branch above (SparkSubmit invoked directly through java).
  // This covers local and client modes, i.e. every mode other than yarn-cluster.
  if (master.startsWith("local") || master.startsWith("mesos") || master.endsWith("-client") || master.startsWith("spark")) {
    String mem = conf.get("spark.driver.memory");
    if (mem != null) {
      argv.add("-Xms" + mem);
      argv.add("-Xmx" + mem);
    }
    // Driver classpath.
    String cp = conf.get("spark.driver.extraClassPath");
    if (cp != null) {
      argv.add("-classpath");
      argv.add(cp);
    }
    String libPath = conf.get("spark.driver.extraLibPath");
    if (libPath != null) {
      argv.add("-Djava.library.path=" + libPath);
    }
    String extra = conf.get(DRIVER_OPTS_KEY);
    if (extra != null) {
      for (String opt : extra.split("[ ]")) {
        if (!opt.trim().isEmpty()) {
          argv.add(opt.trim());
        }
      }
    }
  }

Then the SparkSubmit main class is added (regardless of the master mode):
          
          
  1. argv.add("org.apache.spark.deploy.SparkSubmit")

In yarn-cluster mode, spark.executor.cores, spark.executor.memory and spark.executor.instances from the configuration are passed on to spark-submit:
         
         
  if (master.equals("yarn-cluster")) {
    String executorCores = conf.get("spark.executor.cores");
    if (executorCores != null) {
      argv.add("--executor-cores");
      argv.add(executorCores);
    }
    String executorMemory = conf.get("spark.executor.memory");
    if (executorMemory != null) {
      argv.add("--executor-memory");
      argv.add(executorMemory);
    }
    String numOfExecutors = conf.get("spark.executor.instances");
    if (numOfExecutors != null) {
      argv.add("--num-executors");
      argv.add(numOfExecutors);
    }
  }

If hive.server2.enable.doAs is set to true, meaning Hive operations should run as the user who issued the request, the user name has to be added as well:
          
          
  if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS)) {
    try {
      String currentUser = Utils.getUGI().getShortUserName();
      // do not do impersonation in CLI mode
      if (!currentUser.equals(System.getProperty("user.name"))) {
        LOG.info("Attempting impersonation of " + currentUser);
        argv.add("--proxy-user");
        argv.add(currentUser);
      }
    } catch (Exception e) {
      String msg = "Cannot obtain username: " + e;
      throw new IllegalStateException(msg, e);
    }
  }

Next come the path of the properties file created earlier, the main class (RemoteDriver), the jar containing the current class, and the remote host and port:
           
           
  1. argv.add("--properties-file");
  2. argv.add(properties.getAbsolutePath());
  3. argv.add("--class");
  4. argv.add(RemoteDriver.class.getName());
  5. String jar = "spark-internal";
  6. if (SparkContext.jarOfClass(this.getClass()).isDefined()) {
  7. jar = SparkContext.jarOfClass(this.getClass()).get();
  8. }
  9. argv.add(jar);
  10. argv.add("--remote-host");
  11. argv.add(serverAddress);
  12. argv.add("--remote-port");
  13. argv.add(serverPort);

Finally, the hive.spark.* RSC configuration keys (RpcConfiguration.HIVE_SPARK_RSC_CONFIGS) are appended as "--conf key=value" arguments so they reach RemoteDriver:
            
            
  for (String hiveSparkConfKey : RpcConfiguration.HIVE_SPARK_RSC_CONFIGS) {
    String value = RpcConfiguration.getValue(hiveConf, hiveSparkConfKey);
    argv.add("--conf");
    argv.add(String.format("%s=%s", hiveSparkConfKey, value));
  }

With all commands and arguments in place, argv is joined into a single command string cmd and a new process is created to run it. The resulting command looks roughly like "/path/to/spark-submit --properties-file /tmp/spark-submit.xxx.properties --class org.apache.hive.spark.client.RemoteDriver spark-internal --remote-host <host> --remote-port <port> --conf ..." (the exact paths and values depend on the configuration):
            
            
  String cmd = Joiner.on(" ").join(argv);
  LOG.info("Running client driver with argv: {}", cmd);
  ProcessBuilder pb = new ProcessBuilder("sh", "-c", cmd);

The process is then started:
             
             
  // Hide the Hive configuration from Spark so the two cannot interfere with each other.
  pb.environment().remove("HIVE_HOME");
  pb.environment().remove("HIVE_CONF_DIR");
  if (isTesting != null) {
    pb.environment().put("SPARK_TESTING", isTesting);
  }

  final Process child = pb.start();
  int childId = childIdGenerator.incrementAndGet();
  final List<String> childErrorLog = new ArrayList<String>();
  // Redirect the child process's output.
  redirect("stdout-redir-" + childId, new Redirector(child.getInputStream()));
  redirect("stderr-redir-" + childId, new Redirector(child.getErrorStream(), childErrorLog));

Finally, a thread is started that blocks waiting for the child process to finish and records any errors:
             
             
  runnable = new Runnable() {
    @Override
    public void run() {
      try {
        // Block until the child process exits.
        int exitCode = child.waitFor();
        if (exitCode != 0) {
          StringBuilder errStr = new StringBuilder();
          for (String s : childErrorLog) {
            errStr.append(s);
            errStr.append('\n');
          }
          rpcServer.cancelClient(clientId,
              "Child process exited before connecting back with error log " + errStr.toString());
          LOG.warn("Child process exited with code {}", exitCode);
        }
      } catch (InterruptedException ie) {
        LOG.warn("Waiting thread interrupted, killing child process.");
        Thread.interrupted();
        child.destroy();
      } catch (Exception e) {
        LOG.warn("Exception while waiting for child process.", e);
      }
    }
  };
  }  // end of the child-process branch

  Thread thread = new Thread(runnable);
  thread.setDaemon(true);
  thread.setName("Driver");
  thread.start();
  return thread;

That completes the process of starting the driver from SparkClientImpl.

In the next part we will return to the job submission flow and look at the methods SparkClientImpl uses to submit jobs, together with the closely related inner class ClientProtocol.