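Overview
First, a brief look at the Java authentication frameworks involved:
JAAS: the authentication and authorization framework, mainly responsible for authenticating users and checking their permissions.
SASL: a framework for authentication between a client and a server.
GSS-API: available to SASL as the GSSAPI mechanism, i.e. one of the mechanisms plugged into the SASL framework. This is how Kerberos is used over SASL.
Reference: JAAS/GSS-API/SASL/Kerberos简介 | NoSQL漫谈
There are plenty of high-level introductions online that are easy to search for, but to really understand what UserGroupInformation does you need to study how it is used in big-data systems such as Hadoop and Hive.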
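Before introducing UserGroupInformation, you should know the Subject concept from the JAAS framework. Put simply, a Subject represents a user. After a successful JAAS login a Subject is produced that holds a set of Principals plus other credential information. For example, with Kerberos authentication it holds a principal for the Kerberos identity, and credentials such as the TGT are stored in privCredentials.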
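To make the Subject idea concrete, here is a minimal sketch that uses only the JDK's JAAS classes. The NamedPrincipal class, the user name and the string standing in for a TGT are all made up for illustration; in a real Kerberos login the login module fills these in.

```java
import java.security.Principal;
import javax.security.auth.Subject;

class SubjectSketch {
    /** Hypothetical principal type, used only for this illustration. */
    static final class NamedPrincipal implements Principal {
        private final String name;
        NamedPrincipal(String name) { this.name = name; }
        @Override public String getName() { return name; }
    }

    /** Builds a Subject by hand, roughly the way a JAAS login would populate one. */
    static Subject loggedInSubject() {
        Subject subject = new Subject();
        // Who the user is.
        subject.getPrincipals().add(new NamedPrincipal("alice@EXAMPLE.COM"));
        // What proves it; a placeholder string instead of a real Kerberos ticket.
        subject.getPrivateCredentials().add("fake-tgt-bytes");
        return subject;
    }

    public static void main(String[] args) {
        Subject subject = loggedInSubject();
        for (Principal p : subject.getPrincipals()) {
            System.out.println("principal: " + p.getName());
        }
        System.out.println("private credentials: "
            + subject.getPrivateCredentials(String.class).size());
    }
}
```

The split matters later on: principals say who the user is, while private credentials are what actually allows the user to authenticate to another service.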
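CASE ONE: the client establishes a connection to the server
When a client establishes a connection to the server, it typically does so like this:
ugi.doAs((PrivilegedAction<Void>) () -> {
  // code that establishes the connection
  return null;
});
Here ugi is a UserGroupInformation instance, and doAs runs the action with that UserGroupInformation's subject.
As a result, the connection is established with the logged-in subject. Because that subject contains the Kerberos Principal and holds valid Credentials, the SASL client and SASL server can use them while the connection is being set up. Underneath, the actual work is done by the GSS implementation.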
The implementation of doAs
@InterfaceAudience.Public
@InterfaceStability.Evolving
public <T> T doAs(PrivilegedAction<T> action) {
if (LOG.isDebugEnabled()) {
LOG.debug("PrivilegedAction [as: {}][action: {}]", this, action,
new Exception());
}
// run the action with this UGI's subject, i.e. the connection is made with the logged-in subject
return Subject.doAs(subject, action);
}
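Of course, if UserGroupInformation.doAs is not used,
the SASL connection is established with the subject in the current security context. If the user has already logged in through JAAS and that subject is associated with the current AccessControlContext, the logged-in subject is used here as well.
CASE TWO: executing an action on the server
Once a client is connected, the server usually deals with two UGIs. One is the UGI of the service itself, for example hdfs for the NameNode (NN) or hive for the MetaStore; hdfs and hive are superusers inside their services. The other belongs to the client: operations executed after the connection is made should run as the client's user, and this is where the proxyUser concept in UserGroupInformation comes in.
As the name suggests, a proxyUser performs operations on behalf of another user. Keep in mind that the proxied user has not been authenticated through JAAS on the server side; the server only has a user name. That user was, however, authenticated through JAAS on the client side.
The code that creates a proxy user is as follows: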
/**
* Create a proxy user using username of the effective user and the ugi of the
* real user.
* @param user
* @param realUser
* @return proxyUser ugi
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public static UserGroupInformation createProxyUser(String user,
UserGroupInformation realUser) {
if (user == null || user.isEmpty()) {
throw new IllegalArgumentException("Null user");
}
if (realUser == null) {
throw new IllegalArgumentException("Null real user");
}
Subject subject = new Subject();
Set<Principal> principals = subject.getPrincipals();
principals.add(new User(user, AuthenticationMethod.PROXY, null));
principals.add(new RealUser(realUser));
return new UserGroupInformation(subject);
}
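The subject inside a proxy user's UserGroupInformation is created from scratch. It is given a User principal and a RealUser principal. The RealUser principal refers to the user that was authenticated with Kerberos and holds the Kerberos keys.
The proxy user's own subject contains no key material at all.
When a user is proxied, actions on the server side are generally executed in the following form:
UserGroupInformation.doAs (called on the proxy user's UGI)
This way, wherever a permission check is needed, the user name obtained is that of the proxy user.
Keep in mind that a proxy user must not be used to create a SASL connection: it has no credentials, so the attempt fails.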
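The shape of a proxy subject can be sketched with plain JAAS classes. UserPrincipal and RealUserPrincipal below are hypothetical stand-ins for Hadoop's User and RealUser, and the "hive" service user and its fake credential are assumptions for the demo.

```java
import java.security.Principal;
import javax.security.auth.Subject;

class ProxySubjectSketch {
    /** Hypothetical stand-in for Hadoop's User principal. */
    static final class UserPrincipal implements Principal {
        private final String name;
        UserPrincipal(String name) { this.name = name; }
        @Override public String getName() { return name; }
    }

    /** Hypothetical stand-in for RealUser: it points at the authenticated subject. */
    static final class RealUserPrincipal implements Principal {
        final Subject realSubject;
        RealUserPrincipal(Subject realSubject) { this.realSubject = realSubject; }
        @Override public String getName() {
            return realSubject.getPrincipals(UserPrincipal.class).iterator().next().getName();
        }
    }

    /** A subject as it might look after a real login (the credential is fake). */
    static Subject realSubject() {
        Subject s = new Subject();
        s.getPrincipals().add(new UserPrincipal("hive"));
        s.getPrivateCredentials().add("fake-tgt-bytes");
        return s;
    }

    /** Same shape as createProxyUser: a fresh subject, two principals, no credentials. */
    static Subject createProxySubject(String user, Subject real) {
        if (user == null || user.isEmpty()) {
            throw new IllegalArgumentException("Null user");
        }
        if (real == null) {
            throw new IllegalArgumentException("Null real user");
        }
        Subject proxy = new Subject();
        proxy.getPrincipals().add(new UserPrincipal(user));
        proxy.getPrincipals().add(new RealUserPrincipal(real));
        return proxy;
    }

    public static void main(String[] args) {
        Subject real = realSubject();
        Subject proxy = createProxySubject("bob", real);
        System.out.println("proxy principals: " + proxy.getPrincipals().size());
        System.out.println("proxy has credentials: " + !proxy.getPrivateCredentials().isEmpty());
        System.out.println("real has credentials: " + !real.getPrivateCredentials().isEmpty());
    }
}
```

The proxy subject carries a name for permission checks but nothing it could present to a SASL server, which is why connections have to be opened as the real user.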
Getting the current user
UserGroupInformation.getCurrentUser
/**
* Return the current user, including any doAs in the current stack.
* @return the current user
* @throws IOException if login fails
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public static UserGroupInformation getCurrentUser() throws IOException {
ensureInitialized();
AccessControlContext context = AccessController.getContext();
Subject subject = Subject.getSubject(context);
if (subject == null || subject.getPrincipals(User.class).isEmpty()) {
return getLoginUser();
} else {
return new UserGroupInformation(subject);
}
}
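The current subject is read from the security context. If there is none, getLoginUser is called, which triggers a login.
Note 1: the condition includes subject.getPrincipals(User.class).isEmpty(),
so a subject only counts as logged in if it contains a principal of type User.
Note 2: the first line calls ensureInitialized(). This method is important: it initializes Hadoop's security environment.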
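The fallback rule in Note 1 can be sketched as a small decision function. Everything here is a simplified model: HadoopUser stands in for Hadoop's User principal, OtherPrincipal for any unrelated principal type, and LOGIN_SUBJECT for whatever getLoginUser would return.

```java
import java.security.Principal;
import javax.security.auth.Subject;

class CurrentUserSketch {
    /** Hypothetical stand-in for Hadoop's User principal. */
    static final class HadoopUser implements Principal {
        private final String name;
        HadoopUser(String name) { this.name = name; }
        @Override public String getName() { return name; }
    }

    /** Any principal that is not a HadoopUser, e.g. one added by another framework. */
    static final class OtherPrincipal implements Principal {
        private final String name;
        OtherPrincipal(String name) { this.name = name; }
        @Override public String getName() { return name; }
    }

    static Subject subjectWith(Principal... principals) {
        Subject s = new Subject();
        for (Principal p : principals) {
            s.getPrincipals().add(p);
        }
        return s;
    }

    /** Stand-in for the result of getLoginUser(); "hdfs" is an assumption for the demo. */
    static final Subject LOGIN_SUBJECT = subjectWith(new HadoopUser("hdfs"));

    /** Uses the context subject only if it carries a HadoopUser, else the login user. */
    static String currentUserName(Subject fromContext) {
        Subject effective =
            (fromContext == null || fromContext.getPrincipals(HadoopUser.class).isEmpty())
                ? LOGIN_SUBJECT
                : fromContext;
        return effective.getPrincipals(HadoopUser.class).iterator().next().getName();
    }

    public static void main(String[] args) {
        System.out.println(currentUserName(null));
        System.out.println(currentUserName(subjectWith(new OtherPrincipal("x"))));
        System.out.println(currentUserName(subjectWith(new HadoopUser("alice"))));
    }
}
```

getPrincipals(Class) filters by type, so a subject that was populated by some other JAAS login but never went through Hadoop's login module is treated the same as no subject at all.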
Login
/**
* Get the currently logged in user. If no explicit login has occurred,
* the user will automatically be logged in with either kerberos credentials
* if available, or as the local OS user, based on security settings.
* @return the logged in user
* @throws IOException if login fails
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public static UserGroupInformation getLoginUser() throws IOException {
ensureInitialized();
UserGroupInformation loginUser = loginUserRef.get();
// a potential race condition exists only for the initial creation of
// the login user. there's no need to penalize all subsequent calls
// with synchronization overhead so optimistically create a login user
// and discard if we lose the race.
if (loginUser == null) {
UserGroupInformation newLoginUser = createLoginUser(null);
do {
// it's extremely unlikely that the login user will be non-null
// (lost CAS race), but be nulled before the subsequent get, but loop
// for correctness.
if (loginUserRef.compareAndSet(null, newLoginUser)) {
loginUser = newLoginUser;
// only spawn renewal if this login user is the winner.
loginUser.spawnAutoRenewalThreadForUserCreds(false);
} else {
loginUser = loginUserRef.get();
}
} while (loginUser == null);
}
return loginUser;
}
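The lock-free initialization in getLoginUser is a general pattern: create a candidate optimistically, publish it with compareAndSet, and throw it away if another thread won. A minimal sketch with a String in place of the UGI follows; createLoginUser here is a hypothetical stand-in, and the WINNERS counter marks the spot where the real code spawns the renewal thread.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

class LazyLoginSketch {
    private static final AtomicReference<String> LOGIN_USER = new AtomicReference<>();
    /** Counts how many callers won the race; it should never exceed one. */
    static final AtomicInteger WINNERS = new AtomicInteger();

    /** Hypothetical stand-in for the expensive createLoginUser(null) call. */
    private static String createLoginUser() {
        return "user-created-by-" + Thread.currentThread().getName();
    }

    static String getLoginUser() {
        String user = LOGIN_USER.get();
        if (user == null) {
            // Several threads may build a candidate; only one gets published.
            String candidate = createLoginUser();
            do {
                if (LOGIN_USER.compareAndSet(null, candidate)) {
                    user = candidate;
                    // Winner-only side effects go here (the renewal thread in UGI).
                    WINNERS.incrementAndGet();
                } else {
                    user = LOGIN_USER.get();
                }
            } while (user == null);
        }
        return user;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(LazyLoginSketch::getLoginUser, "t" + i);
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println("login user: " + getLoginUser());
        System.out.println("CAS winners: " + WINNERS.get());
    }
}
```

The cost of this design is that a losing thread performs a login whose result is discarded; in exchange, every later call is a plain read with no locking.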
If there is no loginUser yet, createLoginUser(null) is called:
private static
UserGroupInformation createLoginUser(Subject subject) throws IOException {
// log in
UserGroupInformation realUser = doSubjectLogin(subject, null);
UserGroupInformation loginUser = null;
try {
// proxy-user settings
// If the HADOOP_PROXY_USER environment variable or property
// is specified, create a proxy user as the logged in user.
String proxyUser = System.getenv(HADOOP_PROXY_USER);
if (proxyUser == null) {
proxyUser = System.getProperty(HADOOP_PROXY_USER);
}
loginUser = proxyUser == null ? realUser : createProxyUser(proxyUser, realUser);
// Load tokens from the configured token files
final Collection<String> tokenFileLocations = new LinkedHashSet<>();
tokenFileLocations.addAll(getTrimmedStringCollection(
System.getProperty(HADOOP_TOKEN_FILES)));
tokenFileLocations.addAll(getTrimmedStringCollection(
conf.get(HADOOP_TOKEN_FILES)));
tokenFileLocations.addAll(getTrimmedStringCollection(
System.getenv(HADOOP_TOKEN_FILE_LOCATION)));
for (String tokenFileLocation : tokenFileLocations) {
if (tokenFileLocation != null && tokenFileLocation.length() > 0) {
File tokenFile = new File(tokenFileLocation);
LOG.debug("Reading credentials from location {}",
tokenFile.getCanonicalPath());
if (tokenFile.exists() && tokenFile.isFile()) {
Credentials cred = Credentials.readTokenStorageFile(
tokenFile, conf);
LOG.debug("Loaded {} tokens from {}", cred.numberOfTokens(),
tokenFile.getCanonicalPath());
loginUser.addCredentials(cred);
} else {
LOG.info("Token file {} does not exist",
tokenFile.getCanonicalPath());
}
}
}
// Load tokens from base64 encoding
final Collection<String> tokensBase64 = new LinkedHashSet<>();
tokensBase64.addAll(getTrimmedStringCollection(
System.getProperty(HADOOP_TOKENS)));
tokensBase64.addAll(getTrimmedStringCollection(
conf.get(HADOOP_TOKENS)));
tokensBase64.addAll(getTrimmedStringCollection(
System.getenv(HADOOP_TOKEN)));
int numTokenBase64 = 0;
for (String tokenBase64 : tokensBase64) {
if (tokenBase64 != null && tokenBase64.length() > 0) {
try {
Token<TokenIdentifier> token = new Token<>();
token.decodeFromUrlString(tokenBase64);
Credentials cred = new Credentials();
cred.addToken(token.getService(), token);
loginUser.addCredentials(cred);
numTokenBase64++;
} catch (IOException ioe) {
LOG.error("Cannot add token {}: {}",
tokenBase64, ioe.getMessage());
}
}
}
if (numTokenBase64 > 0) {
LOG.debug("Loaded {} base64 tokens", numTokenBase64);
}
} catch (IOException ioe) {
LOG.debug("Failure to load login credentials", ioe);
throw ioe;
}
LOG.debug("UGI loginUser: {}", loginUser);
return loginUser;
}
/**
* Login a subject with the given parameters. If the subject is null,
* the login context used to create the subject will be attached.
* @param subject to login, null for new subject.
* @param params for login, null for externally managed ugi.
* @return UserGroupInformation for subject
* @throws IOException
*/
private static UserGroupInformation doSubjectLogin(
Subject subject, LoginParams params) throws IOException {
ensureInitialized();
// initial default login.
if (subject == null && params == null) {
params = LoginParams.getDefaults();
}
HadoopConfiguration loginConf = new HadoopConfiguration(params);
try {
// this is the key line: it sets the name of the LoginContext,
// either HadoopConfiguration.KERBEROS_CONFIG_NAME or SIMPLE_CONFIG_NAME
HadoopLoginContext login = newLoginContext(
authenticationMethod.getLoginAppName(), subject, loginConf);
// the JAAS LoginContext login
login.login();
// the JAAS login produces a subject; with Kerberos it contains the Kerberos authentication information
UserGroupInformation ugi = new UserGroupInformation(login.getSubject());
// attach login context for relogin unless this was a pre-existing
// subject.
if (subject == null) {
params.put(LoginParam.PRINCIPAL, ugi.getUserName());
ugi.setLogin(login);
ugi.setLastLogin(Time.now());
}
return ugi;
} catch (LoginException le) {
KerberosAuthException kae =
new KerberosAuthException(FAILURE_TO_LOGIN, le);
if (params != null) {
kae.setPrincipal(params.get(LoginParam.PRINCIPAL));
kae.setKeytabFile(params.get(LoginParam.KEYTAB));
kae.setTicketCacheFile(params.get(LoginParam.CCACHE));
}
throw kae;
}
}
Now look at the HadoopLoginContext code:
// wrapper to allow access to fields necessary to recreate the same login
// context for relogin. explicitly private to prevent external tampering.
private static class HadoopLoginContext extends LoginContext {
private final String appName;
private final HadoopConfiguration conf;
private AtomicBoolean isLoggedIn = new AtomicBoolean();
HadoopLoginContext(String appName, Subject subject,
HadoopConfiguration conf) throws LoginException {
// this sets the LoginContext's name and conf
super(appName, subject, null, conf);
this.appName = appName;
this.conf = conf;
}
The LoginContext constructor and LoginContext.init:
public LoginContext(String name, Subject subject,
CallbackHandler callbackHandler,
Configuration config) throws LoginException {
this.config = config;
if (config != null) {
creatorAcc = java.security.AccessController.getContext();
}
// LoginContext initialization; name is passed in by the caller
init(name);
if (subject != null) {
this.subject = subject;
subjectProvided = true;
}
if (callbackHandler == null) {
loadDefaultCallbackHandler();
} else if (creatorAcc == null) {
this.callbackHandler = new SecureCallbackHandler
(java.security.AccessController.getContext(),
callbackHandler);
} else {
this.callbackHandler = callbackHandler;
}
}
private void init(String name) throws LoginException {
SecurityManager sm = System.getSecurityManager();
if (sm != null && creatorAcc == null) {
sm.checkPermission(new AuthPermission
("createLoginContext." + name));
}
if (name == null)
throw new LoginException
(ResourcesMgr.getString("Invalid.null.input.name"));
// get the Configuration
if (config == null) {
config = java.security.AccessController.doPrivileged
(new java.security.PrivilegedAction<Configuration>() {
public Configuration run() {
return Configuration.getConfiguration();
}
});
}
// key point: the AppConfigurationEntry array is looked up by name here
// get the LoginModules configured for this application
AppConfigurationEntry[] entries = config.getAppConfigurationEntry(name);
if (entries == null) {
if (sm != null && creatorAcc == null) {
sm.checkPermission(new AuthPermission
("createLoginContext." + OTHER));
}
entries = config.getAppConfigurationEntry(OTHER);
if (entries == null) {
MessageFormat form = new MessageFormat(ResourcesMgr.getString
("No.LoginModules.configured.for.name"));
Object[] source = {name};
throw new LoginException(form.format(source));
}
}
moduleStack = new ModuleInfo[entries.length];
for (int i = 0; i < entries.length; i++) {
// clone returned array
moduleStack[i] = new ModuleInfo
(new AppConfigurationEntry
(entries[i].getLoginModuleName(),
entries[i].getControlFlag(),
entries[i].getOptions()),
null);
}
contextClassLoader = java.security.AccessController.doPrivileged
(new java.security.PrivilegedAction<ClassLoader>() {
public ClassLoader run() {
ClassLoader loader =
Thread.currentThread().getContextClassLoader();
if (loader == null) {
// Don't use bootstrap class loader directly to ensure
// proper package access control!
loader = ClassLoader.getSystemClassLoader();
}
return loader;
}
});
}
The AppConfigurationEntry looked up by name above comes from a custom Configuration defined inside UserGroupInformation:
/**
* A JAAS configuration that defines the login modules that we want
* to use for login.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
private static class HadoopConfiguration
extends javax.security.auth.login.Configuration {
static final String KRB5_LOGIN_MODULE =
KerberosUtil.getKrb5LoginModuleName();
static final String SIMPLE_CONFIG_NAME = "hadoop-simple";
static final String KERBEROS_CONFIG_NAME = "hadoop-kerberos";
private static final Map<String, String> BASIC_JAAS_OPTIONS =
new HashMap<String,String>();
static {
if ("true".equalsIgnoreCase(System.getenv("HADOOP_JAAS_DEBUG"))) {
BASIC_JAAS_OPTIONS.put("debug", "true");
}
}
static final AppConfigurationEntry OS_SPECIFIC_LOGIN =
new AppConfigurationEntry(
OS_LOGIN_MODULE_NAME,
LoginModuleControlFlag.REQUIRED,
BASIC_JAAS_OPTIONS);
static final AppConfigurationEntry HADOOP_LOGIN =
new AppConfigurationEntry(
HadoopLoginModule.class.getName(),
LoginModuleControlFlag.REQUIRED, // the Hadoop login is REQUIRED, i.e. this LoginModule must always be invoked and succeed
BASIC_JAAS_OPTIONS);
private final LoginParams params;
HadoopConfiguration(LoginParams params) {
this.params = params;
}
@Override
public LoginParams getParameters() {
return params;
}
// overrides the lookup of AppConfigurationEntry by name; for Kerberos it calls getKerberosEntry to add the Kerberos AppConfigurationEntry
@Override
public AppConfigurationEntry[] getAppConfigurationEntry(String appName) {
ArrayList<AppConfigurationEntry> entries = new ArrayList<>();
// login of external subject passes no params. technically only
// existing credentials should be used but other components expect
// the login to succeed with local user fallback if no principal.
if (params == null || appName.equals(SIMPLE_CONFIG_NAME)) {
entries.add(OS_SPECIFIC_LOGIN);
} else if (appName.equals(KERBEROS_CONFIG_NAME)) {
// existing semantics are the initial default login allows local user
// fallback. this is not allowed when a principal explicitly
// specified or during a relogin.
if (!params.containsKey(LoginParam.PRINCIPAL)) {
entries.add(OS_SPECIFIC_LOGIN);
}
entries.add(getKerberosEntry());
}
// the Hadoop login entry is always added here
entries.add(HADOOP_LOGIN);
return entries.toArray(new AppConfigurationEntry[0]);
}
private AppConfigurationEntry getKerberosEntry() {
final Map<String,String> options = new HashMap<>(BASIC_JAAS_OPTIONS);
LoginModuleControlFlag controlFlag = LoginModuleControlFlag.OPTIONAL;
// kerberos login is mandatory if principal is specified. principal
// will not be set for initial default login, but will always be set
// for relogins.
final String principal = params.get(LoginParam.PRINCIPAL);
if (principal != null) {
options.put("principal", principal);
controlFlag = LoginModuleControlFlag.REQUIRED;
}
// use keytab if given else fallback to ticket cache.
if (IBM_JAVA) {
if (params.containsKey(LoginParam.KEYTAB)) {
final String keytab = params.get(LoginParam.KEYTAB);
if (keytab != null) {
options.put("useKeytab", prependFileAuthority(keytab));
} else {
options.put("useDefaultKeytab", "true");
}
options.put("credsType", "both");
} else {
String ticketCache = params.get(LoginParam.CCACHE);
if (ticketCache != null) {
options.put("useCcache", prependFileAuthority(ticketCache));
} else {
options.put("useDefaultCcache", "true");
}
options.put("renewTGT", "true");
}
} else {
if (params.containsKey(LoginParam.KEYTAB)) {
options.put("useKeyTab", "true");
final String keytab = params.get(LoginParam.KEYTAB);
if (keytab != null) {
options.put("keyTab", keytab);
}
options.put("storeKey", "true");
} else {
options.put("useTicketCache", "true");
String ticketCache = params.get(LoginParam.CCACHE);
if (ticketCache != null) {
options.put("ticketCache", ticketCache);
}
options.put("renewTGT", "true");
}
options.put("doNotPrompt", "true");
}
options.put("refreshKrb5Config", "true");
return new AppConfigurationEntry(
KRB5_LOGIN_MODULE, controlFlag, options);
}
private static String prependFileAuthority(String keytabPath) {
return keytabPath.startsWith("file://")
? keytabPath
: "file://" + keytabPath;
}
}
For Kerberos, the Kerberos login entry is added:
entries.add(getKerberosEntry());
Its default controlFlag is OPTIONAL:
LoginModuleControlFlag controlFlag = LoginModuleControlFlag.OPTIONAL;
// kerberos login is mandatory if principal is specified. principal
// will not be set for initial default login, but will always be set
// for relogins.
final String principal = params.get(LoginParam.PRINCIPAL);
if (principal != null) {
options.put("principal", principal);
controlFlag = LoginModuleControlFlag.REQUIRED;
}
Whichever login configuration is used, the Hadoop login entry is always added:
entries.add(HADOOP_LOGIN);
The LoginModuleControlFlag of HADOOP_LOGIN is REQUIRED,
so HadoopLoginModule.login must always be invoked.
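The effect of OPTIONAL versus REQUIRED can be tried out with a tiny JAAS setup that needs nothing but the JDK. AlwaysFails and AlwaysSucceeds are hypothetical modules: the first plays the role of a Kerberos login with no usable ticket, the second the role of the always-added Hadoop module. The application name "sketch" is arbitrary.

```java
import java.util.Collections;
import java.util.Map;
import javax.security.auth.Subject;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
import javax.security.auth.login.Configuration;
import javax.security.auth.login.LoginContext;
import javax.security.auth.login.LoginException;
import javax.security.auth.spi.LoginModule;

class ControlFlagSketch {
    /** Hypothetical module whose login always fails. */
    public static class AlwaysFails implements LoginModule {
        @Override public void initialize(Subject s, CallbackHandler h,
            Map<String, ?> sharedState, Map<String, ?> options) { }
        @Override public boolean login() throws LoginException {
            throw new LoginException("no credentials available");
        }
        @Override public boolean commit() { return false; }
        @Override public boolean abort() { return true; }
        @Override public boolean logout() { return true; }
    }

    /** Hypothetical module whose login always succeeds. */
    public static class AlwaysSucceeds implements LoginModule {
        @Override public void initialize(Subject s, CallbackHandler h,
            Map<String, ?> sharedState, Map<String, ?> options) { }
        @Override public boolean login() { return true; }
        @Override public boolean commit() { return true; }
        @Override public boolean abort() { return true; }
        @Override public boolean logout() { return true; }
    }

    /** Runs a login where the failing module is either REQUIRED or OPTIONAL. */
    static boolean tryLogin(final boolean failingModuleRequired) {
        Configuration conf = new Configuration() {
            @Override
            public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
                Map<String, String> none = Collections.emptyMap();
                LoginModuleControlFlag flag = failingModuleRequired
                    ? LoginModuleControlFlag.REQUIRED
                    : LoginModuleControlFlag.OPTIONAL;
                return new AppConfigurationEntry[] {
                    new AppConfigurationEntry(AlwaysFails.class.getName(), flag, none),
                    new AppConfigurationEntry(AlwaysSucceeds.class.getName(),
                        LoginModuleControlFlag.REQUIRED, none)
                };
            }
        };
        // LoginContext loads modules through the context class loader, so make
        // sure it can see the nested classes however this file is launched.
        Thread.currentThread().setContextClassLoader(ControlFlagSketch.class.getClassLoader());
        try {
            new LoginContext("sketch", new Subject(), null, conf).login();
            return true;
        } catch (LoginException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("failing module OPTIONAL, login ok: " + tryLogin(false));
        System.out.println("failing module REQUIRED, login ok: " + tryLogin(true));
    }
}
```

This mirrors why the default login can fall back to the OS user: an OPTIONAL Kerberos entry is allowed to fail, while the REQUIRED entries decide the overall outcome. Once a principal is specified, the Kerberos entry becomes REQUIRED and a Kerberos failure fails the whole login. With that in mind, here is HadoopLoginModule itself: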
/**
* A login module that looks at the Kerberos, Unix, or Windows principal and
* adds the corresponding UserName.
*/
@InterfaceAudience.Private
public static class HadoopLoginModule implements LoginModule {
private Subject subject;
@Override
public boolean abort() throws LoginException {
return true;
}
private <T extends Principal> T getCanonicalUser(Class<T> cls) {
for(T user: subject.getPrincipals(cls)) {
return user;
}
return null;
}
@Override
public boolean commit() throws LoginException {
LOG.debug("hadoop login commit");
// if we already have a user, we are done.
if (!subject.getPrincipals(User.class).isEmpty()) {
LOG.debug("Using existing subject: {}", subject.getPrincipals());
return true;
}
// take the principal produced by the Kerberos login and use it to build the Hadoop User
Principal user = getCanonicalUser(KerberosPrincipal.class);
if (user != null) {
LOG.debug("Using kerberos user: {}", user);
}
//If we don't have a kerberos user and security is disabled, check
//if user is specified in the environment or properties
if (!isSecurityEnabled() && (user == null)) {
String envUser = System.getenv(HADOOP_USER_NAME);
if (envUser == null) {
envUser = System.getProperty(HADOOP_USER_NAME);
}
user = envUser == null ? null : new User(envUser);
}
// if no Kerberos principal was found, build the Hadoop User from the operating-system login principal
// use the OS user
if (user == null) {
user = getCanonicalUser(OS_PRINCIPAL_CLASS);
LOG.debug("Using local user: {}", user);
}
// if we found the user, add our principal
if (user != null) {
LOG.debug("Using user: \"{}\" with name: {}", user, user.getName());
User userEntry = null;
try {
// LoginContext will be attached later unless it's an external
// subject.
AuthenticationMethod authMethod = (user instanceof KerberosPrincipal)
? AuthenticationMethod.KERBEROS : AuthenticationMethod.SIMPLE;
userEntry = new User(user.getName(), authMethod, null);
} catch (Exception e) {
throw (LoginException)(new LoginException(e.toString()).initCause(e));
}
LOG.debug("User entry: \"{}\"", userEntry);
subject.getPrincipals().add(userEntry);
return true;
}
throw new LoginException("Failed to find user in name " + subject);
}
@Override
public void initialize(Subject subject, CallbackHandler callbackHandler,
Map<String, ?> sharedState, Map<String, ?> options) {
this.subject = subject;
}
@Override
public boolean login() throws LoginException {
LOG.debug("Hadoop login");
return true;
}
@Override
public boolean logout() throws LoginException {
LOG.debug("Hadoop logout");
return true;
}
}
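These are the key points:
Key point 1: the principal from the Kerberos login is preferred when constructing the Hadoop User.
Key point 2: if there is no Kerberos principal, the principal from the operating-system login is used to construct the Hadoop User.
Key point 3: whichever login method is used, the end result is a principal of type User.class, which serves as Hadoop's login user.
At this point we have obtained the login user.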
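The order of checks in commit() can be condensed into a small sketch. KerberosLike, OsLike and HadoopUser are hypothetical stand-ins for KerberosPrincipal, the OS principal class and Hadoop's User. The HADOOP_USER_NAME lookup is left out, and the sketch returns false where the real module throws a LoginException.

```java
import java.security.Principal;
import javax.security.auth.Subject;

class CommitSketch {
    static final class KerberosLike implements Principal {
        private final String name;
        KerberosLike(String name) { this.name = name; }
        @Override public String getName() { return name; }
    }

    static final class OsLike implements Principal {
        private final String name;
        OsLike(String name) { this.name = name; }
        @Override public String getName() { return name; }
    }

    static final class HadoopUser implements Principal {
        private final String name;
        final String authMethod;
        HadoopUser(String name, String authMethod) {
            this.name = name;
            this.authMethod = authMethod;
        }
        @Override public String getName() { return name; }
    }

    /** Returns the first principal of the given type, or null if there is none. */
    static <T extends Principal> T first(Subject subject, Class<T> cls) {
        for (T p : subject.getPrincipals(cls)) {
            return p;
        }
        return null;
    }

    /** Existing user first, then the Kerberos principal, then the OS principal. */
    static boolean commit(Subject subject) {
        if (!subject.getPrincipals(HadoopUser.class).isEmpty()) {
            return true; // already has a user, nothing to do
        }
        Principal user = first(subject, KerberosLike.class);
        if (user == null) {
            user = first(subject, OsLike.class);
        }
        if (user == null) {
            return false; // the real module throws LoginException here
        }
        String method = (user instanceof KerberosLike) ? "KERBEROS" : "SIMPLE";
        subject.getPrincipals().add(new HadoopUser(user.getName(), method));
        return true;
    }

    public static void main(String[] args) {
        Subject subject = new Subject();
        subject.getPrincipals().add(new OsLike("alice"));
        subject.getPrincipals().add(new KerberosLike("alice@EXAMPLE.COM"));
        commit(subject);
        HadoopUser user = first(subject, HadoopUser.class);
        System.out.println(user.getName() + " via " + user.authMethod);
    }
}
```

Because the module only adds a principal to whatever the earlier modules produced, it works the same way regardless of which login module ran before it.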
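TOKEN-based authentication
UserGroupInformation also supports subjects whose credential is a Token. The token obtained by calling getDelegationToken is used to build a subject whose creds hold that token. When the AuthMethod is TOKEN, the token is fetched from the subject's credentials, looked up by the Token class.
This should give a rough overall picture of UserGroupInformation.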