diff --git a/CHANGELOG.md b/CHANGELOG.md
index 024d40a8..5e6d7807 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,24 @@
 # Change Log
 All notable changes to this project will be documented in this file.
 
+## 1.2.0 - 2019-07-15
+### Added
+- Added actor events subsystem with an events agent that reads from the events queue.
+- Added support for actor links to send an actor's events to another actor.
+- Added support for an actor webhook property for sending an actor's events as an HTTP POST to an endpoint.
+- Added timing data to messages POST processing.
+
+### Changed
+- Executions now change to status "RUNNING" as soon as a worker starts the corresponding actor container.
+- Force halting an execution fails if the status is not RUNNING.
+- Reading and managing nonces associated with aliases requires permissions on both the alias and the actor.
+- Spawner now sets the actor to READY state before setting the worker to READY state to prevent the autoscaler from stopping a worker before the actor is updated to READY.
+- Updated ActorMsgChannel to use a new, simpler class, TaskQueue, removing the dependency on channelpy.
+
+### Removed
+- No change.
+
+
 ## 1.1.0 - 2019-06-18
 ### Added
 - Added support for sending synchronous messages to an actor.
diff --git a/Dockerfile b/Dockerfile
index 60ed6725..4aa97e65 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -13,6 +13,9 @@ RUN pip3 install -r /requirements.txt
 
 RUN touch /var/log/abaco.log
 
+# set default threads for gunicorn
+ENV threads=3
+
 # todo -- add/remove to toggle between local channelpy and github instance
 #ADD channelpy /channelpy
 #RUN pip3 install /channelpy
diff --git a/abaco.conf b/abaco.conf
index ba8da19d..fcdc33c2 100644
--- a/abaco.conf
+++ b/abaco.conf
@@ -64,6 +64,9 @@ dd: unix://var/run/docker.sock
 # number of worker containers to initially start when an actor is created
 init_count: 1
 
+# set whether autoscaling is enabled
+autoscaling: false
+
 # max length of time, in seconds, an actor container is allowed to execute before being killed.
 # set to -1 for indefinite execution time.
 max_run_time: -1
@@ -116,6 +119,11 @@ show_traceback: false
 # Here we set the to 12 hours.
 log_ex: 43200
 
+# Max length (in bytes) to store an actor execution's log. If a log exceeds this length, the log will be truncated.
+# Note: max_log_length must not exceed the maximum document length for the log store.
+# Here we default it to 1 MB.
+max_log_length: 1000000
+
 # Either camel or snake: Whether to return responses in camel case or snake. Default is snake.
case: snake diff --git a/actors/auth.py b/actors/auth.py index 6050d7c6..e628e393 100644 --- a/actors/auth.py +++ b/actors/auth.py @@ -206,12 +206,32 @@ def authorization(): if '/actors/aliases' in request.url_rule.rule: alias_id = get_alias_id() noun = 'alias' - if request.method == 'GET': - # GET requests require READ access - has_pem = check_permissions(user=g.user, identifier=alias_id, level=codes.READ) - # all other requests require UPDATE access - elif request.method in ['DELETE', 'POST', 'PUT']: - has_pem = check_permissions(user=g.user, identifier=alias_id, level=codes.UPDATE) + # we need to compute the db_id since it is not computed in the general case for + # alias endpoints + db_id, _ = get_db_id() + # reading/creating/updating nonces for an alias requires permissions for both the + # alias itself and the underlying actor + if 'nonce' in request.url_rule.rule: + noun = 'alias and actor' + # logger.debug("checking user {} has permissions for " + # "alias: {} and actor: {}".format(g.user, alias_id, db_id)) + if request.method == 'GET': + # GET requests require READ access + + has_pem = check_permissions(user=g.user, identifier=alias_id, level=codes.READ) + has_pem = has_pem and check_permissions(user=g.user, identifier=db_id, level=codes.READ) + elif request.method in ['DELETE', 'POST', 'PUT']: + has_pem = check_permissions(user=g.user, identifier=alias_id, level=codes.UPDATE) + has_pem = has_pem and check_permissions(user=g.user, identifier=db_id, level=codes.UPDATE) + + # otherwise, this is a request to manage the alias itself; only requires permissions on the alias + else: + if request.method == 'GET': + # GET requests require READ access + has_pem = check_permissions(user=g.user, identifier=alias_id, level=codes.READ) + # all other requests require UPDATE access + elif request.method in ['DELETE', 'POST', 'PUT']: + has_pem = check_permissions(user=g.user, identifier=alias_id, level=codes.UPDATE) else: # all other checks are based on actor-id: noun = 'actor' @@ -318,14 +338,21 @@ def check_permissions(user, identifier, level): def get_db_id(): """Get the db_id and actor_identifier from the request path.""" - # logger.debug("top of get_db_id. request.path: {}".format(request.path)) + # the location of the actor identifier is different for aliases vs actor_id's. + # for actors, it is in index 2: + # /actors/ + # for aliases, it is in index 3: + # /actors/aliases/ + idx = 2 + if 'aliases' in request.path: + idx = 3 path_split = request.path.split("/") if len(path_split) < 3: logger.error("Unrecognized request -- could not find the actor id. 
path_split: {}".format(path_split))
         raise PermissionsException("Not authorized.")
-    # logger.debug("path_split: {}".format(path_split))
-    actor_identifier = path_split[2]
-    # logger.debug("actor_identifier: {}; tenant: {}".format(actor_identifier, g.tenant))
+    logger.debug("path_split: {}".format(path_split))
+    actor_identifier = path_split[idx]
+    logger.debug("actor_identifier: {}; tenant: {}".format(actor_identifier, g.tenant))
     try:
         actor_id = Actor.get_actor_id(g.tenant, actor_identifier)
     except KeyError:
diff --git a/actors/channels.py b/actors/channels.py
index 48d5bb24..248cbe19 100644
--- a/actors/channels.py
+++ b/actors/channels.py
@@ -96,6 +96,26 @@ def put_cmd(self, actor_id, worker_id, image, tenant, stop_existing=True):
         self.put(msg)
 
 
+class EventsChannel(Channel):
+    """Work with events on the events channel."""
+
+    event_queue_names = ('default',
+                         )
+
+    def __init__(self, name='default'):
+        self.uri = Config.get('rabbit', 'uri')
+        if name not in EventsChannel.event_queue_names:
+            raise Exception('Invalid Events Channel Queue name.')
+
+        super().__init__(name='events_channel_{}'.format(name),
+                         connection_type=RabbitConnection,
+                         uri=self.uri)
+
+    def put_event(self, json_data):
+        """Put a new event on the events channel."""
+        self.put(json_data)
+
+
 class BinaryChannel(BasicChannel):
     """Override BaseChannel methods to handle binary messages."""
 
@@ -131,8 +151,21 @@ def get_one(self):
         return self._process(msg.body), msg
 
 
