Skip to content

Commit

Permalink
Merge pull request #321 from SergeyYakubov/user-auth-plugins
Browse files Browse the repository at this point in the history
User auth plugins
  • Loading branch information
mvdbeek authored May 11, 2023
2 parents fbf5214 + 8fe2942 commit 31d8a99
Show file tree
Hide file tree
Showing 22 changed files with 331 additions and 20 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,4 @@ dependencies
dependency_resolvers_conf.xml
job_metrics_conf.xml
.DS_Store
.idea
16 changes: 16 additions & 0 deletions app.yml.sample
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,22 @@
## Maximum number of seconds to sleep between each retry.
#amqp_publish_retry_interval_max: 60


## configure user authentication/authorization plugins
## parameters depend on auth type. Authentication plugin should return a username
## and authorization plugin should authorize this user
#user_auth:
# authentication:
# - type: oidc
# oidc_jwks_url: https://login.microsoftonline.com/xxx/discovery/v2.0/keys
# oidc_provider: azure
# oidc_username_in_token: preferred_username
# oidc_username_template: *.
# authorization:
# - type: userlist
# userlist_allowed_users:
# - xxx

## *Experimental*. Enable file caching by specifing a directory here.
## Directory used to store incoming file cache. It works fine for HTTP
## transfer, have not tested with staging by coping. Also there is no
Expand Down
28 changes: 28 additions & 0 deletions docs/configure.rst
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,34 @@ You can consult the `Kombu documentation
<http://kombu.readthedocs.org/en/latest/reference/kombu.connection.html>`__ for
even more information.

User Authentication/Authorization
`````````````

You can configure Pulsar to authenticate user during request processing and check
if this user is allowed to run a job.

Various authentication/authorization plugins can be configured in `app.yml` to
do that and plugin parameters depend on auth type. For example, the following
configuration uses `oidc` plugin for authentication and `userlist` for
authorization::

user_auth:
authentication:
- type: oidc
oidc_jwks_url: https://login.microsoftonline.com/xxx/discovery/v2.0/keys
oidc_provider: azure
oidc_username_in_token: preferred_username
oidc_username_template: *.
authorization:
- type: userlist
userlist_allowed_users:
- xxx


see `plugins folder
<https://github.com/galaxyproject/pulsar/blob/master/pulsar/user_auth/methods>`_
for available plugins and their parameters.

Customizing the Pulsar Environment (\*nix only)
-----------------------------------------------

Expand Down
21 changes: 17 additions & 4 deletions pulsar/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ def __init__(self, destination_params, job_id):
setattr(self, attr, destination_params.get(attr, None))
self.env = destination_params.get("env", [])
self.files_endpoint = destination_params.get("files_endpoint", None)
self.token_endpoint = destination_params.get("token_endpoint", None)

default_file_action = self.destination_params.get("default_file_action", "transfer")
if default_file_action not in actions:
Expand Down Expand Up @@ -166,7 +167,8 @@ def __init__(self, destination_params, job_id, job_manager_interface):
super().__init__(destination_params, job_id)
self.job_manager_interface = job_manager_interface

