Adding HiveServer Credential Provider #210

Open
wants to merge 8 commits into base: master
18 changes: 18 additions & 0 deletions java/pom.xml
@@ -13,6 +13,7 @@
<boringssl.version>2.0.17.Final</boringssl.version>
<jetty.version>9.2.10.v20150310</jetty.version>
<hadoopVersion>2.7.2</hadoopVersion>
<hiveVersion>2.1.1</hiveVersion>
<skein.version>UNKNOWN</skein.version>
</properties>

@@ -60,6 +61,9 @@
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/maven/**</exclude>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
<exclude>google/**</exclude>
<exclude>skein.proto</exclude>
</excludes>
@@ -390,6 +394,20 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
Owner

If a system has Hive available, is this jar likely to already be on the classpath (as is common for other Hadoop libraries)? If so, we usually set <scope>provided</scope> on these kinds of dependencies to keep our jar size down and avoid dependency mismatches. You should be able to drop all the exclusions as well.

Author

This is true. Thanks for explaining why I should use provided.

<version>${hiveVersion}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.hive.hcatalog</groupId>
<artifactId>hive-webhcat-java-client</artifactId>
<version>${hiveVersion}</version>
<scope>provided</scope>
</dependency>

<!--Test Dependencies-->
<dependency>
<groupId>junit</groupId>
165 changes: 165 additions & 0 deletions java/src/main/java/com/anaconda/skein/DelegationTokenManager.java
@@ -0,0 +1,165 @@
package com.anaconda.skein;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hive.hcatalog.api.HCatClient;
import org.apache.hive.hcatalog.common.HCatException;
import org.apache.hive.jdbc.HiveConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import static org.apache.commons.lang.SystemUtils.USER_NAME;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION;

public class DelegationTokenManager {
private static final Logger LOG = LoggerFactory.getLogger(DelegationTokenManager.class);
private Credentials credentials;
private final List<Model.DelegationTokenProvider> delegationTokenProviders;

public DelegationTokenManager() { this.delegationTokenProviders = new LinkedList<>(); }

public void addTokenProvider(Model.DelegationTokenProvider p) { this.delegationTokenProviders.add(p); }

public void initializeCredentials(Credentials credentials) { this.credentials = credentials; }

public void obtainTokensHDFS(YarnClient yarnClient, FileSystem fs, Model.ApplicationSpec spec,
Configuration conf) throws IOException, YarnException {
// Collect security tokens as needed
LOG.debug("Collecting filesystem delegation tokens");

List<Path> l = spec.getFileSystems();
l.add(0, new Path(fs.getUri()));

TokenCache.obtainTokensForNamenodes(this.credentials,
l.toArray(new Path[l.size()]), conf);

boolean hasRMToken = false;
for (Token<?> token: this.credentials.getAllTokens()) {
if (token.getKind().equals(RMDelegationTokenIdentifier.KIND_NAME)) {
LOG.debug("RM delegation token already acquired");
hasRMToken = true;
break;
}
}
if (!hasRMToken) {
LOG.debug("Adding RM delegation token");
Text rmDelegationTokenService = ClientRMProxy.getRMDelegationTokenService(conf);
String tokenRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL);
org.apache.hadoop.yarn.api.records.Token rmDelegationToken =
yarnClient.getRMDelegationToken(new Text(tokenRenewer));
Token<TokenIdentifier> rmToken = ConverterUtils.convertFromYarn(
rmDelegationToken, rmDelegationTokenService
);
this.credentials.addToken(rmDelegationTokenService, rmToken);
}
}

private Text getUniqueAlias(Token<?> token) {
return new Text(String.format("%s_%s_%d", token.getKind().toString(),
token.getService().toString(), System.currentTimeMillis()));
}

// Delegation token based connection is explained here:
// https://cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients#HiveServer2Clients-Multi-UserScenariosandProgrammaticLogintoKerberosKDC
// This method is inspired by org.apache.oozie.action.hadoop.Hive2Credentials, which does the same thing for Oozie.
public void obtainTokensHive(Map<String, String> config, String user) {
String jdbcUrl = config.get("hive.jdbc.url");
String principal = config.get("hive.jdbc.principal");
String fullUrl = jdbcUrl + ";principal=" + principal + ";hive.server2.proxy.user=" + user;
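// For illustration only (hypothetical values), the assembled URL looks like:
//   jdbc:hive2://hs2-host:10000/default;principal=hive/_HOST@EXAMPLE.COM;hive.server2.proxy.user=alice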

try {
// load the driver
Class.forName("org.apache.hive.jdbc.HiveDriver");

Connection con = null;
String tokenStr = null;
try {
con = DriverManager.getConnection(fullUrl);
LOG.info("Connected successfully to " + fullUrl);
tokenStr = ((HiveConnection)con).getDelegationToken(user, principal);
} finally {
if (con != null) { con.close(); }
}
LOG.info("Got Hive Server token from " + fullUrl);

Token<DelegationTokenIdentifier> hiveToken = new Token<>();
hiveToken.decodeFromUrlString(tokenStr);
credentials.addToken(getUniqueAlias(hiveToken), hiveToken);
} catch (IOException | SQLException | ClassNotFoundException e) {
LOG.error("Failed to obtain HiveServer2 delegation token from " + fullUrl, e);
}
}

// This method is inspired by org.apache.oozie.action.hadoop.HCatCredentials, which does the same thing for Oozie.
public void obtainTokensHCat(Map<String, String> config, String user) {
String principal = config.get("hcat.metastore.principal");
String server = config.get("hcat.metastore.uri");
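// For illustration only (hypothetical values), the expected provider config looks like:
//   hcat.metastore.principal = hive/_HOST@EXAMPLE.COM
//   hcat.metastore.uri       = thrift://metastore-host:9083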

String protection = "authentication"; // default value
if (config.containsKey(HADOOP_RPC_PROTECTION)) {
protection = config.get(HADOOP_RPC_PROTECTION);
}

try {
HiveConf hiveConf = new HiveConf();
hiveConf.set("hive.metastore.sasl.enabled", "true");
hiveConf.set("hive.metastore.kerberos.principal", principal);
hiveConf.set("hive.metastore.local", "false");
hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, server);
hiveConf.set(HADOOP_RPC_PROTECTION, protection);

HCatClient hiveClient = HCatClient.create(hiveConf);
String tokenStrForm = hiveClient.getDelegationToken(user, principal);
Token<DelegationTokenIdentifier> hcatToken = new Token<>();
hcatToken.decodeFromUrlString(tokenStrForm);
credentials.addToken(getUniqueAlias(hcatToken), hcatToken);
} catch (IOException e) {
LOG.error("Failed to obtain HCat delegation token from " + server, e);
}
}

// For some systems (like Hive), delegation tokens must be obtained while we are
// still authenticated with Kerberos, before we impersonate the user.
public void obtainTokensWithoutImpersonation(String userWeAuthenticateFor) throws IllegalArgumentException {
for (Model.DelegationTokenProvider p : this.delegationTokenProviders) {
if (p.getName().equals("hive")) {
this.obtainTokensHive(p.getConfig(), userWeAuthenticateFor);
} else if (p.getName().equals("hcat")) {
this.obtainTokensHCat(p.getConfig(), userWeAuthenticateFor);
} else {
throw new IllegalArgumentException("Unknown delegation token provider: " + p.getName());
}
}
}

public ByteBuffer toBytes() throws IOException {
DataOutputBuffer dob = new DataOutputBuffer();
this.credentials.writeTokenStorageToStream(dob);
return ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
}
}
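
For orientation, the snippet below is a minimal sketch, not part of the PR, of how the Driver changes further down use this class; the helper method name collectDelegationTokens is hypothetical, while the DelegationTokenManager calls mirror the additions above.

// Sketch only (hypothetical helper, not part of this PR): how the Driver changes below
// combine the DelegationTokenManager calls added in this file.
private ByteBuffer collectDelegationTokens(Model.ApplicationSpec spec, YarnClient yarnClient,
    FileSystem fs, Configuration conf) throws IOException, YarnException {
  DelegationTokenManager tokenManager = spec.getDelegationTokenManager();
  tokenManager.initializeCredentials(UserGroupInformation.getCurrentUser().getCredentials());
  // Hive/HCat tokens must be fetched while still authenticated via Kerberos,
  // before impersonating the target user.
  if (!spec.getUser().isEmpty()) {
    tokenManager.obtainTokensWithoutImpersonation(spec.getUser());
  }
  // HDFS and RM delegation tokens are then collected and the whole credential
  // set is serialized for the container launch context.
  tokenManager.obtainTokensHDFS(yarnClient, fs, spec, conf);
  return tokenManager.toBytes();
}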
87 changes: 23 additions & 64 deletions java/src/main/java/com/anaconda/skein/Driver.java
@@ -1,7 +1,6 @@
package com.anaconda.skein;

import com.google.common.base.Strings;
import com.google.common.collect.ObjectArrays;
import com.google.protobuf.ByteString;

import io.grpc.Server;
@@ -20,13 +19,7 @@
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@@ -43,33 +36,23 @@
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.log4j.Level;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.*;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.ThreadPoolExecutor;

public class Driver {
@@ -359,9 +342,14 @@ private void killApplicationInner(YarnClient yarnClient, FileSystem fs,
/** Start a new application. **/
public ApplicationId submitApplication(final Model.ApplicationSpec spec)
throws IOException, YarnException, InterruptedException {
DelegationTokenManager tokenManager = spec.getDelegationTokenManager();
tokenManager.initializeCredentials(UserGroupInformation.getCurrentUser().getCredentials());

if (spec.getUser().isEmpty()) {
return submitApplicationInner(defaultYarnClient, defaultFileSystem, spec);
} else {
}
else {
tokenManager.obtainTokensWithoutImpersonation(spec.getUser());
return UserGroupInformation.createProxyUser(spec.getUser(), ugi).doAs(
new PrivilegedExceptionAction<ApplicationId>() {
public ApplicationId run() throws IOException, YarnException {
@@ -371,8 +359,7 @@ public ApplicationId run() throws IOException, YarnException {
}
}

private ApplicationId submitApplicationInner(YarnClient yarnClient,
FileSystem fs, Model.ApplicationSpec spec) throws IOException, YarnException {
private ApplicationId submitApplicationInner(YarnClient yarnClient, FileSystem fs, Model.ApplicationSpec spec) throws IOException, YarnException {
// First validate the spec request
spec.validate();

@@ -414,12 +401,17 @@ private ApplicationId submitApplicationInner(YarnClient yarnClient,
+ appDir
+ " >" + logdir + "/application.master.log 2>&1"));

UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
ByteBuffer fsTokens = null;
if (UserGroupInformation.isSecurityEnabled()) {
fsTokens = collectTokens(yarnClient, fs, spec);
} else {
env.put("HADOOP_USER_NAME", ugi.getUserName());
DelegationTokenManager tokenManager = spec.getDelegationTokenManager();
tokenManager.obtainTokensHDFS(yarnClient, fs, spec, conf);
fsTokens = tokenManager.toBytes();

// We cancel the delegation token when the job finishes, so that it cannot be used elsewhere
conf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", true);
}
else {
env.put("HADOOP_USER_NAME", UserGroupInformation.getCurrentUser().getUserName());
}

Map<ApplicationAccessType, String> acls = spec.getAcls().getYarnAcls();
@@ -452,43 +444,6 @@ private ApplicationId submitApplicationInner(YarnClient yarnClient,
return appId;
}

private ByteBuffer collectTokens(YarnClient yarnClient, FileSystem fs,
Model.ApplicationSpec spec) throws IOException, YarnException {
// Collect security tokens as needed
LOG.debug("Collecting filesystem delegation tokens");
Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
TokenCache.obtainTokensForNamenodes(
credentials,
ObjectArrays.concat(
new Path(fs.getUri()),
spec.getFileSystems().toArray(new Path[0])),
conf);

boolean hasRMToken = false;
for (Token<?> token: credentials.getAllTokens()) {
if (token.getKind().equals(RMDelegationTokenIdentifier.KIND_NAME)) {
LOG.debug("RM delegation token already acquired");
hasRMToken = true;
break;
}
}
if (!hasRMToken) {
LOG.debug("Adding RM delegation token");
Text rmDelegationTokenService = ClientRMProxy.getRMDelegationTokenService(conf);
String tokenRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL);
org.apache.hadoop.yarn.api.records.Token rmDelegationToken =
yarnClient.getRMDelegationToken(new Text(tokenRenewer));
Token<TokenIdentifier> rmToken = ConverterUtils.convertFromYarn(
rmDelegationToken, rmDelegationTokenService
);
credentials.addToken(rmDelegationTokenService, rmToken);
}

DataOutputBuffer dob = new DataOutputBuffer();
credentials.writeTokenStorageToStream(dob);
return ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
}

private LocalResource finalizeSecurityFile(
FileSystem fs, Map<Path, Path> uploadCache, Path appDir,
LocalResource file, ByteString bytes, String filename)
@@ -1155,10 +1110,14 @@ public void submit(Msg.ApplicationSpec req,
try {
appId = submitApplication(spec);
} catch (Exception exc) {
StringWriter sw = new StringWriter();
exc.printStackTrace(new PrintWriter(sw));

resp.onError(Status.INTERNAL
.withDescription("Failed to submit application, "
+ "exception:\n"
+ exc.getMessage())
+ exc.getMessage()
+ sw.toString())
.asRuntimeException());
return;
}