+from queues import BinaryTaskQueue
+
+
+class ActorMsgChannel(BinaryTaskQueue):
+    def __init__(self, actor_id):
+        super().__init__(name='actor_msg_{}'.format(actor_id))
+
+    def put_msg(self, message, d={}, **kwargs):
+        d['message'] = message
+        for k, v in kwargs.items():
+            d[k] = v
+        self.put(d)
+
-class ActorMsgChannel(BinaryChannel):
+class ActorMSSgChannel(BinaryChannel):
     """Work with messages sent to a specific actor.
""" def __init__(self, actor_id): diff --git a/actors/clients.py b/actors/clients.py index 8be230e3..bb236c88 100644 --- a/actors/clients.py +++ b/actors/clients.py @@ -75,7 +75,7 @@ def run(self): message, msg_obj = self.ch.get_one() # we directly ack messages from the clients channel because caller expects direct reply_to msg_obj.ack() - logger.info("cleintg processing message: {}".format(message)) + logger.info("clientg processing message: {}".format(message)) anon_ch = message['reply_to'] cmd = message['value'] if cmd.get('command') == 'new': diff --git a/actors/codes.py b/actors/codes.py index 47e1b904..865b10b5 100644 --- a/actors/codes.py +++ b/actors/codes.py @@ -13,6 +13,7 @@ PROCESSING = 'PROCESSING' COMPLETE = 'COMPLETE' SUBMITTED = 'SUBMITTED' +RUNNING = 'RUNNING' READY = 'READY' ERROR = 'ERROR' BUSY = 'BUSY' diff --git a/actors/controllers.py b/actors/controllers.py index 8a23ec41..9e08be7a 100644 --- a/actors/controllers.py +++ b/actors/controllers.py @@ -5,6 +5,7 @@ import requests import threading import time +import timeit from channelpy.exceptions import ChannelClosedException, ChannelTimeoutException from flask import g, request, render_template, make_response, Response @@ -14,7 +15,7 @@ from auth import check_permissions, get_tas_data, tenant_can_use_tas, get_uid_gid_homedir from channels import ActorMsgChannel, CommandChannel, ExecutionResultsChannel, WorkerChannel -from codes import SUBMITTED, COMPLETE, PERMISSION_LEVELS, READ, UPDATE, PERMISSION_LEVELS, PermissionLevel +from codes import SUBMITTED, COMPLETE, PERMISSION_LEVELS, READ, UPDATE, EXECUTE, PERMISSION_LEVELS, PermissionLevel from config import Config from errors import DAOError, ResourceError, PermissionsException, WorkerException from models import dict_to_camel, display_time, is_hashid, Actor, Alias, Execution, ExecutionsSummary, Nonce, Worker, get_permissions, \ @@ -56,6 +57,12 @@ class MetricsResource(Resource): def get(self): + enable_autoscaling = Config.get('workers', 'autoscaling') + if hasattr(enable_autoscaling, 'lower'): + if not enable_autoscaling.lower() == 'true': + return + else: + return actor_ids = self.get_metrics() self.check_metrics(actor_ids) # self.add_workers(actor_ids) @@ -83,7 +90,7 @@ def get_metrics(self): def check_metrics(self, actor_ids): for actor_id in actor_ids: - logger.debug("TOP OF CHECK METRICS") + logger.debug("TOP OF CHECK METRICS for actor_id {}".format(actor_id)) data = metrics_utils.query_message_count_for_actor(actor_id) try: @@ -92,45 +99,36 @@ def check_metrics(self, actor_ids): logger.info("No current message count for actor {}".format(actor_id)) current_message_count = 0 - change_rate = metrics_utils.calc_change_rate( - data, - last_metric, - actor_id - ) - last_metric.update({actor_id: data}) - workers = Worker.get_workers(actor_id) actor = actors_store[actor_id] logger.debug('METRICS: MAX WORKERS TEST {}'.format(actor)) # If this actor has a custom max_workers, use that. Otherwise use default. 
- if actor['max_workers']: - max_workers = actor['max_workers'] - else: - max_workers = Config.get('spawner', 'max_workers_per_actor') - - - # Add a worker if actor has 0 workers & a message in the Q - # spawner_worker_ch = SpawnerWorkerChannel(worker_id=worker_id) - try: - if len(workers) == 0 and current_message_count >= 1: - metrics_utils.scale_up(actor_id) - logger.debug('METICS: ADDING WORKER SINCE THERE WERE NONE') - else: - logger.debug('METRICS: worker creation criteria not met') - except Exception as e: - logger.debug("METRICS - Error scaling up: {} - {} - {}".format(type(e), e, e.args)) + max_workers = None + if actor.get('max_workers'): + try: + max_workers = int(actor['max_workers']) + except Exception as e: + logger.error("max_workers defined for actor_id {} but could not cast to int. " + "Exception: {}".format(actor_id, e)) + if not max_workers: + try: + conf = Config.get('spawner', 'max_workers_per_actor') + max_workers = int(conf) + except Exception as e: + logger.error("Unable to get/cast max_workers_per_actor config ({}) to int. " + "Exception: {}".format(conf, e)) + max_workers = 1 - # Add a worker if message count reaches a given number + # Add an additional worker if message count reaches a given number try: logger.debug("METRICS current message count: {}".format(current_message_count)) if metrics_utils.allow_autoscaling(max_workers, len(workers)): if current_message_count >= 1: metrics_utils.scale_up(actor_id) logger.debug("METRICS current message count: {}".format(data[0]['value'][1])) - # changed - jfs: i think this block needs to run even if allow_autoscaling returns false - # so that scale down can work once message count reaches zero in case where the - # autoscaler previously scaled the worker pool to max_workers: + else: + logger.warning('METRICS - COMMAND QUEUE is getting full. Skipping autoscale.') if current_message_count == 0: metrics_utils.scale_down(actor_id) logger.debug("METRICS made it to scale down block") @@ -476,6 +474,95 @@ def delete(self, alias, nonce_id): return ok(result=None, msg="Alias nonce deleted successfully.") +def check_for_link_cycles(db_id, link_dbid): + """ + Check if a link from db_id -> link_dbid would not create a cycle among linked actors. + :param dbid: actor linking to link_dbid + :param link_dbid: id of actor being linked to. + :return: + """ + logger.debug("top of check_for_link_cycles; db_id: {}; link_dbid: {}".format(db_id, link_dbid)) + # create the links graph, resolving each link attribute to a db_id along the way: + # start with the passed in link, this is the "proposed" link - + links = {db_id: link_dbid} + for _, actor in actors_store.items(): + if actor.get('link'): + try: + link_id = Actor.get_actor_id(actor.get('tenant'), actor.get('link')) + link_dbid = Actor.get_dbid(g.tenant, link_id) + except Exception as e: + logger.error("corrupt link data; could not resolve link attribute in " + "actor: {}; exception: {}".format(actor, e)) + continue + # we do not want to override the proposed link passed in, as this actor could already have + # a link (that was valid) and we need to check that the proposed link still works + if not actor.get('db_id') == db_id: + links[actor.get('db_id')] = link_dbid + logger.debug("actor links dictionary built. links: {}".format(links)) + if has_cycles(links): + raise DAOError("Error: this update would result in a cycle of linked actors.") + + +def has_cycles(links): + """ + Checks whether the `links` dictionary contains a cycle. 
+ :param links: dictionary of form d[k]=v where k->v is a link + :return: + """ + logger.debug("top of has_cycles. links: {}".format(links)) + # consider each link entry as the starting node: + for k, v in links.items(): + # list of visited nodes on this iteration; starts with the two links. + # if we visit a node twice, we have a cycle. + visited = [k, v] + # current node we are on + current = v + while current: + # look up current to see if it has a link: + current = links.get(current) + # if it had a link, check if it was alread in visited: + if current and current in visited: + return True + visited.append(current) + return False + + +def validate_link(args): + """ + Method to validate a request trying to set a link on an actor. Called for both POSTs (new actors) + and PUTs (updates to existing actors). + :param args: + :return: + """ + logger.debug("top of validate_link. args: {}".format(args)) + # check permissions - creating a link to an actor requires EXECUTE permissions + # on the linked actor. + try: + link_id = Actor.get_actor_id(g.tenant, args['link']) + link_dbid = Actor.get_dbid(g.tenant, link_id) + except Exception as e: + msg = "Invalid link parameter; unable to retrieve linked actor data. The link " \ + "must be a valid actor id or alias for which you have EXECUTE permission. " + logger.info("{}; exception: {}".format(msg, e)) + raise DAOError(msg) + try: + check_permissions(g.user, link_dbid, EXECUTE) + except Exception as e: + logger.info("Got exception trying to check permissions for actor link. " + "Exception: {}; link: {}".format(e, link_dbid)) + raise DAOError("Invalid link parameter. The link must be a valid " + "actor id or alias for which you have EXECUTE permission. " + "Additional info: {}".format(e)) + logger.debug("check_permissions passed.") + # POSTs to create new actors do not have db_id's assigned and cannot result in + # cycles + if not g.db_id: + logger.debug("returning from validate_link - no db_id") + return + if link_dbid == g.db_id: + raise DAOError("Invalid link parameter. An actor cannot link to itself.") + check_for_link_cycles(g.db_id, link_dbid) + class ActorsResource(Resource): def get(self): @@ -499,6 +586,8 @@ def validate_post(self): valid_queues = queues_list.split(',') if args['queue'] not in valid_queues: raise BadRequest('Invalid queue name.') + if args['link']: + validate_link(args) except BadRequest as e: msg = 'Unable to process the JSON description.' @@ -616,6 +705,8 @@ def put(self, actor_id): valid_queues = queues_list.split(',') if args['queue'] not in valid_queues: raise BadRequest('Invalid queue name.') + if args['link']: + validate_link(args) # user can force an update by setting the force param: update_image = args.get('force') if not update_image and args['image'] == previous_image: @@ -905,12 +996,14 @@ def delete(self, actor_id, execution_id): actor_id)) raise ResourceError("Execution not found {}.".format(execution_id)) # check status of execution: - if not exc.status == codes.SUBMITTED: - logger.debug("execution not in {} status: {}".format(codes.SUBMITTED, exc.status)) + if not exc.status == codes.RUNNING: + logger.debug("execution not in {} status: {}".format(codes.RUNNING, exc.status)) raise ResourceError("Cannot force quit an execution not in {} status. " - "Execution was found in status: {}".format(codes.SUBMITTED, exc.status)) + "Execution was found in status: {}".format(codes.RUNNING, exc.status)) # send force_quit message to worker: # TODO - should we set the execution status to FORCE_QUIT_REQUESTED? 
+ logger.debug("issuing force quit to worker: {} " + "for actor_id: {} execution_id: {}".format(exc.worker_id, actor_id, execution_id)) ch = WorkerChannel(worker_id=exc.worker_id) ch.put('force_quit') msg = 'Issued force quit command for execution {}.'.format(execution_id) @@ -1071,10 +1164,12 @@ def validate_post(self): return args def post(self, actor_id): + start_timer = timeit.default_timer() + def get_hypermedia(actor, exc): return {'_links': {'self': '{}/actors/v2/{}/executions/{}'.format(actor.api_server, actor.id, exc), 'owner': '{}/profiles/v2/{}'.format(actor.api_server, actor.owner), - 'messages': '{}/actors/v2/{}/messages'.format(actor.api_server, actor.id)},} + 'messages': '{}/actors/v2/{}/messages'.format(actor.api_server, actor.id)}, } logger.debug("top of POST /actors/{}/messages.".format(actor_id)) synchronous = False @@ -1084,7 +1179,9 @@ def get_hypermedia(actor, exc): except KeyError: logger.debug("did not find actor: {}.".format(actor_id)) raise ResourceError("No actor found with id: {}.".format(actor_id), 404) + got_actor_timer = timeit.default_timer() args = self.validate_post() + val_post_timer = timeit.default_timer() d = {} # build a dictionary of k:v pairs from the query parameters, and pass a single # additional object 'message' from within the post payload. Note that 'message' @@ -1103,6 +1200,7 @@ def get_hypermedia(actor, exc): if k == 'message': continue d[k] = v + request_args_timer = timeit.default_timer() logger.debug("extra fields added to message from query parameters: {}.".format(d)) if synchronous: # actor mailbox length must be 0 to perform a synchronous execution @@ -1120,31 +1218,53 @@ def get_hypermedia(actor, exc): if hasattr(g, 'jwt_header_name'): d['_abaco_jwt_header_name'] = g.jwt_header_name logger.debug("abaco_jwt_header_name: {} added to message.".format(g.jwt_header_name)) - # create an execution + before_exc_timer = timeit.default_timer() exc = Execution.add_execution(dbid, {'cpu': 0, 'io': 0, 'runtime': 0, 'status': SUBMITTED, 'executor': g.user}) + after_exc_timer = timeit.default_timer() logger.info("Execution {} added for actor {}".format(exc, actor_id)) d['_abaco_execution_id'] = exc d['_abaco_Content_Type'] = args.get('_abaco_Content_Type', '') logger.debug("Final message dictionary: {}".format(d)) + before_ch_timer = timeit.default_timer() ch = ActorMsgChannel(actor_id=dbid) + after_ch_timer = timeit.default_timer() ch.put_msg(message=args['message'], d=d) + after_put_msg_timer = timeit.default_timer() ch.close() + after_ch_close_timer = timeit.default_timer() logger.debug("Message added to actor inbox. id: {}.".format(actor_id)) # make sure at least one worker is available actor = Actor.from_db(actors_store[dbid]) + after_get_actor_db_timer = timeit.default_timer() actor.ensure_one_worker() + after_ensure_one_worker_timer = timeit.default_timer() logger.debug("ensure_one_worker() called. 
id: {}.".format(actor_id)) if args.get('_abaco_Content_Type') == 'application/octet-stream': result = {'execution_id': exc, 'msg': 'binary - omitted'} else: - result={'execution_id': exc, 'msg': args['message']} + result = {'execution_id': exc, 'msg': args['message']} result.update(get_hypermedia(actor, exc)) case = Config.get('web', 'case') + end_timer = timeit.default_timer() + time_data = {'total': (end_timer - start_timer) * 1000, + 'get_actor': (got_actor_timer - start_timer) * 1000, + 'validate_post': (val_post_timer - got_actor_timer) * 1000, + 'parse_request_args': (request_args_timer - val_post_timer) * 1000, + 'create_msg_d': (before_exc_timer - request_args_timer) * 1000, + 'add_execution': (after_exc_timer - before_exc_timer) * 1000, + 'final_msg_d': (before_ch_timer - after_exc_timer) * 1000, + 'create_actor_ch': (after_ch_timer - before_ch_timer) * 1000, + 'put_msg_ch': (after_put_msg_timer - after_ch_timer) * 1000, + 'close_ch': (after_ch_close_timer - after_put_msg_timer) * 1000, + 'get_actor_2': (after_get_actor_db_timer - after_ch_close_timer) * 1000, + 'ensure_1_worker': (after_ensure_one_worker_timer - after_get_actor_db_timer) * 1000, + } + logger.info("Times to process message: {}".format(time_data)) if synchronous: return self.do_synch_message(exc) if not case == 'camel': @@ -1278,7 +1398,7 @@ def post(self, actor_id): # send num_to_add messages to add 1 worker so that messages are spread across multiple # spawners. worker_id = Worker.request_worker(tenant=g.tenant, - actor_id=dbid) + actor_id=dbid) logger.info("New worker id: {}".format(worker_id[0])) ch = CommandChannel(name=actor.queue) ch.put_cmd(actor_id=actor.db_id, diff --git a/actors/docker_utils.py b/actors/docker_utils.py index 51aac4c3..38fbc837 100644 --- a/actors/docker_utils.py +++ b/actors/docker_utils.py @@ -14,7 +14,7 @@ from channels import ExecutionResultsChannel from config import Config -from codes import BUSY, READY +from codes import BUSY, READY, RUNNING import globals from models import Execution, get_current_utc_time @@ -506,7 +506,8 @@ def execute_actor(actor_id, start_time = get_current_utc_time() # start the timer to track total execution time. start = timeit.default_timer() - logger.debug("right before cli.start: {}; (worker {};{})".format(start, worker_id, execution_id)) + logger.debug("right before cli.start: {}; container id: {}; " + "(worker {};{})".format(start, container.get('Id'), worker_id, execution_id)) try: cli.start(container=container.get('Id')) except Exception as e: @@ -516,6 +517,7 @@ def execute_actor(actor_id, # local bool tracking whether the actor container is still running running = True + Execution.update_status(actor_id, execution_id, RUNNING) logger.debug("right before creating stats_cli: {}; (worker {};{})".format(timeit.default_timer(), worker_id, execution_id)) diff --git a/actors/events.py b/actors/events.py new file mode 100644 index 00000000..94deb88f --- /dev/null +++ b/actors/events.py @@ -0,0 +1,146 @@ +""" +Create events. +Check for subscriptions (actor links) and publish events to the Events Exchange. 
+ +new events: + +# create a new event object: +event = ActorEvent(tenant_id, actor_id, event_type, data) + +# handles all logic associated with publishing the event, including checking +event.publish() + +""" +import json +import os +import rabbitpy +import requests +import time +from agaveflask.auth import get_api_server + +from codes import SUBMITTED +from channels import ActorMsgChannel, EventsChannel +from models import Execution +from stores import actors_store + + +from agaveflask.logs import get_logger +logger = get_logger(__name__) + +def process_event_msg(msg): + """ + Process an event msg on an event queue + :param msg: + :return: + """ + logger.debug("top of process_event_msg; raw msg: {}".format(msg)) + try: + tenant_id = msg['tenant_id'] + except Exception as e: + logger.error("Missing tenant_id in event msg; exception: {}; msg: {}".format(e, msg)) + raise e + link = msg.get('_abaco_link') + webhook = msg.get('_abaco_webhook') + logger.debug("processing event data; " + "tenant_id: {}; link: {}; webhook: {}".format(tenant_id, link, webhook)) + # additional metadata about the execution + d = {} + d['_abaco_Content_Type'] = 'application/json' + d['_abaco_username'] = 'Abaco Event' + d['_abaco_api_server'] = get_api_server(tenant_id) + if link: + process_link(link, msg, d) + if webhook: + process_webhook(webhook, msg, d) + if not link and not webhook: + logger.error("No link or webhook. Ignoring event. msg: {}".format(msg)) + +def process_link(link, msg, d): + """ + Process an event with a link. + :return: + """ + # ensure that the linked actor still exists; the link attribute is *always* the dbid of the linked + # actor + logger.debug("top of process_link") + try: + actors_store[link] + except KeyError as e: + logger.error("Processing event message for actor {} that does not exist. Quiting".format(link)) + raise e + + # create an execution for the linked actor with message + exc = Execution.add_execution(link, {'cpu': 0, + 'io': 0, + 'runtime': 0, + 'status': SUBMITTED, + 'executor': 'Abaco Event'}) + logger.info("Events processor agent added execution {} for actor {}".format(exc, link)) + d['_abaco_execution_id'] = exc + logger.debug("sending message to actor. Final message {} and message dictionary: {}".format(msg, d)) + ch = ActorMsgChannel(actor_id=link) + ch.put_msg(message=msg, d=d) + ch.close() + logger.info("link processed.") + +def process_webhook(webhook, msg, d): + logger.debug("top of process_webhook") + msg.update(d) + try: + rsp = requests.post(webhook, json=msg) + rsp.raise_for_status() + logger.debug("webhook processed") + except Exception as e: + logger.error("Events got exception posting to webhook: " + "{}; exception: {}; event: {}".format(webhook, e, msg)) + + +def run(ch): + """ + Primary loop for events processor agent. + :param ch: + :return: + """ + while True: + logger.info("top of events processor while loop") + msg, msg_obj = ch.get_one() + try: + process_event_msg(msg) + except Exception as e: + msg = "Events processor get an exception trying to process a message. exception: {}; msg: {}".format(e, msg) + logger.error(msg) + # at this point, all messages are acked, even when there is an error processing + msg_obj.ack() + + +def main(): + """ + Entrypoint for events processor agent. + :return: + """ + # operator should pass the name of the events channel that this events agent should subscribe to. 
+ # + ch_name = os.environ.get('events_ch_name') + + idx = 0 + while idx < 3: + try: + if ch_name: + ch = EventsChannel(name=ch_name) + else: + ch = EventsChannel() + logger.info("events processor made connection to rabbit, entering main loop") + logger.info("events processor using abaco_conf_host_path={}".format(os.environ.get('abaco_conf_host_path'))) + run(ch) + except (rabbitpy.exceptions.ConnectionException, RuntimeError): + # rabbit seems to take a few seconds to come up + time.sleep(5) + idx += 1 + logger.critical("events agent could not connect to rabbitMQ. Shutting down!") + +if __name__ == '__main__': + # This is the entry point for the events processor agent container. + logger.info("Inital log for events processor agent.") + + # call the main() function: + main() diff --git a/actors/health.py b/actors/health.py index 87d5658f..14329f37 100644 --- a/actors/health.py +++ b/actors/health.py @@ -377,6 +377,11 @@ def start_spawner(queue, idx='0'): environment.update({'AE_IMAGE': AE_IMAGE.split(':')[0], 'queue': queue, }) + if not '_abaco_secret' in environment: + msg = 'Error in health process trying to start spawner. Did not find an _abaco_secret. Aborting' + logger.critical(msg) + raise + # check logging strategy to determine log file name: log_file = 'abaco.log' if get_log_file_strategy() == 'split': diff --git a/actors/metrics_utils.py b/actors/metrics_utils.py index 17e241dc..0f4a5b56 100644 --- a/actors/metrics_utils.py +++ b/actors/metrics_utils.py @@ -25,24 +25,35 @@ ['name']) def create_gauges(actor_ids): - logger.debug("METRICS: Made it to create_gauges") + logger.debug("METRICS: Made it to create_gauges; actor_ids: {}".format(actor_ids)) for actor_id in actor_ids: + logger.debug("top of for loop for actor_id: {}".format(actor_id)) try: actor = actors_store[actor_id] + except KeyError: + logger.error("actor {} does not exist.".format(actor_id)) + continue # If the actor doesn't have a gauge, add one - if actor_id not in message_gauges.keys(): - - g = Gauge( - 'message_count_for_actor_{}'.format(actor_id.decode("utf-8").replace('-', '_')), - 'Number of messages for actor {}'.format(actor_id.decode("utf-8").replace('-', '_')) - ) - message_gauges.update({actor_id: g}) - logger.debug('Created gauge {}'.format(g)) - else: - # Otherwise, get this actor's existing gauge + if actor_id not in message_gauges.keys(): + try: + g = Gauge( + 'message_count_for_actor_{}'.format(actor_id.decode("utf-8").replace('-', '_')), + 'Number of messages for actor {}'.format(actor_id.decode("utf-8").replace('-', '_')) + ) + message_gauges.update({actor_id: g}) + logger.debug('Created gauge {}'.format(g)) + except Exception as e: + logger.error("got exception trying to create/instantiate the gauge; " + "actor {}; exception: {}".format(actor_id, e)) + else: + # Otherwise, get this actor's existing gauge + try: g = message_gauges[actor_id] + except Exception as e: + logger.info("got exception trying to instantiate an existing gauge; " + "actor: {}: exception:{}".format(actor_id, e)) # Update this actor's command channel metric channel_name = actor.get("queue") @@ -55,10 +66,8 @@ def create_gauges(actor_ids): ch = CommandChannel(name=channel_name) command_gauge.labels(channel_name).set(len(ch._queue._queue)) - logger.debug("METRICS COMMAND CHANNEL {} size: {}".format(channel_name, command_gauge._value._value)) + logger.debug("METRICS COMMAND CHANNEL {} size: {}".format(channel_name, command_gauge)) ch.close() - except Exception as e: - logger.info("got exception trying to instantiate the Gauge: 
{}".format(e)) # Update this actor's gauge to its current # of messages try: @@ -122,23 +131,22 @@ def calc_change_rate(data, last_metric, actor_id): return change_rate -def allow_autoscaling(cmd_q_len, max_workers, num_workers): +def allow_autoscaling(max_workers, num_workers): if int(num_workers) >= int(max_workers): - logger.debug('METRICS NO AUTOSCALE - criteria not met. {} {} '.format(cmd_q_len, num_workers)) + logger.debug('METRICS NO AUTOSCALE - criteria not met. {} '.format(num_workers)) return False - logger.debug('METRICS AUTOSCALE - criteria met. {} {} '.format(cmd_q_len, num_workers)) + logger.debug('METRICS AUTOSCALE - criteria met. {} '.format(num_workers)) return True - def scale_up(actor_id): tenant, aid = actor_id.decode('utf8').split('_') logger.debug('METRICS Attempting to create a new worker for {}'.format(actor_id)) try: # create a worker & add to this actor actor = Actor.from_db(actors_store[actor_id]) - worker_id = Worker.request_worker(tenant=tenant, actor_id=aid) + worker_id = Worker.request_worker(tenant=tenant, actor_id=actor_id) logger.info("New worker id: {}".format(worker_id)) if actor.queue: channel_name = actor.queue diff --git a/actors/models.py b/actors/models.py index 7b8ead2c..8ee1fc28 100644 --- a/actors/models.py +++ b/actors/models.py @@ -9,8 +9,8 @@ from agaveflask.utils import RequestParser -from channels import CommandChannel -from codes import REQUESTED, SUBMITTED, EXECUTE, PermissionLevel +from channels import CommandChannel, EventsChannel +from codes import REQUESTED, READY, ERROR, SUBMITTED, EXECUTE, PermissionLevel, SPAWNER_SETUP, PULLING_IMAGE, CREATING_CONTAINER, UPDATING_STORE, BUSY from config import Config import errors @@ -22,6 +22,9 @@ HASH_SALT = 'eJa5wZlEX4eWU' +# default max length for an actor execution log - 1MB +DEFAULT_MAX_LOG_LENGTH = 1000000 + def is_hashid(identifier): """ Determine if `identifier` is an Abaco Hashid (e.g., actor id, worker id, nonce id, etc. @@ -77,6 +80,117 @@ def display_time(t): return str(dt) +class Event(object): + """ + Base event class for all Abaco events. + """ + + def __init__(self, dbid: str, event_type: str, data: dict): + """ + Create a new even object + """ + self.db_id = dbid + self.tenant_id = dbid.split('_')[0] + self.actor_id = Actor.get_display_id(self.tenant_id, dbid) + self.event_type = event_type + data['tenant_id'] = self.tenant_id + data['event_type'] = event_type + data['event_time_utc'] = get_current_utc_time() + data['event_time_display'] = display_time(data['event_time_utc']) + data['actor_id'] = self.actor_id + data['actor_dbid'] = dbid + self._get_events_attrs() + data['_abaco_link'] = self.link + data['_abaco_webhook'] = self.webhook + data['event_type'] = event_type + self.data = data + + def _get_events_attrs(self) -> str: + """ + Check if the event is for an actor that has an event property defined (like, webhook, socket), + and return the actor db_id if so. Otherwise, returns an empty string. + :return: the db_id associated with the link, if it exists, or '' otherwise. + """ + try: + actor = Actor.from_db(actors_store[self.db_id]) + except KeyError: + logger.debug("did not find actor with id: {}".format(self.actor_id)) + raise errors.ResourceError("No actor found with identifier: {}.".format(self.actor_id), 404) + # the link and webhook attributes were added in 1.2.0; actors registered before 1.2.0 will not have + # have these attributed defined so we use the .get() method below -- + # if the webhook exists, we always try it. 
+        self.webhook = actor.get('webhook') or ''
+
+        # the actor link might not exist
+        link = actor.get('link') or ''
+        # if the actor link exists, it could be an actor id (not a dbid) or an alias.
+        # the following code resolves the link data to an actor dbid
+        if link:
+            try:
+                link_id = Actor.get_actor_id(self.tenant_id, link)
+                self.link = Actor.get_dbid(self.tenant_id, link_id)
+            except Exception as e:
+                self.link = ''
+                logger.error("Got exception: {} calling get_actor_id to set the event link.".format(e))
+        else:
+            self.link = ''
+
+    def publish(self):
+        """
+        External API for publishing events; handles all logic associated with event publishing including
+        checking for actor links and pushing messages to the EventsExchange.
+        :return: None
+        """
+        logger.debug("top of publish for event: {}".format(self.data))
+        logger.info(self.data)
+        if not self.link and not self.webhook:
+            logger.debug("No link or webhook supplied for this event. Not publishing to the EventsChannel.")
+            return None
+        ch = EventsChannel()
+        ch.put_event(self.data)
+
+
+class ActorEvent(Event):
+    """
+    Data access object class for creating and working with actor event objects.
+    """
+    event_types = ('ACTOR_READY',
+                   'ACTOR_ERROR',
+                   )
+
+    def __init__(self, dbid: str, event_type: str, data: dict):
+        """
+        Create a new event object
+        """
+        super().__init__(dbid, event_type, data)
+        if not event_type.upper() in ActorEvent.event_types:
+            logger.error("Invalid actor event type passed to the ActorEvent constructor. "
+                         "event type: {}".format(event_type))
+            raise errors.DAOError("Invalid actor event type {}.".format(event_type))
+
+
+class ActorExecutionEvent(Event):
+    """
+    Data access object class for creating and working with actor execution event objects.
+    """
+    event_types = ('EXECUTION_STARTED',
+                   'EXECUTION_COMPLETE',
+                   )
+
+    def __init__(self, dbid: str, execution_id: str, event_type: str, data: dict):
+        """
+        Create a new event object
+        """
+        super().__init__(dbid, event_type, data)
+        if not event_type.upper() in ActorExecutionEvent.event_types:
+            logger.error("Invalid actor event type passed to the ActorExecutionEvent constructor. "
+                         "event type: {}".format(event_type))
+            raise errors.DAOError("Invalid actor execution event type {}.".format(event_type))
+
+        self.execution_id = execution_id
+        self.data['execution_id'] = execution_id
+
+
 class DbDict(dict):
     """Class for persisting a Python dictionary."""
 
@@ -206,6 +320,8 @@ class Actor(AbacoDAO):
         ('stateless', 'optional', 'stateless', inputs.boolean, 'Whether the actor stores private state.', True),
         ('type', 'optional', 'type', str, 'Return type (none, bin, json) for this actor. Default is none.', 'none'),
+        ('link', 'optional', 'link', str, "Actor identifier of actor to link this actor's events to. May be an actor id or an alias. Cycles not permitted.", ''),
+        ('webhook', 'optional', 'webhook', str, "URL to publish this actor's events to.", ''),
         ('description', 'optional', 'description', str, 'Description of this actor', ''),
         ('privileged', 'optional', 'privileged', inputs.boolean, 'Whether this actor runs in privileged mode.', False),
         ('max_workers', 'optional', 'max_workers', int, 'How many workers this actor is allowed at the same time.', None),
@@ -267,9 +383,9 @@ def get_derived_value(self, name, d):
     @classmethod
     def get_actor_id(cls, tenant, identifier):
         """
-        Return the db_id associated with the identifier
+        Return the human-readable actor_id associated with the identifier.
         :param identifier (str): either an actor_id or an alias.
-        :return: The actor_id; rasies a KeyError if no actor suc exists.
+        :return: The actor_id; raises a KeyError if no actor exists.
         """
         if is_hashid(identifier):
             return identifier
@@ -365,6 +481,13 @@ def set_status(cls, actor_id, status, status_message=None):
         """Update the status of an actor"""
         logger.debug("top of set_status for status: {}".format(status))
         actors_store.update(actor_id, 'status', status)
+        try:
+            event_type = 'ACTOR_{}'.format(status).upper()
+            event = ActorEvent(actor_id, event_type, {'status_message': status_message})
+            event.publish()
+        except Exception as e:
+            logger.error("Got exception trying to publish an actor status event. "
+                         "actor_id: {}; status: {}; exception: {}".format(actor_id, status, e))
         if status_message:
             actors_store.update(actor_id, 'status_message', status_message)
@@ -770,6 +893,25 @@ def add_worker_id(cls, actor_id, execution_id, worker_id):
                 e, actor_id, execution_id, worker_id))
             raise errors.ExecutionException("Execution {} not found.".format(execution_id))
 