def launch(self, command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None, dynamic_file_sources=None):
def launch(self, command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None,
dynamic_file_sources=None, token_endpoint=None):
"""
Queue up the execution of the supplied `command_line` on the remote
server. Called launch for historical reasons, should be renamed to
Expand All @@ -190,6 +192,8 @@ def launch(self, command_line, dependencies_description=None, env=None, remote_s
if job_config and 'touch_outputs' in job_config:
# message clients pass the entire job config
launch_params['submit_extras'] = json_dumps({'touch_outputs': job_config['touch_outputs']})
if token_endpoint is not None:
launch_params["token_endpoint"] = json_dumps({'token_endpoint': token_endpoint})

if job_config and self.setup_handler.local:
# Setup not yet called, job properties were inferred from
Expand Down Expand Up @@ -344,7 +348,8 @@ def __init__(self, destination_params, job_id, client_manager):
self.client_manager = client_manager
self.amqp_key_prefix = self.destination_params.get("amqp_key_prefix")

def _build_setup_message(self, command_line, dependencies_description, env, remote_staging, job_config, dynamic_file_sources):
def _build_setup_message(self, command_line, dependencies_description, env, remote_staging, job_config,
dynamic_file_sources, token_endpoint):
"""
"""
launch_params = dict(command_line=command_line, job_id=self.job_id)
Expand All @@ -359,6 +364,8 @@ def _build_setup_message(self, command_line, dependencies_description, env, remo
launch_params['remote_staging'] = remote_staging
launch_params['remote_staging']['ssh_key'] = self.ssh_key
launch_params['dynamic_file_sources'] = dynamic_file_sources
launch_params['token_endpoint'] = token_endpoint

if job_config and self.setup_handler.local:
# Setup not yet called, job properties were inferred from
# destination arguments. Hence, must have Pulsar setup job
Expand Down Expand Up @@ -397,7 +404,8 @@ def _build_status_request_message(self):

class MessageJobClient(BaseMessageJobClient):

def launch(self, command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None, dynamic_file_sources=None):
def launch(self, command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None,
dynamic_file_sources=None, token_endpoint=None):
"""
"""
launch_params = self._build_setup_message(
Expand All @@ -407,6 +415,7 @@ def launch(self, command_line, dependencies_description=None, env=None, remote_s
remote_staging=remote_staging,
job_config=job_config,
dynamic_file_sources=dynamic_file_sources,
token_endpoint=token_endpoint,
)
self.client_manager.exchange.publish("setup", launch_params)
log.info("Job published to setup message queue: %s", self.job_id)
Expand All @@ -429,7 +438,8 @@ def __init__(self, destination_params, job_id, client_manager, shell):
self.remote_pulsar_path = destination_params["remote_pulsar_path"]
self.shell = shell

def launch(self, command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None, dynamic_file_sources=None):
def launch(self, command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None,
dynamic_file_sources=None, token_endpoint=None):
"""
"""
launch_params = self._build_setup_message(
Expand All @@ -439,6 +449,7 @@ def launch(self, command_line, dependencies_description=None, env=None, remote_s
remote_staging=remote_staging,
job_config=job_config,
dynamic_file_sources=dynamic_file_sources,
token_endpoint=token_endpoint,
)
base64_message = to_base64_json(launch_params)
submit_command = os.path.join(self.remote_pulsar_path, "scripts", "submit.bash")
Expand Down Expand Up @@ -479,6 +490,7 @@ def launch(
job_config=None,
dynamic_file_sources=None,
container_info=None,
token_endpoint=None,
pulsar_app_config=None
) -> Optional[ExternalId]:
"""
Expand All @@ -490,6 +502,7 @@ def launch(
remote_staging=remote_staging,
job_config=job_config,
dynamic_file_sources=dynamic_file_sources,
token_endpoint=token_endpoint,
)
container = None
guest_ports = None
Expand Down
2 changes: 1 addition & 1 deletion pulsar/client/staging/up.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def submit_job(client, client_job_description, job_config=None):
# potentially duplicated but we don't want to count on remote staging to include this
# it needs to be in the response to Pulsar even Pulsar is inititing staging actions
launch_kwds["dynamic_file_sources"] = client_job_description.client_outputs.dynamic_file_sources

launch_kwds["token_endpoint"] = client.token_endpoint
# for pulsar modalities that skip the explicit "setup" step, give them a chance to set an external
# id from the submission process (e.g. to TES).
launch_response = client.launch(**launch_kwds)
Expand Down
6 changes: 6 additions & 0 deletions pulsar/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
from pulsar.tools import ToolBox
from pulsar.tools.authorization import get_authorizer

from pulsar.user_auth.manager import UserAuthManager

log = getLogger(__name__)

DEFAULT_PRIVATE_TOKEN = None
Expand Down Expand Up @@ -46,6 +48,7 @@ def __init__(self, **conf):
self.__setup_object_store(conf)
self.__setup_dependency_manager(conf)
self.__setup_job_metrics(conf)
self.__setup_user_auth_manager(conf)
self.__setup_managers(conf)
self.__setup_file_cache(conf)
self.__setup_bind_to_message_queue(conf)
Expand All @@ -71,6 +74,9 @@ def __setup_bind_to_message_queue(self, conf):
queue_state = messaging.bind_app(self, message_queue_url, conf)
self.__queue_state = queue_state

def __setup_user_auth_manager(self, conf):
self.user_auth_manager = UserAuthManager(conf)

