Hadoop UserGroupInformation explained. These notes were collected during recent development work and shared by 靠谱客 blogger 谨慎铃铛, in the hope that they make a useful reference.

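Overview

First, a quick look at the Java authentication frameworks involved.

JAAS: the authentication and authorization framework. It is mainly responsible for authenticating users and checking their permissions.

SASL: a framework for authentication between a client and a server.

GSS-API: plugs into SASL as one of its providers (the GSSAPI mechanism), i.e. it is one of the implementations behind the SASL framework.

Reference: "JAAS/GSS-API/SASL/Kerberos简介" on the NoSQL漫谈 blog.

High-level introductions are easy to find online. To really understand what UserGroupInformation does, though, you need to study how it is used inside big-data systems such as Hadoop and Hive.

Before looking at UserGroupInformation you should know the JAAS concept of a Subject, which, put simply, represents a user. After a successful JAAS login a Subject is produced that contains a set of Principals plus other credential information. With Kerberos authentication, for example, the Subject carries a Kerberos principal, and credentials such as the TGT are stored in its private credentials (privCredentials).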
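To make the Subject idea concrete, here is a minimal JDK-only sketch of what a login leaves behind: one principal for the identity and one private credential for the secret. NamedPrincipal and FakeTicket are hypothetical stand-ins invented for this illustration; a real Kerberos login would add a KerberosPrincipal and a Kerberos ticket instead.

```java
import java.security.Principal;
import java.util.Set;
import javax.security.auth.Subject;

final class SubjectSketch {
    /** Hypothetical principal type used only for this illustration. */
    static final class NamedPrincipal implements Principal {
        private final String name;
        NamedPrincipal(String name) { this.name = name; }
        @Override public String getName() { return name; }
        @Override public boolean equals(Object o) {
            return o instanceof NamedPrincipal && ((NamedPrincipal) o).name.equals(name);
        }
        @Override public int hashCode() { return name.hashCode(); }
    }

    /** Hypothetical stand-in for a secret such as a Kerberos TGT. */
    static final class FakeTicket {
        final String owner;
        FakeTicket(String owner) { this.owner = owner; }
    }

    /** Builds the kind of Subject a successful login would produce. */
    static Subject loggedInSubject(String user) {
        Subject subject = new Subject();
        subject.getPrincipals().add(new NamedPrincipal(user));      // who the user is
        subject.getPrivateCredentials().add(new FakeTicket(user));  // proof of identity
        return subject;
    }

    public static void main(String[] args) {
        Subject s = loggedInSubject("alice@EXAMPLE.COM");
        Set<NamedPrincipal> who = s.getPrincipals(NamedPrincipal.class);
        System.out.println("principals: " + who.size()
            + ", tickets: " + s.getPrivateCredentials(FakeTicket.class).size());
    }
}
```

The typed lookups getPrincipals(Class) and getPrivateCredentials(Class) are the same calls Hadoop relies on later when it searches a Subject for its own User principal.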

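CASE ONE: the client establishes a connection to the server

When a client connects to a server, the connection code is usually wrapped like this:

UserGroupInformation ugi = UserGroupInformation.getLoginUser();
ugi.doAs((PrivilegedAction<Void>) () -> {
   // code that establishes the connection
   return null;
});

doAs executes the action with the Subject held by that UserGroupInformation.

The connection is therefore established with the logged-in Subject. Because that Subject contains the Kerberos principal and valid credentials, the SASL client and SASL server can use them while the connection is being set up. Underneath, the actual work is done by the GSS implementation.

The implementation of doAs: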

  @InterfaceAudience.Public
  @InterfaceStability.Evolving
  public <T> T doAs(PrivilegedAction<T> action) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("PrivilegedAction [as: {}][action: {}]", this, action,
          new Exception());
    }
    // run the action with this UGI's subject, i.e. the connection is
    // established with the subject that has already logged in
    return Subject.doAs(subject, action);
  }
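The effect of Subject.doAs can be tried out without Hadoop. The sketch below binds a Subject for the duration of an action and reads it back from inside the action. UserPrincipal is a hypothetical stand-in for Hadoop's User principal, and the reflective lookup of Subject.current() is only there so the same code runs on both older and newer JDKs; it is a convenience of this example, not something Hadoop does.

```java
import java.lang.reflect.Method;
import java.security.AccessController;
import java.security.Principal;
import java.security.PrivilegedAction;
import javax.security.auth.Subject;

final class DoAsSketch {
    /** Hypothetical principal standing in for Hadoop's User principal. */
    static final class UserPrincipal implements Principal {
        private final String name;
        UserPrincipal(String name) { this.name = name; }
        @Override public String getName() { return name; }
    }

    /** Finds the Subject bound by the innermost doAs, or null if there is none. */
    static Subject currentSubject() {
        try {
            // JDK 18+ offers Subject.current(); look it up reflectively so the
            // sketch still compiles on older JDKs.
            Method current = Subject.class.getMethod("current");
            return (Subject) current.invoke(null);
        } catch (NoSuchMethodException e) {
            // Older JDKs: read the Subject from the access control context.
            return Subject.getSubject(AccessController.getContext());
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Name seen by the code running right now, or "<none>" outside any doAs. */
    static String whoAmI() {
        Subject s = currentSubject();
        if (s == null || s.getPrincipals(UserPrincipal.class).isEmpty()) {
            return "<none>";
        }
        return s.getPrincipals(UserPrincipal.class).iterator().next().getName();
    }

    /** Runs whoAmI() with a Subject for the given user bound via Subject.doAs. */
    static String whoAmIAs(String user) {
        Subject subject = new Subject();
        subject.getPrincipals().add(new UserPrincipal(user));
        // A typed variable avoids the overload ambiguity between
        // PrivilegedAction and PrivilegedExceptionAction.
        PrivilegedAction<String> action = DoAsSketch::whoAmI;
        return Subject.doAs(subject, action);
    }

    public static void main(String[] args) {
        System.out.println("outside doAs: " + whoAmI());
        System.out.println("inside doAs:  " + whoAmIAs("alice"));
    }
}
```

The binding only lasts for the action itself: once doAs returns, whoAmI() no longer sees the Subject.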

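Of course, UserGroupInformation.doAs is not always used.

In that case the SASL connection is set up with whatever Subject the current security context (AccessControlContext) yields. As getCurrentUser shows later, when no Subject is bound there Hadoop falls back to the login user, that is, the Subject produced by the earlier JAAS login, so the logged-in Subject ends up being used as well.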

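CASE TWO: executing an action on the server

Once a client is connected, the server side usually deals with two UGIs. One is the UGI of the service itself, for example hdfs for the NameNode or hive for the MetaStore; the hdfs or hive user is the superuser inside that service. The other is the client's user: operations requested over the connection should normally run as the client user, and this is where the proxyUser concept of UserGroupInformation comes in.

As the name suggests, a proxyUser carries out operations on behalf of another user. Keep in mind that the proxied user has not been authenticated through JAAS on the server side; the server only has a user name. That user has, however, already been authenticated through JAAS on the client side.

The code that creates a proxy user: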

/**
   * Create a proxy user using username of the effective user and the ugi of the
   * real user.
   * @param user
   * @param realUser
   * @return proxyUser ugi
   */
  @InterfaceAudience.Public
  @InterfaceStability.Evolving
  public static UserGroupInformation createProxyUser(String user,
      UserGroupInformation realUser) {
    if (user == null || user.isEmpty()) {
      throw new IllegalArgumentException("Null user");
    }
    if (realUser == null) {
      throw new IllegalArgumentException("Null real user");
    }
    Subject subject = new Subject();
    Set<Principal> principals = subject.getPrincipals();
    principals.add(new User(user, AuthenticationMethod.PROXY, null));
    principals.add(new RealUser(realUser));
    return new UserGroupInformation(subject);
  }

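The Subject inside a proxy user's UserGroupInformation is created from scratch. It holds a User principal and a RealUser principal. The RealUser principal refers to the user that actually authenticated with Kerberos and owns the Kerberos keys.

The proxy user's own Subject contains no key material at all.

When a user is proxied, server-side actions are normally executed in this form:

proxyUgi.doAs(action)

Wherever a permission check happens, the user name it sees is then the proxied user's name.

Remember: a SASL connection cannot be created as the proxy user, because its Subject holds no credentials and the attempt would fail.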
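The difference between the two Subjects can be modelled with plain JDK classes. This is only a sketch: UserPrincipal, RealUserPrincipal and FakeTicket are hypothetical stand-ins for Hadoop's User, RealUser and the Kerberos ticket, and hasCredentials is an invented helper that simply checks whether a Subject carries any secret.

```java
import java.security.Principal;
import javax.security.auth.Subject;

final class ProxySubjectSketch {
    /** Hypothetical stand-in for Hadoop's User principal (the effective user). */
    static final class UserPrincipal implements Principal {
        private final String name;
        UserPrincipal(String name) { this.name = name; }
        @Override public String getName() { return name; }
    }

    /** Hypothetical stand-in for Hadoop's RealUser principal. */
    static final class RealUserPrincipal implements Principal {
        final Subject realSubject;
        RealUserPrincipal(Subject realSubject) { this.realSubject = realSubject; }
        @Override public String getName() { return effectiveUser(realSubject); }
    }

    /** Hypothetical secret, playing the role of the Kerberos ticket. */
    static final class FakeTicket { }

    /** A user that really logged in: identity plus a secret. */
    static Subject authenticatedUser(String name) {
        Subject s = new Subject();
        s.getPrincipals().add(new UserPrincipal(name));
        s.getPrivateCredentials().add(new FakeTicket());
        return s;
    }

    /** Mirrors the shape of createProxyUser: two principals, no credentials. */
    static Subject proxyUser(String name, Subject realUser) {
        Subject s = new Subject();
        s.getPrincipals().add(new UserPrincipal(name));
        s.getPrincipals().add(new RealUserPrincipal(realUser));
        return s;
    }

    /** The name that permission checks would see. */
    static String effectiveUser(Subject s) {
        return s.getPrincipals(UserPrincipal.class).iterator().next().getName();
    }

    /** True if the Subject holds anything it could authenticate with. */
    static boolean hasCredentials(Subject s) {
        return !s.getPrivateCredentials().isEmpty();
    }

    public static void main(String[] args) {
        Subject hive = authenticatedUser("hive");
        Subject alice = proxyUser("alice", hive);
        System.out.println(effectiveUser(alice) + " can authenticate: " + hasCredentials(alice));
        System.out.println(effectiveUser(hive) + " can authenticate: " + hasCredentials(hive));
    }
}
```

Permission checks see the proxied name, while only the real user's Subject has anything to offer in an authentication handshake.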

Getting the current user

UserGroupInformation.getCurrentUser

/**
   * Return the current user, including any doAs in the current stack.
   * @return the current user
   * @throws IOException if login fails
   */
  @InterfaceAudience.Public
  @InterfaceStability.Evolving
  public static UserGroupInformation getCurrentUser() throws IOException {
    ensureInitialized();
    AccessControlContext context = AccessController.getContext();
    Subject subject = Subject.getSubject(context);
    if (subject == null || subject.getPrincipals(User.class).isEmpty()) {
      return getLoginUser();
    } else {
      return new UserGroupInformation(subject);
    }
  }

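The current Subject is taken from the security context. If there is none, getLoginUser is called instead, which triggers the login.

Note 1: the condition includes subject.getPrincipals(User.class).isEmpty(), so a Subject only counts as logged in if it contains a principal of type User.

Note 2: the first line calls ensureInitialized(). This method is important: it initializes Hadoop's security environment.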
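The decision in Note 1 can be written as a small pure function. This sketch only models the branch logic; UserPrincipal and OtherPrincipal are hypothetical classes, and the Supplier stands in for getLoginUser so that it is easy to see when the fallback is actually invoked.

```java
import java.security.Principal;
import java.util.function.Supplier;
import javax.security.auth.Subject;

final class CurrentUserSketch {
    /** Hypothetical stand-in for Hadoop's User principal. */
    static final class UserPrincipal implements Principal {
        private final String name;
        UserPrincipal(String name) { this.name = name; }
        @Override public String getName() { return name; }
    }

    /** Any other principal type, e.g. left behind by an unrelated JAAS login. */
    static final class OtherPrincipal implements Principal {
        private final String name;
        OtherPrincipal(String name) { this.name = name; }
        @Override public String getName() { return name; }
    }

    /**
     * Returns the user found in the context Subject. The login user is only
     * asked for when the Subject is missing or has no UserPrincipal.
     */
    static String currentUser(Subject contextSubject, Supplier<String> loginUser) {
        if (contextSubject == null
                || contextSubject.getPrincipals(UserPrincipal.class).isEmpty()) {
            return loginUser.get();
        }
        return contextSubject.getPrincipals(UserPrincipal.class)
            .iterator().next().getName();
    }

    public static void main(String[] args) {
        Supplier<String> login = () -> "login-user";

        Subject foreign = new Subject();
        foreign.getPrincipals().add(new OtherPrincipal("someone"));

        Subject alice = new Subject();
        alice.getPrincipals().add(new UserPrincipal("alice"));

        System.out.println(currentUser(null, login));     // no Subject at all
        System.out.println(currentUser(foreign, login));  // Subject without a User principal
        System.out.println(currentUser(alice, login));    // Subject with a User principal
    }
}
```

A Subject that holds only foreign principals is treated exactly like a missing one, which is why the type of the principal matters and not merely its presence.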

Login

/**
   * Get the currently logged in user.  If no explicit login has occurred,
   * the user will automatically be logged in with either kerberos credentials
   * if available, or as the local OS user, based on security settings.
   * @return the logged in user
   * @throws IOException if login fails
   */
  @InterfaceAudience.Public
  @InterfaceStability.Evolving
  public static UserGroupInformation getLoginUser() throws IOException {
    ensureInitialized();
    UserGroupInformation loginUser = loginUserRef.get();
    // a potential race condition exists only for the initial creation of
    // the login user.  there's no need to penalize all subsequent calls
    // with sychronization overhead so optimistically create a login user
    // and discard if we lose the race.
    if (loginUser == null) {
      UserGroupInformation newLoginUser = createLoginUser(null);
      do {
        // it's extremely unlikely that the login user will be non-null
        // (lost CAS race), but be nulled before the subsequent get, but loop
        // for correctness.
        if (loginUserRef.compareAndSet(null, newLoginUser)) {
          loginUser = newLoginUser;
          // only spawn renewal if this login user is the winner.
          loginUser.spawnAutoRenewalThreadForUserCreds(false);
        } else {
          loginUser = loginUserRef.get();
        }
      } while (loginUser == null);
    }
    return loginUser;
  }
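The lock-free pattern in getLoginUser is worth isolating: create a candidate optimistically, publish it with compareAndSet, and let only the winner start the renewal. Below is a minimal sketch of that pattern with a String in place of the UGI; the class name, the counter and the race helper are all invented for the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

final class LazyLoginSketch {
    private final AtomicReference<String> loginUserRef = new AtomicReference<>();
    private final AtomicInteger renewalsStarted = new AtomicInteger();
    private final Supplier<String> loginFactory;

    LazyLoginSketch(Supplier<String> loginFactory) {
        this.loginFactory = loginFactory;
    }

    String getLoginUser() {
        String loginUser = loginUserRef.get();
        if (loginUser == null) {
            // Optimistic creation: this candidate is thrown away if we lose the race.
            String candidate = loginFactory.get();
            do {
                if (loginUserRef.compareAndSet(null, candidate)) {
                    loginUser = candidate;
                    renewalsStarted.incrementAndGet(); // only the winner gets here
                } else {
                    loginUser = loginUserRef.get();    // adopt the winner's value
                }
            } while (loginUser == null);
        }
        return loginUser;
    }

    int renewalsStarted() {
        return renewalsStarted.get();
    }

    /** Calls getLoginUser() from several threads and returns the distinct results. */
    static Set<String> race(LazyLoginSketch login, int threads) {
        Set<String> seen = ConcurrentHashMap.newKeySet();
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            Thread t = new Thread(() -> seen.add(login.getLoginUser()));
            workers.add(t);
            t.start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        AtomicInteger created = new AtomicInteger();
        LazyLoginSketch login =
            new LazyLoginSketch(() -> "user-" + created.incrementAndGet());
        Set<String> seen = race(login, 8);
        System.out.println("distinct login users: " + seen.size()
            + ", renewals started: " + login.renewalsStarted());
    }
}
```

Several candidates may be created under contention, but every caller ends up with the same published value and the side effect runs once. The trade-off is a possibly wasted login attempt in exchange for no synchronization on the common path.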

If there is no loginUser yet, createLoginUser(null) is called:

private static
  UserGroupInformation createLoginUser(Subject subject) throws IOException {
    // log in
    UserGroupInformation realUser = doSubjectLogin(subject, null);
    UserGroupInformation loginUser = null;
    try {

      // proxy-user handling
      // If the HADOOP_PROXY_USER environment variable or property
      // is specified, create a proxy user as the logged in user.
      String proxyUser = System.getenv(HADOOP_PROXY_USER);
      if (proxyUser == null) {
        proxyUser = System.getProperty(HADOOP_PROXY_USER);
      }
      loginUser = proxyUser == null ? realUser : createProxyUser(proxyUser, realUser);

      // Load tokens from the configured token files
      final Collection<String> tokenFileLocations = new LinkedHashSet<>();
      tokenFileLocations.addAll(getTrimmedStringCollection(
          System.getProperty(HADOOP_TOKEN_FILES)));
      tokenFileLocations.addAll(getTrimmedStringCollection(
          conf.get(HADOOP_TOKEN_FILES)));
      tokenFileLocations.addAll(getTrimmedStringCollection(
          System.getenv(HADOOP_TOKEN_FILE_LOCATION)));
      for (String tokenFileLocation : tokenFileLocations) {
        if (tokenFileLocation != null && tokenFileLocation.length() > 0) {
          File tokenFile = new File(tokenFileLocation);
          LOG.debug("Reading credentials from location {}",
              tokenFile.getCanonicalPath());
          if (tokenFile.exists() && tokenFile.isFile()) {
            Credentials cred = Credentials.readTokenStorageFile(
                tokenFile, conf);
            LOG.debug("Loaded {} tokens from {}", cred.numberOfTokens(),
                tokenFile.getCanonicalPath());
            loginUser.addCredentials(cred);
          } else {
            LOG.info("Token file {} does not exist",
                tokenFile.getCanonicalPath());
          }
        }
      }

      // Load tokens from base64 encoding
      final Collection<String> tokensBase64 = new LinkedHashSet<>();
      tokensBase64.addAll(getTrimmedStringCollection(
          System.getProperty(HADOOP_TOKENS)));
      tokensBase64.addAll(getTrimmedStringCollection(
          conf.get(HADOOP_TOKENS)));
      tokensBase64.addAll(getTrimmedStringCollection(
          System.getenv(HADOOP_TOKEN)));
      int numTokenBase64 = 0;
      for (String tokenBase64 : tokensBase64) {
        if (tokenBase64 != null && tokenBase64.length() > 0) {
          try {
            Token<TokenIdentifier> token = new Token<>();
            token.decodeFromUrlString(tokenBase64);
            Credentials cred = new Credentials();
            cred.addToken(token.getService(), token);
            loginUser.addCredentials(cred);
            numTokenBase64++;
          } catch (IOException ioe) {
            LOG.error("Cannot add token {}: {}",
                tokenBase64, ioe.getMessage());
          }
        }
      }
      if (numTokenBase64 > 0) {
        LOG.debug("Loaded {} base64 tokens", numTokenBase64);
      }
    } catch (IOException ioe) {
      LOG.debug("Failure to load login credentials", ioe);
      throw ioe;
    }
    LOG.debug("UGI loginUser: {}", loginUser);
    return loginUser;
  }

/**
   * Login a subject with the given parameters.  If the subject is null,
   * the login context used to create the subject will be attached.
   * @param subject to login, null for new subject.
   * @param params for login, null for externally managed ugi.
   * @return UserGroupInformation for subject
   * @throws IOException
   */
  private static UserGroupInformation doSubjectLogin(
      Subject subject, LoginParams params) throws IOException {
    ensureInitialized();
    // initial default login.
    if (subject == null && params == null) {
      params = LoginParams.getDefaults();
    }
    HadoopConfiguration loginConf = new HadoopConfiguration(params);
    try {
      // Key line: this sets the name of the LoginContext, either
      // HadoopConfiguration.KERBEROS_CONFIG_NAME or SIMPLE_CONFIG_NAME
      HadoopLoginContext login = newLoginContext(
        authenticationMethod.getLoginAppName(), subject, loginConf);
      // the JAAS LoginContext login
      login.login();
      // after the JAAS login the context holds a Subject that carries the
      // Kerberos authentication information
      UserGroupInformation ugi = new UserGroupInformation(login.getSubject());
      // attach login context for relogin unless this was a pre-existing
      // subject.
      if (subject == null) {
        params.put(LoginParam.PRINCIPAL, ugi.getUserName());
        ugi.setLogin(login);
        ugi.setLastLogin(Time.now());
      }
      return ugi;
    } catch (LoginException le) {
      KerberosAuthException kae =
        new KerberosAuthException(FAILURE_TO_LOGIN, le);
      if (params != null) {
        kae.setPrincipal(params.get(LoginParam.PRINCIPAL));
        kae.setKeytabFile(params.get(LoginParam.KEYTAB));
        kae.setTicketCacheFile(params.get(LoginParam.CCACHE));
      }
      throw kae;
    }
  }

Now look at the HadoopLoginContext code:

// wrapper to allow access to fields necessary to recreate the same login
  // context for relogin.  explicitly private to prevent external tampering.
  private static class HadoopLoginContext extends LoginContext {
    private final String appName;
    private final HadoopConfiguration conf;
    private AtomicBoolean isLoggedIn = new AtomicBoolean();

    HadoopLoginContext(String appName, Subject subject,
                       HadoopConfiguration conf) throws LoginException {
      // the name and the configuration of the LoginContext are set here
      super(appName, subject, null, conf);
      this.appName = appName;
      this.conf = conf;
    }
    // ... remaining members omitted
  }

The LoginContext constructor and LoginContext.init (JDK code):

public LoginContext(String name, Subject subject,
                        CallbackHandler callbackHandler,
                        Configuration config) throws LoginException {
        this.config = config;
        if (config != null) {
            creatorAcc = java.security.AccessController.getContext();
        }
        // initialize the LoginContext; name is passed in by the caller
        init(name);
        if (subject != null) {
            this.subject = subject;
            subjectProvided = true;
        }
        if (callbackHandler == null) {
            loadDefaultCallbackHandler();
        } else if (creatorAcc == null) {
            this.callbackHandler = new SecureCallbackHandler
                                (java.security.AccessController.getContext(),
                                callbackHandler);
        } else {
            this.callbackHandler = callbackHandler;
        }
    }

    private void init(String name) throws LoginException {

        SecurityManager sm = System.getSecurityManager();
        if (sm != null && creatorAcc == null) {
            sm.checkPermission(new AuthPermission
                                ("createLoginContext." + name));
        }

        if (name == null)
            throw new LoginException
                (ResourcesMgr.getString("Invalid.null.input.name"));

        // get the Configuration
        if (config == null) {
            config = java.security.AccessController.doPrivileged
                (new java.security.PrivilegedAction<Configuration>() {
                public Configuration run() {
                    return Configuration.getConfiguration();
                }
            });
        }

        // Important: the AppConfigurationEntry list is looked up by name here
        // get the LoginModules configured for this application
        AppConfigurationEntry[] entries = config.getAppConfigurationEntry(name);
        if (entries == null) {

            if (sm != null && creatorAcc == null) {
                sm.checkPermission(new AuthPermission
                                ("createLoginContext." + OTHER));
            }

            entries = config.getAppConfigurationEntry(OTHER);
            if (entries == null) {
                MessageFormat form = new MessageFormat(ResourcesMgr.getString
                        ("No.LoginModules.configured.for.name"));
                Object[] source = {name};
                throw new LoginException(form.format(source));
            }
        }
        moduleStack = new ModuleInfo[entries.length];
        for (int i = 0; i < entries.length; i++) {
            // clone returned array
            moduleStack[i] = new ModuleInfo
                                (new AppConfigurationEntry
                                        (entries[i].getLoginModuleName(),
                                        entries[i].getControlFlag(),
                                        entries[i].getOptions()),
                                null);
        }

        contextClassLoader = java.security.AccessController.doPrivileged
                (new java.security.PrivilegedAction<ClassLoader>() {
                public ClassLoader run() {
                    ClassLoader loader =
                            Thread.currentThread().getContextClassLoader();
                    if (loader == null) {
                        // Don't use bootstrap class loader directly to ensure
                        // proper package access control!
                        loader = ClassLoader.getSystemClassLoader();
                    }

                    return loader;
                }
        });
    }

The AppConfigurationEntry list looked up by name above comes from a custom Configuration defined inside UserGroupInformation:

/**
   * A JAAS configuration that defines the login modules that we want
   * to use for login.
   */
  @InterfaceAudience.Private
  @InterfaceStability.Unstable
  private static class HadoopConfiguration
  extends javax.security.auth.login.Configuration {
    static final String KRB5_LOGIN_MODULE =
        KerberosUtil.getKrb5LoginModuleName();
    static final String SIMPLE_CONFIG_NAME = "hadoop-simple";
    static final String KERBEROS_CONFIG_NAME = "hadoop-kerberos";

    private static final Map<String, String> BASIC_JAAS_OPTIONS =
        new HashMap<String,String>();
    static {
      if ("true".equalsIgnoreCase(System.getenv("HADOOP_JAAS_DEBUG"))) {
        BASIC_JAAS_OPTIONS.put("debug", "true");
      }
    }

    static final AppConfigurationEntry OS_SPECIFIC_LOGIN =
        new AppConfigurationEntry(
            OS_LOGIN_MODULE_NAME,
            LoginModuleControlFlag.REQUIRED,
            BASIC_JAAS_OPTIONS);

    static final AppConfigurationEntry HADOOP_LOGIN =
        new AppConfigurationEntry(
            HadoopLoginModule.class.getName(),
            LoginModuleControlFlag.REQUIRED,   // REQUIRED: this LoginModule is always invoked and must succeed
            BASIC_JAAS_OPTIONS);

    private final LoginParams params;

    HadoopConfiguration(LoginParams params) {
      this.params = params;
    }

    @Override
    public LoginParams getParameters() {
      return params;
    }


    // Overridden lookup of the AppConfigurationEntry list by name; for
    // Kerberos it calls getKerberosEntry() to add the Kerberos entry
    @Override
    public AppConfigurationEntry[] getAppConfigurationEntry(String appName) {
      ArrayList<AppConfigurationEntry> entries = new ArrayList<>();
      // login of external subject passes no params.  technically only
      // existing credentials should be used but other components expect
      // the login to succeed with local user fallback if no principal.
      if (params == null || appName.equals(SIMPLE_CONFIG_NAME)) {
        entries.add(OS_SPECIFIC_LOGIN);
      } else if (appName.equals(KERBEROS_CONFIG_NAME)) {
        // existing semantics are the initial default login allows local user
        // fallback. this is not allowed when a principal explicitly
        // specified or during a relogin.
        if (!params.containsKey(LoginParam.PRINCIPAL)) {
          entries.add(OS_SPECIFIC_LOGIN);
        }
        entries.add(getKerberosEntry());
      }
      // the HADOOP_LOGIN entry is always added here
      entries.add(HADOOP_LOGIN);
      return entries.toArray(new AppConfigurationEntry[0]);
    }

    private AppConfigurationEntry getKerberosEntry() {
      final Map<String,String> options = new HashMap<>(BASIC_JAAS_OPTIONS);
      LoginModuleControlFlag controlFlag = LoginModuleControlFlag.OPTIONAL;
      // kerberos login is mandatory if principal is specified.  principal
      // will not be set for initial default login, but will always be set
      // for relogins.
      final String principal = params.get(LoginParam.PRINCIPAL);
      if (principal != null) {
        options.put("principal", principal);
        controlFlag = LoginModuleControlFlag.REQUIRED;
      }

      // use keytab if given else fallback to ticket cache.
      if (IBM_JAVA) {
        if (params.containsKey(LoginParam.KEYTAB)) {
          final String keytab = params.get(LoginParam.KEYTAB);
          if (keytab != null) {
            options.put("useKeytab", prependFileAuthority(keytab));
          } else {
            options.put("useDefaultKeytab", "true");
          }
          options.put("credsType", "both");
        } else {
          String ticketCache = params.get(LoginParam.CCACHE);
          if (ticketCache != null) {
            options.put("useCcache", prependFileAuthority(ticketCache));
          } else {
            options.put("useDefaultCcache", "true");
          }
          options.put("renewTGT", "true");
        }
      } else {
        if (params.containsKey(LoginParam.KEYTAB)) {
          options.put("useKeyTab", "true");
          final String keytab = params.get(LoginParam.KEYTAB);
          if (keytab != null) {
            options.put("keyTab", keytab);
          }
          options.put("storeKey", "true");
        } else {
          options.put("useTicketCache", "true");
          String ticketCache = params.get(LoginParam.CCACHE);
          if (ticketCache != null) {
            options.put("ticketCache", ticketCache);
          }
          options.put("renewTGT", "true");
        }
        options.put("doNotPrompt", "true");
      }
      options.put("refreshKrb5Config", "true");

      return new AppConfigurationEntry(
          KRB5_LOGIN_MODULE, controlFlag, options);
    }

    private static String prependFileAuthority(String keytabPath) {
      return keytabPath.startsWith("file://")
          ? keytabPath
          : "file://" + keytabPath;
    }
  }
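Stripped of the keytab and ticket-cache options, the idea behind HadoopConfiguration is an in-memory JAAS Configuration that builds its entry list per application name. The sketch below is a simplified model under that assumption: the application names and the two example.* module names are placeholders invented here, nothing is loaded or logged in, and the only real module name is the JDK's Krb5LoginModule.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
import javax.security.auth.login.Configuration;

final class InMemoryJaasConfig extends Configuration {
    static final String SIMPLE = "sketch-simple";
    static final String KERBEROS = "sketch-kerberos";
    // Placeholder module names (assumptions); this sketch never loads them.
    static final String OS_MODULE = "example.OsLoginModule";
    static final String FINAL_MODULE = "example.FinalLoginModule";
    static final String KRB5_MODULE = "com.sun.security.auth.module.Krb5LoginModule";

    private final String principal; // null means "no explicit principal"

    InMemoryJaasConfig(String principal) {
        this.principal = principal;
    }

    private static AppConfigurationEntry required(String module) {
        return new AppConfigurationEntry(
            module, LoginModuleControlFlag.REQUIRED, new HashMap<String, String>());
    }

    @Override
    public AppConfigurationEntry[] getAppConfigurationEntry(String appName) {
        List<AppConfigurationEntry> entries = new ArrayList<>();
        if (SIMPLE.equals(appName)) {
            entries.add(required(OS_MODULE));
        } else if (KERBEROS.equals(appName)) {
            // Local-user fallback is only allowed without an explicit principal.
            if (principal == null) {
                entries.add(required(OS_MODULE));
            }
            Map<String, String> options = new HashMap<>();
            LoginModuleControlFlag flag = LoginModuleControlFlag.OPTIONAL;
            if (principal != null) {
                options.put("principal", principal);
                flag = LoginModuleControlFlag.REQUIRED; // Kerberos becomes mandatory
            }
            entries.add(new AppConfigurationEntry(KRB5_MODULE, flag, options));
        }
        // The final module is always appended, whatever the login type.
        entries.add(required(FINAL_MODULE));
        return entries.toArray(new AppConfigurationEntry[0]);
    }

    public static void main(String[] args) {
        Configuration conf = new InMemoryJaasConfig("alice@EXAMPLE.COM");
        for (AppConfigurationEntry e : conf.getAppConfigurationEntry(KERBEROS)) {
            System.out.println(e.getLoginModuleName() + " " + e.getControlFlag());
        }
    }
}
```

Passing such an object as the last argument of the LoginContext constructor is what lets a program define its login modules in code instead of in a JAAS configuration file.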

For Kerberos, the Kerberos login entry is added:

entries.add(getKerberosEntry());
Its controlFlag defaults to OPTIONAL:
LoginModuleControlFlag controlFlag = LoginModuleControlFlag.OPTIONAL;
// kerberos login is mandatory if principal is specified.  principal
// will not be set for initial default login, but will always be set
// for relogins.
final String principal = params.get(LoginParam.PRINCIPAL);
if (principal != null) {
  options.put("principal", principal);
  controlFlag = LoginModuleControlFlag.REQUIRED;
}

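Whatever the login setup, the HADOOP_LOGIN entry is always added:

entries.add(HADOOP_LOGIN);

The LoginModuleControlFlag of HADOOP_LOGIN is REQUIRED,

so HadoopLoginModule always takes part in the login (its login and commit methods are called):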

/**
   * A login module that looks at the Kerberos, Unix, or Windows principal and
   * adds the corresponding UserName.
   */
  @InterfaceAudience.Private
  public static class HadoopLoginModule implements LoginModule {
    private Subject subject;

    @Override
    public boolean abort() throws LoginException {
      return true;
    }

    private <T extends Principal> T getCanonicalUser(Class<T> cls) {
      for(T user: subject.getPrincipals(cls)) {
        return user;
      }
      return null;
    }

    @Override
    public boolean commit() throws LoginException {
      LOG.debug("hadoop login commit");
      // if we already have a user, we are done.
      if (!subject.getPrincipals(User.class).isEmpty()) {
        LOG.debug("Using existing subject: {}", subject.getPrincipals());
        return true;
      }
      // take the principal produced by the Kerberos login; it is used to
      // build the Hadoop User
      Principal user = getCanonicalUser(KerberosPrincipal.class);
      if (user != null) {
        LOG.debug("Using kerberos user: {}", user);
      }
      //If we don't have a kerberos user and security is disabled, check
      //if user is specified in the environment or properties
      if (!isSecurityEnabled() && (user == null)) {
        String envUser = System.getenv(HADOOP_USER_NAME);
        if (envUser == null) {
          envUser = System.getProperty(HADOOP_USER_NAME);
        }
        user = envUser == null ? null : new User(envUser);
      }
      // if no Kerberos principal was found, build the Hadoop User from the
      // principal of the operating-system login
      // use the OS user
      if (user == null) {
        user = getCanonicalUser(OS_PRINCIPAL_CLASS);
        LOG.debug("Using local user: {}", user);
      }
      // if we found the user, add our principal
      if (user != null) {
        LOG.debug("Using user: \"{}\" with name: {}", user, user.getName());

        User userEntry = null;
        try {
          // LoginContext will be attached later unless it's an external
          // subject.
          AuthenticationMethod authMethod = (user instanceof KerberosPrincipal)
            ? AuthenticationMethod.KERBEROS : AuthenticationMethod.SIMPLE;
          userEntry = new User(user.getName(), authMethod, null);
        } catch (Exception e) {
          throw (LoginException)(new LoginException(e.toString()).initCause(e));
        }
        LOG.debug("User entry: \"{}\"", userEntry);

        subject.getPrincipals().add(userEntry);
        return true;
      }
      throw new LoginException("Failed to find user in name " + subject);
    }

    @Override
    public void initialize(Subject subject, CallbackHandler callbackHandler,
                           Map<String, ?> sharedState, Map<String, ?> options) {
      this.subject = subject;
    }

    @Override
    public boolean login() throws LoginException {
      LOG.debug("Hadoop login");
      return true;
    }

    @Override
    public boolean logout() throws LoginException {
      LOG.debug("Hadoop logout");
      return true;
    }
  }

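The key points here:

Key point 1: the principal from the Kerberos login is preferred when building the Hadoop User.

Key point 2: if no Kerberos principal is available, the principal from the operating-system login is used to build the Hadoop User (with security disabled, HADOOP_USER_NAME is consulted first).

Key point 3: whichever login method is used, the end result is a principal of type User.class, which acts as the Hadoop login user.

At this point we have the login user.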
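The selection order in commit() can be condensed into one function. This is a simplified model, not the Hadoop code: KerberosLike and OsLike are hypothetical principal types replacing KerberosPrincipal and the OS principal class, envUser stands for the HADOOP_USER_NAME value, and an unchecked exception replaces the LoginException.

```java
import java.security.Principal;
import javax.security.auth.Subject;

final class LoginUserChoiceSketch {
    /** Hypothetical stand-in for KerberosPrincipal. */
    static final class KerberosLike implements Principal {
        private final String name;
        KerberosLike(String name) { this.name = name; }
        @Override public String getName() { return name; }
    }

    /** Hypothetical stand-in for the OS-specific principal class. */
    static final class OsLike implements Principal {
        private final String name;
        OsLike(String name) { this.name = name; }
        @Override public String getName() { return name; }
    }

    /** Name of the first principal of the given type, or null if there is none. */
    static <T extends Principal> String firstName(Subject subject, Class<T> type) {
        for (T p : subject.getPrincipals(type)) {
            return p.getName();
        }
        return null;
    }

    /** Picks the login user in the order Kerberos, environment, OS. */
    static String chooseUser(Subject subject, boolean securityEnabled, String envUser) {
        String name = firstName(subject, KerberosLike.class);
        if (name != null) {
            return name + " (KERBEROS)";
        }
        // The environment override only applies when security is disabled.
        if (!securityEnabled && envUser != null) {
            return envUser + " (SIMPLE)";
        }
        name = firstName(subject, OsLike.class);
        if (name != null) {
            return name + " (SIMPLE)";
        }
        throw new IllegalStateException("Failed to find user in " + subject);
    }

    public static void main(String[] args) {
        Subject both = new Subject();
        both.getPrincipals().add(new OsLike("osuser"));
        both.getPrincipals().add(new KerberosLike("alice@EXAMPLE.COM"));

        Subject osOnly = new Subject();
        osOnly.getPrincipals().add(new OsLike("osuser"));

        System.out.println(chooseUser(both, true, null));
        System.out.println(chooseUser(osOnly, false, "bob"));
        System.out.println(chooseUser(osOnly, true, "bob"));
    }
}
```

Note that the environment override is ignored as soon as security is enabled, so it cannot be used to impersonate someone on a secured cluster.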

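Token-based authentication

UserGroupInformation can also wrap a Subject whose credential is a token. A token obtained by calling getDelegationToken is used to build a Subject whose credentials are that token. When the authentication method is TOKEN, the token is read back from the Subject's credentials, looked up by the Token type.

That should give you a rough overall picture of UserGroupInformation.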