+    @classmethod
+    def update_status(cls, actor_id, execution_id, status):
+        """
+        :param actor_id: the id of the actor
+        :param execution_id: the id of the execution
+        :param status: the new status of the execution.
+        :return:
+        """
+        logger.debug("top of update_status() for actor: {} execution: {} status: {}".format(
+            actor_id, execution_id, status))
+        try:
+            executions_store.update_subfield(actor_id, execution_id, 'status', status)
+            logger.debug("status updated for execution: {} actor: {}. New status: {}".format(
+                execution_id, actor_id, status))
+        except KeyError as e:
+            logger.error("Could not update status. KeyError: {}. actor: {}. ex: {}. status: {}".format(
+                e, actor_id, execution_id, status))
+            raise errors.ExecutionException("Execution {} not found.".format(execution_id))
+
     @classmethod
     def finalize_execution(cls, actor_id, execution_id, status, stats, final_state, exit_code, start_time):
         """
@@ -806,6 +948,21 @@ def finalize_execution(cls, actor_id, execution_id, status, stats, final_state,
             logger.error("Could not finalize execution. execution not found. Params: {}".format(params_str))
             raise errors.ExecutionException("Execution {} not found.".format(execution_id))
 
+        try:
+            event_type = 'EXECUTION_COMPLETE'
+            data = {'actor_id': actor_id,
+                    'execution_id': execution_id,
+                    'status': status,
+                    'exit_code': exit_code
+                    }
+            event = ActorExecutionEvent(actor_id, execution_id, event_type, data)
+            event.publish()
+        except Exception as e:
+            logger.error("Got exception trying to publish an actor execution event. "
+                         "actor_id: {}; execution_id: {}; status: {}; "
+                         "exception: {}".format(actor_id, execution_id, status, e))
+
+
     @classmethod
     def set_logs(cls, exc_id, logs):
         """