def __setup_tool_config(self, conf):
"""
Setups toolbox object and authorization mechanism based
Expand Down
3 changes: 3 additions & 0 deletions pulsar/manager_endpoint_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ def submit_job(manager, job_config):
submit_params = job_config.get('submit_params', {})
touch_outputs = job_config.get('touch_outputs', [])
dynamic_file_sources = job_config.get("dynamic_file_sources", None)
token_endpoint = job_config.get("token_endpoint", None)

job_config = None
if setup_params or force_setup:
input_job_id = setup_params.get("job_id", job_id)
Expand Down Expand Up @@ -108,6 +110,7 @@ def submit_job(manager, job_config):
"env": env,
"setup_params": setup_params,
"dynamic_file_sources": dynamic_file_sources,
"token_endpoint": token_endpoint,
}
manager.preprocess_and_launch(job_id, launch_config)
except Exception:
Expand Down
3 changes: 3 additions & 0 deletions pulsar/managers/base/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ def __init__(self, name, app, **kwds):
self.tmp_dir = kwds.get("tmp_dir", None)
self.debug = str(kwds.get("debug", False)).lower() == "true"
self.authorizer = app.authorizer
self.user_auth_manager = app.user_auth_manager
self.__init_system_properties()
self.__init_env_vars(**kwds)
self.dependency_manager = app.dependency_manager
Expand Down Expand Up @@ -179,6 +180,8 @@ def _check_execution(self, job_id, tool_id, command_line):
log.debug("job_id: {} - Checking authorization of command_line [{}]".format(job_id, command_line))
authorization = self._get_authorization(job_id, tool_id)
job_directory = self._job_directory(job_id)
self.user_auth_manager.authorize(job_id, job_directory)

tool_files_dir = job_directory.tool_files_directory()
for file in self._list_dir(tool_files_dir):
if os.path.isdir(join(tool_files_dir, file)):
Expand Down
Empty file added pulsar/user_auth/__init__.py
Empty file.
66 changes: 66 additions & 0 deletions pulsar/user_auth/manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
from abc import ABC

import inspect


class UserAuthManager(ABC):
"""
Authorization/Authentication manager.
"""

def __init__(self, config):
self._authorization_methods = []
self._authentication_methods = []

try:
user_auth = config.get("user_auth", None)
if not user_auth:
return
authentications = user_auth.pop("authentication", [])
authorizations = user_auth.pop("authorization", [])

for authorization in authorizations:
authorization.update(user_auth)
obj = get_object("pulsar.user_auth.methods." + authorization["type"], "auth_type",
authorization["type"])
self._authorization_methods.append(obj(authorization))

for authentication in authentications:
authentication.update(user_auth)
obj = get_object("pulsar.user_auth.methods." + authentication["type"], "auth_type",
authentication["type"])
self._authentication_methods.append(obj(authentication))
except Exception as e:
raise Exception("cannot read auth configuration") from e

def authorize(self, job_id, job_directory):
authentication_info = self.__authenticate(job_id, job_directory)

if len(self._authorization_methods) == 0:
return True
for method in self._authorization_methods:
res = method.authorize(authentication_info)
if res:
return True

raise Exception("Could not authorize job execution on remote resource")

def __authenticate(self, job_id, job_directory):
if len(self._authentication_methods) == 0:
return {}
for method in self._authentication_methods:
res = method.authenticate(job_directory)
if res:
return res

raise Exception("Could not authenticate job %s" % job_id)


def get_object(module_name, attribute_name, attribute_value):
module = __import__(module_name)
for comp in module_name.split(".")[1:]:
module = getattr(module, comp)
for _, obj in inspect.getmembers(module):
if inspect.isclass(obj) and hasattr(obj, attribute_name) and getattr(obj, attribute_name) == attribute_value:
return obj
raise Exception("Cannot find object %s with attribute %s=%s " % (module_name, attribute_name, attribute_value))
Empty file.
18 changes: 18 additions & 0 deletions pulsar/user_auth/methods/allow_all.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from pulsar.user_auth.methods.interface import AuthMethod


class AlwaysAllowAuthMethod(AuthMethod):
"""
Always allow
"""

def __init__(self, _config):
pass

auth_type = "allow_all"

def authorize(self, authentication_info):
return True

def authenticate(self, job_directory):
return {"username": "anonymous"}
15 changes: 15 additions & 0 deletions pulsar/user_auth/methods/interface.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from abc import ABC, abstractmethod


class AuthMethod(ABC):
"""
Defines the interface to various authentication/authorization methods.
"""

@abstractmethod
def authorize(self, authentication_info):
raise NotImplementedError("a concrete class should implement this")

@abstractmethod
def authenticate(self, job_directory):
raise NotImplementedError("a concrete class should implement this")
Loading

0 comments on commit 31d8a99

Please sign in to comment.