@@ -819,6 +976,13 @@ def set_logs(cls, exc_id, logs):
             log_ex = int(log_ex)
         except ValueError:
             log_ex = -1
+        try:
+            max_log_length = int(Config.get('web', 'max_log_length'))
+        except:
+            max_log_length = DEFAULT_MAX_LOG_LENGTH
+        if len(logs) > max_log_length:
+            logger.info("truncating log for execution: {}".format(exc_id))
+            logs = logs[:max_log_length]
         if log_ex > 0:
             logger.info("Storing log with expiry. 
exc_id: {}".format(exc_id)) logs_store.set_with_expiry(exc_id, logs) @@ -1090,7 +1254,7 @@ def update_worker_health_time(cls, actor_id, worker_id): def update_worker_status(cls, actor_id, worker_id, status): """Pass db_id as `actor_id` parameter.""" logger.debug("LOOK HERE top of update_worker_status().") - # TODO add check for valid state transition - set correct ERROR + # The valid state transitions are as follows - set correct ERROR: # REQUESTED -> SPAWNER_SETUP # SPAWNER_SETUP -> PULLING_IMAGE # PULLING_IMAGE -> CREATING_CONTAINER @@ -1098,13 +1262,26 @@ def update_worker_status(cls, actor_id, worker_id, status): # UPDATING_STORE -> READY # READY -> BUSY -> READY ... etc - prev_status = workers_store[actor_id][worker_id]['status'] - - - worker = workers_store[actor_id][worker_id] - logger.info(f"LOOK HERE Worker status will be changed from {worker['status']} to {status}") + # workers can always transition to an ERROR status from any status and from an ERROR status to + # any status. + if status != ERROR and ERROR not in prev_status: + if prev_status == REQUESTED and status != SPAWNER_SETUP: + raise Exception(f"Invalid State Transition f{prev_status} -> f{status}") + elif prev_status == SPAWNER_SETUP and status != PULLING_IMAGE: + raise Exception(f"Invalid State Transition f{prev_status} -> f{status}") + elif prev_status == PULLING_IMAGE and status != CREATING_CONTAINER: + raise Exception(f"Invalid State Transition f{prev_status} -> f{status}") + elif prev_status == CREATING_CONTAINER and status != UPDATING_STORE: + raise Exception(f"Invalid State Transition f{prev_status} -> f{status}") + elif prev_status == UPDATING_STORE and status != READY: + raise Exception(f"Invalid State Transition f{prev_status} -> f{status}") + + if status == ERROR: + status = ERROR + f" PREVIOUS {prev_status}" + + logger.info(f"worker status will be changed from {prev_status} to {status}") # get worker's current status and then do V this separately # this is not threadsafe diff --git a/actors/queues.py b/actors/queues.py new file mode 100644 index 00000000..7bfe8e8a --- /dev/null +++ b/actors/queues.py @@ -0,0 +1,138 @@ +import cloudpickle +import json +import rabbitpy +import threading +import time + +from config import Config + + +class ChannelClosedException(Exception): + pass + + +class RabbitConnection(object): + def __init__(self, retries=100): + self._uri = Config.get('rabbit', 'uri') + tries = 0 + connected = False + while tries < retries and not connected: + try: + self._conn = rabbitpy.Connection(self._uri) + connected = True + except RuntimeError: + time.sleep(0.1) + if not connected: + raise RuntimeError("Could not connect to RabbitMQ.") + self._ch = self._conn.channel() + self._ch.prefetch_count(value=1, all_channels=True) + + def close(self): + """Close this instance of the connection. """ + + self._ch.close() + self._conn.close() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + del exc_type, exc_val, exc_tb + self.close() + +# rconn = RabbitConnection() + +class LegacyQueue(object): + """ + This class is here to support existing code that expects an _queue object on the Various channel objects (e.g., + ActorMsgChannel object). Client code that makes use of the _queue._queue + """ + pass + + +class TaskQueue(object): + def __init__(self, name=None): + # reuse the singleton rconn + # self.conn = rconn + # NOTE - + # reusing the singleton creates the issue that, once closed, it cannot be used further. 
in order for the + # singleton pattern to work, we would need fof individual functions to not call close() directly, but rather + # have an automated way at the end of each process/thread execution to close the connection. + + # create a new RabbitConnection for this instance of the task queue. + self.conn = RabbitConnection() + self._ch = self.conn._ch + self.name = name + self.queue = rabbitpy.Queue(self._ch, name=name, durable=True) + self.queue.declare() + # the following added for backwards compatibility so that client code using the ch._queue._queue attribute + # will continue to work. + self._queue = LegacyQueue() + self._queue._queue = self.queue + + @staticmethod + def _pre_process(msg): + """ + Processing that should happen before putting a message on the queue. Override in subclass. + """ + return msg + + @staticmethod + def _post_process(msg): + """ + Processing that should happen after retrieving a message from the queue. Override in subclass. + :param msg: + :return: + """ + return msg + + def put(self, m): + msg = rabbitpy.Message(self.conn._ch, self._pre_process(m), {}) + msg.publish('', self.name) + + # def close(self): + # self.conn.close() + + def close(self): + def _close(this): + this.conn.close() + + t = threading.Thread(target=_close, args=(self,)) + t.start() + + def delete(self): + self.conn.delete() + + def get_one(self): + """Blocking method to get a single message without polling.""" + if self._queue is None: + raise ChannelClosedException() + for msg in self.queue.consume(prefetch=1): + return self._post_process(msg), msg + + +class JsonTaskQueue(TaskQueue): + """ + Task Queue where the message payloads are all JSON + """ + + @staticmethod + def _pre_process(msg): + return json.dumps(msg).encode('utf-8') + + @staticmethod + def _post_process(msg): + return json.loads(msg.body.decode('utf-8'), object_hook=str) + + +class BinaryTaskQueue(TaskQueue): + """ + Task Queue where the message payloads are python objects. 
+ """ + @staticmethod + def _pre_process(msg): + return cloudpickle.dumps(msg) + + @staticmethod + def _post_process(msg): + return cloudpickle.loads(msg.body) diff --git a/actors/spawner.py b/actors/spawner.py index 796ebf6a..f02fb6f6 100644 --- a/actors/spawner.py +++ b/actors/spawner.py @@ -11,7 +11,7 @@ from docker_utils import DockerError, run_worker, pull_image from errors import WorkerException from models import Actor, Worker -from stores import workers_store +from stores import actors_store, workers_store from channels import ActorMsgChannel, ClientsChannel, CommandChannel, WorkerChannel, SpawnerWorkerChannel from health import get_worker @@ -118,23 +118,21 @@ def stop_workers(self, actor_id, worker_ids): logger.info("No workers to stop.") def process(self, cmd): - logger.info("LOOK HERE - starting spawner process") """Main spawner method for processing a command from the CommandChannel.""" - logger.info("LOOK HERE - Spawner processing new command:{}".format(cmd)) + logger.info("top of process; cmd: {}".format(cmd)) actor_id = cmd['actor_id'] worker_id = cmd['worker_id'] image = cmd['image'] tenant = cmd['tenant'] stop_existing = cmd.get('stop_existing', True) num_workers = 1 - logger.info("LOOK HERE command params: actor_id: {} worker_id: {} image: {} tenant: {}" - "stop_existing: {} num_workers: {}".format(actor_id, worker_id, image, tenant, stop_existing, num_workers)) - - logger.info("LOOK HERE - getting worker") + logger.debug("spawner command params: actor_id: {} worker_id: {} image: {} tenant: {}" + "stop_existing: {} num_workers: {}".format(actor_id, worker_id, + image, tenant, stop_existing, num_workers)) # Status: REQUESTED -> SPAWNER_SETUP Worker.update_worker_status(actor_id, worker_id, SPAWNER_SETUP) - logger.info("LOOK HERE - worker status updated to SPAWNER_SETUP") + logger.debug("spawner has updated worker status to SPAWNER_SETUP; worker_id: {}".format(worker_id)) client_id = None client_secret = None client_access_token = None @@ -145,7 +143,7 @@ def process(self, cmd): # First, get oauth clients for the worker generate_clients = Config.get('workers', 'generate_clients').lower() if generate_clients == "true": - logger.info("LOOK HERE - starting client generation") + logger.debug("spawner starting client generation") client_id, \ client_access_token, \ @@ -155,7 +153,7 @@ def process(self, cmd): ch = SpawnerWorkerChannel(worker_id=worker_id) - logger.info("LOOK HERE - attempting to start worker") + logger.debug("spawner attempting to start worker; worker_id: {}".format(worker_id)) try: worker = self.start_worker( image, @@ -317,6 +315,19 @@ def start_worker(self, break logger.info('LOOK HERE - finished loop') worker_dict['ch_name'] = WorkerChannel.get_name(worker_id) + # if the actor is not already in READY status, set actor status to READY before worker status has been + # set to READY. + # it is possible the actor status is already READY because this request is the autoscaler starting a new worker + # for an existing actor. + actor = Actor.from_db(actors_store[actor_id]) + if not actor.status == READY: + try: + Actor.set_status(actor_id, READY, status_message=" ") + except KeyError: + # it is possible the actor was already deleted during worker start up; if + # so, the worker should have a stop message waiting for it. starting subscribe + # as usual should allow this process to work as expected. 
+ pass # finalize worker with READY status worker = Worker(tenant=tenant, **worker_dict) logger.info("calling add_worker for worker: {}.".format(worker)) diff --git a/actors/worker.py b/actors/worker.py index f7dc1762..fb843f97 100644 --- a/actors/worker.py +++ b/actors/worker.py @@ -27,6 +27,9 @@ # required. keep_running = True +# maximum number of consecutive errors a worker can encounter before giving up and moving itself into an ERROR state. +MAX_WORKER_CONSECUTIVE_ERRORS = 5 + def shutdown_worker(worker_id, delete_actor_ch=True): """Gracefully shutdown a single worker.""" logger.debug("top of shutdown_worker for worker_id: {}".format(worker_id)) @@ -149,7 +152,7 @@ def subscribe(tenant, containers when message arrive. Also subscribes to the worker channel for future communications. :return: """ - logger.debug("Top of subscribe().") + logger.debug("Top of subscribe(). worker_id: {}".format(worker_id)) actor_ch = ActorMsgChannel(actor_id) logger.info(f"LOOK HERE - made it to subscribe - actor ID: {actor_id}") try: @@ -191,7 +194,7 @@ def subscribe(tenant, logger.info("Starting the process worker channel thread.") t = threading.Thread(target=process_worker_ch, args=(tenant, worker_ch, actor_id, worker_id, actor_ch, ag)) t.start() - logger.info("Worker subscribing to actor channel.") + logger.info("Worker subscribing to actor channel. worker_id: {}".format(worker_id)) logger.info("LOOK HERE - starting subscribe process") # keep track of whether we need to update the worker's status back to READY; otherwise, we # will hit redis with an UPDATE every time the subscription loop times out (i.e., every 2s) @@ -200,24 +203,26 @@ def subscribe(tenant, # global tracks whether this worker should keep running. globals.keep_running = True + # consecutive_errors tracks the number of consecutive times a worker has gotten an error trying to process a + # message. Even though the message will be requeued, we do not want the worker to continue processing + # indefinitely when a compute node is unhealthy. + consecutive_errors = 0 + # main subscription loop -- processing messages from actor's mailbox while globals.keep_running: - logger.info("LOOK HERE - made it to keep_running") + logger.debug("top of keep_running; worker id: {}".format(worker_id)) if update_worker_status: Worker.update_worker_status(actor_id, worker_id, READY) - logger.info("LOOK HERE - updated worker status to READY in SUBSCRIBE") + logger.debug("updated worker status to READY in SUBSCRIBE; worker id: {}".format(worker_id)) update_worker_status = False try: msg, msg_obj = actor_ch.get_one() - logger.info("LOOK HERE - made it to 206") except channelpy.ChannelClosedException: - logger.info("LOOK HERE - EXITING ") - logger.info("Channel closed, worker exiting...") + logger.info("Channel closed, worker exiting. worker id: {}".format(worker_id)) globals.keep_running = False sys.exit() logger.info("worker {} processing new msg.".format(worker_id)) - logger.info("LOOK HERE - made it to 212") try: Worker.update_worker_status(actor_id, worker_id, BUSY) except Exception as e: @@ -226,10 +231,11 @@ def subscribe(tenant, worker_id, BUSY, e)) + logger.info("worker exiting. worker_id: {}".format(worker_id)) msg_obj.nack(requeue=True) raise e update_worker_status = True - logger.info("Received message {}. Starting actor container...".format(msg)) + logger.info("Received message {}. Starting actor container. 
worker id: {}".format(msg, worker_id)) # the msg object is a dictionary with an entry called message and an arbitrary # set of k:v pairs coming in from the query parameters. message = msg.pop('message', '') @@ -246,6 +252,7 @@ def subscribe(tenant, BUSY, e)) msg_obj.nack(requeue=True) + logger.info("worker exiting. worker_id: {}".format(worker_id)) raise e # for results, create a socket in the configured directory. @@ -255,6 +262,7 @@ def subscribe(tenant, logger.error("No socket_host_path configured. Cannot manage results data. Nacking message") Actor.set_status(actor_id, ERROR, status_message="Abaco instance not configured for results data.") msg_obj.nack(requeue=True) + logger.info("worker exiting. worker_id: {}".format(worker_id)) raise e socket_host_path = '{}.sock'.format(os.path.join(socket_host_path_dir, worker_id, execution_id)) logger.info("Create socket at path: {}".format(socket_host_path)) @@ -272,6 +280,7 @@ def subscribe(tenant, logger.error("No fifo_host_path configured. Cannot manage binary data.") Actor.set_status(actor_id, ERROR, status_message="Abaco instance not configured for binary data. Nacking message.") msg_obj.nack(requeue=True) + logger.info("worker exiting. worker_id: {}".format(worker_id)) raise e fifo_host_path = os.path.join(fifo_host_path_dir, worker_id, execution_id) try: @@ -280,6 +289,7 @@ def subscribe(tenant, except Exception as e: logger.error("Could not create fifo_path. Nacking message. Exception: {}".format(e)) msg_obj.nack(requeue=True) + logger.info("worker exiting. worker_id: {}".format(worker_id)) raise e # add the fifo as a mount: mounts.append({'host_path': fifo_host_path, @@ -288,19 +298,20 @@ def subscribe(tenant, # the execution object was created by the controller, but we need to add the worker id to it now that we # know which worker will be working on the execution. - logger.debug("Adding worker_id to execution.") + logger.debug("Adding worker_id to execution. woker_id: {}".format(worker_id)) try: Execution.add_worker_id(actor_id, execution_id, worker_id) except Exception as e: logger.error("Unexpected exception adding working_id to the Execution. Nacking message. Exception: {}".format(e)) msg_obj.nack(requeue=True) + logger.info("worker exiting. worker_id: {}".format(worker_id)) raise e # privileged dictates whether the actor container runs in privileged mode and if docker daemon is mounted. privileged = False if type(actor['privileged']) == bool and actor['privileged']: privileged = True - logger.debug("privileged: {}".format(privileged)) + logger.debug("privileged: {}; worker_id: {}".format(privileged, worker_id)) # overlay resource limits if set on actor: if actor.mem_limit: @@ -325,7 +336,7 @@ def subscribe(tenant, environment['_abaco_container_repo'] = actor.image environment['_abaco_actor_state'] = actor.state environment['_abaco_actor_name'] = actor.name or 'None' - logger.debug("Overlayed environment: {}".format(environment)) + logger.debug("Overlayed environment: {}; worker_id: {}".format(environment, worker_id)) # if we have an agave client, get a fresh set of tokens: if ag: @@ -338,12 +349,12 @@ def subscribe(tenant, logger.error("Got an exception trying to get an access token. Stoping worker and nacking message. " "Exception: {}".format(e)) msg_obj.nack(requeue=True) + logger.info("worker exiting. 
worker_id: {}".format(worker_id)) raise e - else: - logger.info("Agave client `ag` is None -- not passing access token.") + logger.info("Agave client `ag` is None -- not passing access token; worker_id: {}".format(worker_id)) logger.info("Passing update environment: {}".format(environment)) - logger.info("LOOK HERE - about to execute actor") + logger.info("About to execute actor; worker_id: {}".format(worker_id)) try: stats, logs, final_state, exit_code, start_time = execute_actor(actor_id, worker_id, @@ -362,12 +373,24 @@ def subscribe(tenant, except DockerStartContainerError as e: logger.error("Worker {} got DockerStartContainerError: {} trying to start actor for execution {}." "Placing message back on queue.".format(worker_id, e, execution_id)) - # if we failed to start the actor container, we leave the worker up and re-queue the original - # message; NOTE - we use the "low level" put() instead of put_message() because we have the - # exact message we want to place in the queue; put_message is used by the controller to + # if we failed to start the actor container, we leave the worker up and re-queue the original message msg_obj.nack(requeue=True) logger.debug('message requeued.') - continue + consecutive_errors += 1 + if consecutive_errors > MAX_WORKER_CONSECUTIVE_ERRORS: + logger.error("Worker {} failed to successfully start actor for execution {} {} consecutive times; " + "Exception: {}. Putting the actor in error status and shutting " + "down workers.".format(worker_id, execution_id, MAX_WORKER_CONSECUTIVE_ERRORS, e)) + Actor.set_status(actor_id, ERROR, "Error executing container: {}; w".format(e)) + shutdown_workers(actor_id, delete_actor_ch=False) + # wait for worker to be shutdown.. + time.sleep(60) + break + else: + # sleep five seconds before getting a message again to give time for the compute + # node and/or docker health to recover + time.sleep(5) + continue except DockerStopContainerError as e: logger.error("Worker {} was not able to stop actor for execution: {}; Exception: {}. " "Putting the actor in error status and shutting down workers.".format(worker_id, execution_id, e)) @@ -377,7 +400,7 @@ def subscribe(tenant, msg_obj.ack() shutdown_workers(actor_id, delete_actor_ch=False) # wait for worker to be shutdown.. - time.sleep(600) + time.sleep(60) break except Exception as e: logger.error("Worker {} got an unexpected exception trying to run actor for execution: {}." @@ -390,23 +413,29 @@ def subscribe(tenant, msg_obj.ack() shutdown_workers(actor_id, delete_actor_ch=False) # wait for worker to be shutdown.. - time.sleep(600) + time.sleep(60) break # ack the message msg_obj.ack() - logger.info("LOOK HERE - container finished successfully ") + logger.debug("container finished successfully; worker_id: {}".format(worker_id)) # Add the completed stats to the execution logger.info("Actor container finished successfully. 
Got stats object:{}".format(str(stats))) Execution.finalize_execution(actor_id, execution_id, COMPLETE, stats, final_state, exit_code, start_time) - logger.info("Added execution: {}".format(execution_id)) + logger.info("Added execution: {}; worker_id: {}".format(execution_id, worker_id)) # Add the logs to the execution - Execution.set_logs(execution_id, logs) - logger.info("Added execution logs.") + try: + Execution.set_logs(execution_id, logs) + logger.debug("Successfully added execution logs.") + except Exception as e: + msg = "Got exception trying to set logs for execution {}; " \ + "Exception: {}; worker_id: {}".format(execution_id, e, worker_id) + logger.error(msg) # Update the worker's last updated and last execution fields: try: Worker.update_worker_execution_time(actor_id, worker_id) + logger.debug("worker execution time updated. worker_id: {}".format(worker_id)) except KeyError: # it is possible that this worker was sent a gracful shutdown command in the other thread # and that spawner has already removed this worker from the store. @@ -415,7 +444,10 @@ def subscribe(tenant, if globals.keep_running: logger.error("worker couldn't update's its execution time but keep_running is still true!") - logger.info("worker time stamps updated.") + # we completed an execution successfully; reset the consecutive_errors counter + consecutive_errors = 0 + logger.info("worker time stamps updated; worker_id: {}".format(worker_id)) + logger.info("global.keep_running no longer true. worker is now exiting. worker id: {}".format(worker_id)) def get_container_user(actor): logger.debug("top of get_container_user") @@ -475,13 +507,6 @@ def main(): logger.info("Got a client.") # TODO - list all client vars - try: - Actor.set_status(actor_id, READY, status_message=" ") - except KeyError: - # it is possible the actor was already deleted during worker start up; if - # so, the worker should have a stop message waiting for it. starting subscribe - # as usual should allow this process to work as expected. - pass logger.info('LOOK HERE - updated actor status') logger.info("Actor status set to READY. subscribing to inbox.") @@ -505,4 +530,14 @@ def main(): logger.info("Inital log for new worker.") # call the main() function: - main() + try: + main() + except Exception as e: + try: + worker_id = os.environ.get('worker_id') + except: + worker_id = '' + msg = "worker caught exception from main loop. worker exiting. 
e" \ + "xception: {} worker_id: {}".format(e, worker_id) + logger.info(msg) + diff --git a/ansible/prod/hosts b/ansible/prod/hosts index afb739ae..70a003c8 100644 --- a/ansible/prod/hosts +++ b/ansible/prod/hosts @@ -1,18 +1,18 @@ [db] -# ab-core-prod-vmware ansible_ssh_host=129.114.60.250 ansible_ssh_private_key_file=~/.ssh/id_rsa ansible_ssh_user=root +# ab-core-prod-vmware ansible_ssh_host=129.114.103.16 ansible_ssh_private_key_file=~/.ssh/id_rsa ansible_ssh_user=root ab-compute1-tacc-prod-js ansible_ssh_host=129.114.104.9 ansible_ssh_private_key_file=~/.ssh/id_rsa ansible_ssh_user=root ab-compute1-tacc-prod-rodeo ansible_ssh_host=129.114.7.16 ansible_ssh_private_key_file=~/.ssh/id_rsa ansible_ssh_user=root - +ab-compute1-vmware ansible_ssh_host= [compute] -# ab-core-prod-vmware ansible_ssh_host=129.114.60.250 ansible_ssh_private_key_file=~/.ssh/id_rsa ansible_ssh_user=root spawner_host_ip=172.17.0.1 spawner_host_id=0 +# ab-core-prod-vmware ansible_ssh_host=129.114.103.16 ansible_ssh_private_key_file=~/.ssh/id_rsa ansible_ssh_user=root spawner_host_ip=172.17.0.1 spawner_host_id=0 # abaco02.tacc.utexas.edu ab-compute1-tacc-prod-js ansible_ssh_host=129.114.17.5 ansible_ssh_private_key_file=~/.ssh/id_rsa ansible_ssh_user=root spawner_host_ip=172.17.0.1 spawner_host_id=2 ab-compute3-tacc-prod-js ansible_ssh_host=129.114.104.9 ansible_ssh_private_key_file=~/.ssh/id_rsa ansible_ssh_user=root spawner_host_ip=172.17.0.1 spawner_host_id=0 [web] -# ab-web-prod-vmware ansible_ssh_host=129.114.60.211 ansible_ssh_private_key_file=~/.ssh/id_rsa ansible_ssh_user=root +# ab-web-prod-vmware ansible_ssh_host=129.114.103.15 ansible_ssh_private_key_file=~/.ssh/id_rsa ansible_ssh_user=root # abaco01.tacc.utexas.edu ab-web-tacc-prod-js ansible_ssh_host=129.114.18.5 ansible_ssh_private_key_file=~/.ssh/id_rsa ansible_ssh_user=root ab-web-tacc-prod-rodeo ansible_ssh_host=129.114.7.16 ansible_ssh_private_key_file=~/.ssh/id_rsa ansible_ssh_user=root diff --git a/docker-compose.yml b/docker-compose.yml index 60c728a3..30a8edae 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,96 +1,298 @@ ---- - -version: "2" +version: "3.3" networks: abaco: driver: bridge services: + mongo: - extends: - file: docker-compose-local-db.yml - service: mongo + image: mongo + ports: + - "27017:27017" + # uncomment to add auth + # command: --auth networks: - abaco + ulimits: + nproc: 65535 + nofile: + soft: 65535 + hard: 65535 - redis: - extends: - file: docker-compose-local-db.yml - service: redis + rabbit: + image: rabbitmq:3.5.3-management + ports: + - "5672:5672" + - "15672:15672" + environment: + RABBITMQ_NODENAME: abaco-rabbit + RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS: "+A 128" + ulimits: + nproc: 65535 + nofile: + soft: 65535 + hard: 65535 networks: - abaco + depends_on: + - mongo + - redis - rabbit: - extends: - file: docker-compose-local-db.yml - service: rabbit + redis: + image: redis + ports: + - "6379:6379" networks: - abaco + # -- uncomment to add auth + # volumes: + # - ./redis.conf:/etc/redis.conf + # command: redis-server /etc/redis.conf --appendonly yes + # -- + sysctls: + net.core.somaxconn: '511' + ulimits: + nproc: 65535 + nofile: + soft: 65535 + hard: 65535 nginx: - extends: - file: docker-compose-local.yml - service: nginx + image: abaco/nginx$TAG + volumes: + - ./local-dev.conf:/etc/service.conf +# - ./images/nginx/nginx.conf:/etc/nginx/nginx.conf +# - ./images/nginx/sites-enabled:/etc/nginx/sites-enabled + ports: + - "8000:80" + restart: always + depends_on: + - mongo + - redis networks: - abaco reg: - 
extends: - file: docker-compose-local.yml - service: reg + image: abaco/core$TAG + ports: + - "5000:5000" + volumes: + - ./local-dev.conf:/etc/service.conf + - ./abaco.log:/var/log/service.log + environment: + api: reg + server: gunicorn + mongo_password: + redis_password: + TAS_ROLE_ACCT: + TAS_ROLE_PASS: + depends_on: + - mongo + - redis networks: - abaco mes: - extends: - file: docker-compose-local.yml - service: mes + image: abaco/core$TAG + volumes: + - ./local-dev.conf:/etc/service.conf + - ./abaco.log:/var/log/service.log + ports: + - "5001:5000" + environment: + server: gunicorn + api: mes + threads: 12 + mongo_password: + redis_password: + TAS_ROLE_ACCT: + TAS_ROLE_PASS: + depends_on: + - mongo + - redis networks: - abaco - metrics: - extends: - file: docker-compose-local.yml - service: metrics + admin: + image: abaco/core$TAG + volumes: + - ./local-dev.conf:/etc/service.conf + - ./abaco.log:/var/log/service.log + ports: + - "5003:5000" + environment: + server: gunicorn + api: admin + mongo_password: + redis_password: + TAS_ROLE_ACCT: + TAS_ROLE_PASS: + depends_on: + - mongo + - redis networks: - abaco - admin: - extends: - file: docker-compose-local.yml - service: admin + spawner: + image: abaco/core$TAG + command: "python3 -u /actors/spawner.py" + volumes: + - /var/run/docker.sock:/var/run/docker.sock + - ./local-dev.conf:/etc/service.conf + - ./abaco.log:/var/log/service.log + environment: + abaco_conf_host_path: ${abaco_path}/local-dev.conf + _abaco_secret: 123 + mongo_password: + redis_password: + TAS_ROLE_ACCT: + TAS_ROLE_PASS: + queue: default + depends_on: + - mongo + - redis networks: - abaco clientg: - extends: - file: docker-compose-local.yml - service: clientg + image: abaco/core$TAG + command: "python3 -u /actors/clients.py" volumes: + - /var/run/docker.sock:/var/run/docker.sock - ./local-dev.conf:/etc/service.conf + - ./abaco.log:/var/log/service.log + + environment: + abaco_conf_host_path: ${abaco_path}/local-dev.conf + _abaco_secret: 123 + mongo_password: + redis_password: + TAS_ROLE_ACCT: + TAS_ROLE_PASS: + + # add the following pair of credentials for each tenant wanting client generation + _abaco_DEV-DEVELOP_username: testotheruser + _abaco_DEV-DEVELOP_password: testotheruser + _abaco_DEV-STAGING_username: testotheruser + _abaco_DEV-STAGING_password: testotheruser + depends_on: + - mongo + - redis networks: - abaco - default_spawner: - extends: - file: docker-compose-local.yml - service: spawner + health: + image: abaco/core$TAG + command: /actors/health_check.sh + volumes: + - /:/host + - /var/run/docker.sock:/var/run/docker.sock + - ./local-dev.conf:/etc/service.conf + - ./abaco.log:/var/log/service.log + environment: + abaco_conf_host_path: ${abaco_path}/local-dev.conf + mongo_password: + redis_password: + TAS_ROLE_ACCT: + TAS_ROLE_PASS: + # add the following pair of credentials for each tenant wanting client generation + _abaco_DEV-DEVELOP_username: testotheruser + _abaco_DEV-DEVELOP_password: testotheruser + _abaco_DEV-STAGING_username: testotheruser + _abaco_DEV-STAGING_password: testotheruser + depends_on: + - mongo + - redis networks: - abaco - special_spawner: - extends: - file: docker-compose-local.yml - service: spawner + events: + image: abaco/core$TAG + command: "python3 -u /actors/events.py" + volumes: + - /var/run/docker.sock:/var/run/docker.sock + - ./local-dev.conf:/etc/service.conf + - ./abaco.log:/var/log/service.log + environment: - queue: special + abaco_conf_host_path: ${abaco_path}/local-dev.conf + _abaco_secret: 123 + mongo_password: + 
redis_password: + TAS_ROLE_ACCT: + TAS_ROLE_PASS: + + # add the following pair of credentials for each tenant wanting client generation + _abaco_DEV-DEVELOP_username: testotheruser + _abaco_DEV-DEVELOP_password: testotheruser + _abaco_DEV-STAGING_username: testotheruser + _abaco_DEV-STAGING_password: testotheruser + depends_on: + - mongo + - redis networks: - abaco - health: - extends: - file: docker-compose-local.yml - service: health + + prometheus: + # build: ./prometheus + image: abaco/prom$TAG + volumes: + - ./prometheus/prometheus.yml:/etc/prometheus/prometheus.yml + - ./prometheus/alert.rules.yml:/etc/prometheus/alert.rules.yml + command: + - '--config.file=/etc/prometheus/prometheus.yml' + # - '-storage.local.path=/prometheus' + ports: + - 9090:9090 networks: - abaco + depends_on: + - mongo + - redis + + grafana: + image: grafana/grafana + user: "104" + depends_on: + - prometheus + - mongo + - redis + ports: + - 3000:3000 + volumes: + - grafana_data:/var/lib/grafana + - ./prometheus/grafana/provisioning/:/etc/grafana/provisioning/ + env_file: + - ./prometheus/grafana/config.monitoring + networks: +# - back-tier +# - front-tier + - abaco + restart: always + + + metrics: + image: abaco/core$TAG + networks: + - abaco + volumes: + - ./local-dev.conf:/etc/service.conf + - ./abaco.log:/var/log/service.log + ports: + - "5004:5000" + environment: + server: dev + api: metrics + mongo_password: + redis_password: + TAS_ROLE_ACCT: + TAS_ROLE_PASS: + depends_on: + - mongo + - redis + - prometheus + +volumes: + grafana_data: {} \ No newline at end of file diff --git a/docs/developer_docs.md b/docs/developer_docs.md index 2f25e794..afa5713b 100644 --- a/docs/developer_docs.md +++ b/docs/developer_docs.md @@ -137,6 +137,14 @@ To run the functional tests, execute the following: $ docker run --network=abaco_abaco -e base_url=http://nginx -e case=camel -v /:/host -v $(pwd)/local-dev.conf:/etc/service.conf -it --rm abaco/testsuite$TAG ``` +You can pass arguments to the testsuite container just like you would to py.test. 
For example, you can run +just one test (in this case, the `test_create_actor_with_webhook` test) from within the `test_abaco_core.py` +file with: + +```shell +$ docker run --network=abaco_copy_abaco -e base_url=http://nginx -e case=camel -v /:/host -v $(pwd)/local-dev.conf:/etc/service.conf -it --rm abaco/testsuite$TAG /tests/test_abaco_core.py::test_create_actor_with_webhook +``` + Run the unit tests with a command similar to the following, changing the test module as the end as necessary: ```shell diff --git a/entry.sh b/entry.sh index 08e3ddea..f2a1ce27 100644 --- a/entry.sh +++ b/entry.sh @@ -4,25 +4,25 @@ if [ $api = "reg" ]; then if [ $server = "dev" ]; then python3 -u /actors/reg_api.py else - cd /actors; /usr/bin/gunicorn -w 2 -b :5000 reg_api:app + cd /actors; /usr/bin/gunicorn -w $threads -b :5000 reg_api:app fi elif [ $api = "admin" ]; then if [ $server = "dev" ]; then python3 -u /actors/admin_api.py else - cd /actors; /usr/bin/gunicorn -w 2 -b :5000 admin_api:app + cd /actors; /usr/bin/gunicorn -w $threads -b :5000 admin_api:app fi elif [ $api = "metrics" ]; then if [ $server = "dev" ]; then python3 -u /actors/metrics_api.py else - cd /actors; /usr/bin/gunicorn -w 2 -b :5000 metrics_api:app + cd /actors; /usr/bin/gunicorn -w $threads -b :5000 metrics_api:app fi elif [ $api = "mes" ]; then if [ $server = "dev" ]; then python3 -u /actors/message_api.py else - cd /actors; /usr/bin/gunicorn -w 2 -b :5000 message_api:app + cd /actors; /usr/bin/gunicorn -w $threads -b :5000 message_api:app fi fi diff --git a/images/nginx/nginx.conf b/images/nginx/nginx.conf index efc49fa0..66d10b74 100644 --- a/images/nginx/nginx.conf +++ b/images/nginx/nginx.conf @@ -1,6 +1,5 @@ #user nobody; -worker_processes 1; #error_log logs/error.log; #error_log logs/error.log notice; @@ -9,8 +8,10 @@ worker_processes 1; #pid logs/nginx.pid; +worker_processes 5; + events { - worker_connections 1024; + worker_connections 4096; } diff --git a/images/nginx/sites-enabled/flask-project b/images/nginx/sites-enabled/flask-project index f9c8afcc..a339cdf6 100644 --- a/images/nginx/sites-enabled/flask-project +++ b/images/nginx/sites-enabled/flask-project @@ -9,10 +9,6 @@ server { root /; } - location /metrics { - proxy_pass http://metrics:5000/metrics; - } - location ~* ^/actors/admin(.*) { proxy_pass http://admin:5000/actors/admin$1$is_args$args; } diff --git a/local-dev.conf b/local-dev.conf index 3b303ba6..de925190 100644 --- a/local-dev.conf +++ b/local-dev.conf @@ -89,6 +89,9 @@ dd: unix://var/run/docker.sock # number of worker containers to initially start when an actor is created init_count: 1 +# set whether autoscaling is enabled +autoscaling = false + # max length of time, in seconds, an actor container is allowed to execute before being killed. # set to -1 for indefinite execution time. max_run_time: -1 @@ -107,11 +110,10 @@ max_run_time: -1 # allow access to 1 cpu max_cpus = 1000000000 - # length of time, in seconds, to keep an idle worker alive. Set to -1 to keep workers # alive indefinitely. -# Set the ttl to 10 minutes. TODO -# worker_ttl: 600 +# Set the ttl to 2 minutes. +# worker_ttl: 120 # Set the ttl to 24 hours. worker_ttl: 86400 @@ -209,6 +211,11 @@ show_traceback: false # Here we set the to 12 hours. log_ex: 43200 +# Max length (in bytes) to store an actor execution's log. If a log exceeds this length, the log will be truncated. +# Note: max_log_length must not exceed the maximum document length for the log store. 
+# here we default it to 1 MB +max_log_length: 1000000 + # Either camel or snake: Whether to return responses in camel case or snake. Default is snake. case: camel @@ -217,4 +224,4 @@ case: camel max_content_length: 500000000 # list of all allowable queues -all_queues: default, special \ No newline at end of file +all_queues: default, special diff --git a/prometheus/prometheus.yml b/prometheus/prometheus.yml index cdbf8168..c363cddc 100644 --- a/prometheus/prometheus.yml +++ b/prometheus/prometheus.yml @@ -1,4 +1,3 @@ -# my global config global: scrape_interval: 15s # By default, scrape targets every 15 seconds. evaluation_interval: 15s # By default, scrape targets every 15 seconds. @@ -26,22 +25,11 @@ scrape_configs: # metrics_path defaults to '/metrics' # scheme defaults to 'http'. static_configs: - - targets: ['172.17.0.1:9090'] + - targets: ['prometheus:9090'] - - job_name: "abaco" + - job_name: 'abaco' scrape_interval: 5s static_configs: - - targets: ['172.17.0.1:5004'] + - targets: ['metrics:5000'] labels: - group: 'abaco-test' -# dns_sd_configs: -# - names: ['localhost'] -# port: 8000 -# type: SRV -# refresh_interval: 5s - -alerting: - alertmanagers: - - static_configs: - - targets: - - alertmanager:9093 \ No newline at end of file + group: 'abaco' diff --git a/samples/logger/Dockerfile b/samples/logger/Dockerfile new file mode 100644 index 00000000..66e964e8 --- /dev/null +++ b/samples/logger/Dockerfile @@ -0,0 +1,7 @@ +# Image: abacosamples/logger +# Test image that logs a long string of data. + +from abacosamples/py3_base +ADD logger.py /logger.py + +CMD ["python", "/logger.py"] \ No newline at end of file diff --git a/samples/logger/README.md b/samples/logger/README.md new file mode 100644 index 00000000..8bd8e9fa --- /dev/null +++ b/samples/logger/README.md @@ -0,0 +1,19 @@ +# Logger Sample # +# Image: abacosamples/logger + +This sample prints a long string to test Abaco's logging facility. The length of the string can be controlled by a +parameter sent via the message. + + +# Example Usage # +Use the default length (100,000 characters) +```bash +$ curl -H "x-jwt-assertion-DEV-DEVELOP: $jwt" localhost:8000/actors/$aid/messages?PYTHONUNBUFFERED=0 +``` + +Specify a message length of 500K +```bash +$ curl -H "x-jwt-assertion-DEV-DEVELOP: $jwt" localhost:8000/actors/$aid/messages?PYTHONUNBUFFERED=0 -H "Content-type: application/json" -d '{"length": 500000}' +``` + + diff --git a/samples/logger/logger.py b/samples/logger/logger.py new file mode 100644 index 00000000..9e1613da --- /dev/null +++ b/samples/logger/logger.py @@ -0,0 +1,21 @@ +# This actor can be used to test the limits of the Abaco logging facility. It can be messaged: +# 1. With a JSON object containing {"length": <number of characters to log>} +# 2. With no length (in this case, a default length of 100,000 characters is used). 
+ +import json +from agavepy.actors import get_context + +def main(): + context = get_context() + try: + length = context['message_dict']['length'] + print("Got JSON: {}".format(context['message_dict'])) + except Exception: + print("Didn't get JSON, using defaults.") + length = 100000 + log = 'a'*length + print(log) + + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/tests/rabbit_abaco_tests.py b/tests/rabbit_abaco_tests.py index a4946ff6..c8b04556 100644 --- a/tests/rabbit_abaco_tests.py +++ b/tests/rabbit_abaco_tests.py @@ -59,10 +59,9 @@ def minimal_test(): # send spawner a message to start a worker for a new actor worker_id = models.Worker.ensure_one_worker(aid, actor.tenant) - worker_ids = [worker_id] ch = channels.CommandChannel() ch.put_cmd(actor_id=aid, - worker_ids=worker_ids, + worker_id=worker_id, image=actor.image, tenant=actor.tenant, stop_existing=False) diff --git a/tests/test_abaco_core.py b/tests/test_abaco_core.py index 0ae9288c..e542ae43 100644 --- a/tests/test_abaco_core.py +++ b/tests/test_abaco_core.py @@ -1,6 +1,6 @@ # Functional test suite for abaco. # This test suite now runs in its own docker container. To build the image, run -# docker build -f Dockerfile-test -t jstubbs/abaco_testsuite . +# docker build -f Dockerfile-test -t abaco/testsuite . # from within the tests directory. # # To run the tests execute, first start the development stack using: @@ -43,7 +43,7 @@ import json import pytest -from actors import health, models, codes, stores, spawner +from actors import controllers, health, models, codes, stores, spawner from channels import ActorMsgChannel, CommandChannel from util import headers, base_url, case, \ response_format, basic_response_checks, get_actor_id, check_execution_details, \ @@ -622,14 +622,32 @@ def test_execute_and_delete_sleep_loop_actor(headers): else: assert result.get('executionId') exc_id = result.get('executionId') + # wait for a worker to take the execution - + url = '{}/actors/{}/executions/{}'.format(base_url, actor_id, exc_id) + worker_id = None + idx = 0 + while not worker_id: + rsp = requests.get(url, headers=headers).json().get('result') + if case == 'snake': + worker_id = rsp.get('worker_id') + else: + worker_id = rsp.get('workerId') + idx += 1 + time.sleep(1) + if idx > 10: + print("worker never got sleep_loop execution. " + "actor: {}; execution: {}; idx:{}".format(actor_id, exc_id, idx)) + assert False # now let's kill the execution - time.sleep(1) url = '{}/actors/{}/executions/{}'.format(base_url, actor_id, exc_id) rsp = requests.delete(url, headers=headers) assert rsp.status_code in [200, 201, 202, 203, 204] + assert 'Issued force quit command for execution' in rsp.json().get('message') # make sure execution is stopped in a timely manner i = 0 stopped = False + status = None while i < 20: rsp = requests.get(url, headers=headers) rsp_data = json.loads(rsp.content.decode('utf-8')) @@ -639,6 +657,8 @@ def test_execute_and_delete_sleep_loop_actor(headers): break time.sleep(1) i += 1 + print("Execution never stopped. 
Last status of execution: {}; " + "actor_id: {}; execution_id: {}".format(status, actor_id, exc_id)) assert stopped def test_list_execution_details(headers): @@ -786,7 +806,7 @@ def test_create_actor_with_custom_queue_name(headers): @pytest.mark.queuetest def test_actor_uses_custom_queue(headers): url = '{}/actors'.format(base_url) - # get the actor id of an actor registered on the defaul queue: + # get the actor id of an actor registered on the default queue: default_queue_actor_id = get_actor_id(headers, name='abaco_test_suite_statelesss') # and the actor id for the actor on the special queue: special_queue_actor_id = get_actor_id(headers, name='abaco_test_suite_queue1_actor1') @@ -1020,6 +1040,16 @@ def test_other_user_still_cant_list_actor(headers): data = response_format(rsp) assert 'you do not have access to this actor' in data['message'] +@pytest.mark.aliastest +def test_other_user_still_cant_create_alias_nonce(headers): + # alias permissions do not confer access to the actor itself, and alias nonces require BOTH + # permissions on the alias AND on the actor + url = '{}/actors/aliases/{}/nonces'.format(base_url, ALIAS_1) + rsp = requests.get(url, headers=priv_headers()) + assert rsp.status_code == 400 + data = response_format(rsp) + assert 'you do not have access to this alias and actor' in data['message'] + @pytest.mark.aliastest def test_get_actor_with_alias(headers): @@ -1620,6 +1650,254 @@ def test_tenant_list_workers(headers): check_worker_fields(worker) +############## +# events tests +############## + +REQUEST_BIN_URL = 'https://enqjwyug892gl.x.pipedream.net' + + +def check_event_logs(logs): + assert 'event_time_utc' in logs + assert 'event_time_display' in logs + assert 'actor_id' in logs + +def test_has_cycles_1(): + links = {'A': 'B', + 'B': 'C', + 'C': 'D'} + assert not controllers.has_cycles(links) + +def test_has_cycles_2(): + links = {'A': 'B', + 'B': 'A', + 'C': 'D'} + assert controllers.has_cycles(links) + +def test_has_cycles_3(): + links = {'A': 'B', + 'B': 'C', + 'C': 'D', + 'D': 'E', + 'E': 'H', + 'H': 'B'} + assert controllers.has_cycles(links) + +def test_has_cycles_4(): + links = {'A': 'B', + 'B': 'C', + 'D': 'E', + 'E': 'H', + 'H': 'J', + 'I': 'J', + 'K': 'J', + 'L': 'M', + 'M': 'J'} + assert not controllers.has_cycles(links) + +def test_has_cycles_5(): + links = {'A': 'B', + 'B': 'C', + 'C': 'C'} + assert controllers.has_cycles(links) + +def test_create_event_link_actor(headers): + url = '{}/{}'.format(base_url, '/actors') + data = {'image': 'jstubbs/abaco_test', 'name': 'abaco_test_suite_event-link', 'stateless': False} + rsp = requests.post(url, data=data, headers=headers) + result = basic_response_checks(rsp) + +def test_create_actor_with_link(headers): + # first, get the actor id of the event_link actor: + link_actor_id = get_actor_id(headers, name='abaco_test_suite_event-link') + # register a new actor with link to event_link actor + url = '{}/{}'.format(base_url, '/actors') + data = {'image': 'jstubbs/abaco_test', + 'name': 'abaco_test_suite_event', + 'link': link_actor_id} + rsp = requests.post(url, data=data, headers=headers) + result = basic_response_checks(rsp) + +def test_create_actor_with_webhook(headers): + # register an actor to serve as the webhook target + url = '{}/{}'.format(base_url, '/actors') + data = {'image': 'jstubbs/abaco_test', 'name': 'abaco_test_suite_event-webhook', 'stateless': False} + rsp = requests.post(url, data=data, headers=headers) + result = basic_response_checks(rsp) + aid = get_actor_id(headers, 
name='abaco_test_suite_event-webhook') + # create a nonce for this actor + url = '{}/actors/{}/nonces'.format(base_url, aid) + rsp = requests.post(url, headers=headers) + result = basic_response_checks(rsp) + nonce = result['id'] + # in practice, no one should ever do this - the built in link property should be used instead; + # but this illustrates the use of the webhook feature without relying on external APIs. + webhook = '{}/actors/{}/messages?x-nonce={}'.format(base_url, aid, nonce) + # make a new actor with a webhook property that points to the above messages endpoint - + url = '{}/actors'.format(base_url) + data = {'image': 'jstubbs/abaco_test', + 'name': 'abaco_test_suite_event-2', + 'webhook': webhook} + rsp = requests.post(url, data=data, headers=headers) + result = basic_response_checks(rsp) + event_aid = result['id'] + # once the new actor is READY, the webhook actor should have gotten a message to + url = '{}/actors/{}/executions'.format(base_url, aid) + webhook_ready_ex_id = None + idx = 0 + while not webhook_ready_ex_id and idx < 25: + rsp = requests.get(url, headers=headers) + ex_data = rsp.json().get('result').get('executions') + if ex_data and len(ex_data) > 0: + webhook_ready_ex_id = ex_data[0]['id'] + break + else: + idx = idx + 1 + time.sleep(1) + if not webhook_ready_ex_id: + print("webhook actor never executed. actor_id: {}; webhook_actor_id: {}".format(event_aid, aid)) + assert False + # wait for linked execution to complete and get logs + idx = 0 + done = False + while not done and idx < 20: + # get executions for linked actor and check status of each + rsp = requests.get(url, headers=headers) + ex_data = rsp.json().get('result').get('executions') + if ex_data[0].get('status') == 'COMPLETE': + done = True + break + else: + time.sleep(1) + idx = idx + 1 + if not done: + print("webhook actor executions never completed. actor: {}; " + "actor: {}; Final execution data: {}".format(event_aid, aid, ex_data)) + assert False + # now check the logs from the two executions -- + # first one should be the actor READY message: + url = '{}/actors/{}/executions/{}/logs'.format(base_url, aid, webhook_ready_ex_id) + rsp = requests.get(url, headers=headers) + result = basic_response_checks(rsp) + logs = result.get('logs') + assert "'event_type': 'ACTOR_READY'" in logs + check_event_logs(logs) + + +def test_execute_event_actor(headers): + actor_id = get_actor_id(headers, name='abaco_test_suite_event') + data = {'message': 'testing events execution'} + result = execute_actor(headers, actor_id, data=data) + exec_id = result['id'] + # now that this execution has completed, check that the linked actor also executed: + idx = 0 + link_execution_ex_id = None + link_actor_id = get_actor_id(headers, name='abaco_test_suite_event-link') + url = '{}/actors/{}/executions'.format(base_url, link_actor_id) + # the linked actor should get 2 messages - one for the original actor initially being set to READY + # and a second when the execution sent above completes. + while not link_execution_ex_id and idx < 15: + rsp = requests.get(url, headers=headers) + ex_data = rsp.json().get('result').get('executions') + if ex_data and len(ex_data) > 1: + link_ready_ex_id = ex_data[0]['id'] + link_execution_ex_id = ex_data[1]['id'] + break + else: + idx = idx + 1 + time.sleep(1) + if not link_execution_ex_id: + print("linked actor never executed. 
actor_id: {}; link_actor_id: {}".format(actor_id, link_actor_id)) + assert False + # wait for linked execution to complete and get logs + idx = 0 + done = False + while not done and idx < 20: + # get executions for linked actor and check status of each + rsp = requests.get(url, headers=headers) + ex_data = rsp.json().get('result').get('executions') + if ex_data[0].get('status') == 'COMPLETE' and ex_data[1].get('status') == 'COMPLETE': + done = True + break + else: + time.sleep(1) + idx = idx + 1 + if not done: + print("linked actor executions never completed. actor: {}; " + "linked_actor: {}; Final execution data: {}".format(actor_id, link_actor_id, ex_data)) + assert False + # now check the logs from the two executions -- + # first one should be the actor READY message: + url = '{}/actors/{}/executions/{}/logs'.format(base_url, link_actor_id, link_ready_ex_id) + rsp = requests.get(url, headers=headers) + result = basic_response_checks(rsp) + logs = result.get('logs') + assert "'event_type': 'ACTOR_READY'" in logs + check_event_logs(logs) + + # second one should be the actor execution COMPLETE message: + url = '{}/actors/{}/executions/{}/logs'.format(base_url, link_actor_id, link_execution_ex_id) + rsp = requests.get(url, headers=headers) + result = basic_response_checks(rsp) + logs = result.get('logs') + assert "'event_type': 'EXECUTION_COMPLETE'" in logs + assert 'execution_id' in logs + check_event_logs(logs) + +def test_cant_create_link_with_cycle(headers): + # this test checks that adding a link that would create a cycle to an actor that did not previously have one + # is not allowed. + # register a new actor with no link + url = '{}/{}'.format(base_url, '/actors') + data = {'image': 'jstubbs/abaco_test', + 'name': 'abaco_test_suite_create_link',} + rsp = requests.post(url, data=data, headers=headers) + result = basic_response_checks(rsp) + new_actor_id = result['id'] + # create 5 new actors, each with a link to the one created previously: + new_actor_ids = [] + for i in range(5): + data['link'] = new_actor_id + rsp = requests.post(url, data=data, headers=headers) + result = basic_response_checks(rsp) + new_actor_id = result['id'] + new_actor_ids.append(new_actor_id) + # now, update the first created actor with a link that would create a cycle + first_aid = new_actor_ids[0] + data['link'] = new_actor_ids[4] + url = '{}/actors/{}'.format(base_url, first_aid) + print("url: {}; data: {}".format(url, data)) + rsp = requests.put(url, data=data, headers=headers) + assert rsp.status_code == 400 + assert 'this update would result in a cycle of linked actors' in rsp.json().get('message') + +def test_cant_update_link_with_cycle(headers): + # this test checks that an update to a link that would create a cycle is not allowed + link_actor_id = get_actor_id(headers, name='abaco_test_suite_event-link') + # register a new actor with link to event_link actor + url = '{}/{}'.format(base_url, '/actors') + data = {'image': 'jstubbs/abaco_test', + 'name': 'abaco_test_suite_event', + 'link': link_actor_id} + # create 5 new actors, each with a link to the one created previously: + new_actor_ids = [] + for i in range(5): + rsp = requests.post(url, data=data, headers=headers) + result = basic_response_checks(rsp) + new_actor_id = result['id'] + data['link'] = new_actor_id + new_actor_ids.append(new_actor_id) + # now, update the first created actor with a link that would create a cycle + first_aid = new_actor_ids[0] + data['link'] = new_actor_ids[4] + url = '{}/actors/{}'.format(base_url, first_aid) + print("url: {}; data: 
{}".format(url, data)) + + rsp = requests.put(url, data=data, headers=headers) + assert rsp.status_code == 400 + assert 'this update would result in a cycle of linked actors' in rsp.json().get('message') + + ############## # Clean up ##############