Skip to content

Commit

Permalink
Merge commit 'f0617f2c10a49262cd3b1cfacb5b36f327310735'
Browse files Browse the repository at this point in the history
  • Loading branch information
Joe Stubbs committed Jul 3, 2019
2 parents bcb1648 + f0617f2 commit 424834d
Show file tree
Hide file tree
Showing 28 changed files with 1,185 additions and 299 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,5 @@ local-dev.conf.bak

passwords.yml
*.key
*.log
*.log
abaco.log
9 changes: 5 additions & 4 deletions Dockerfile-test
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
# Test suite for abaco project.
# Image: jstubbs/abaco_testsuite
# Image: abaco/testsuite

from alpine:3.8

RUN apk add --update musl python3 && rm /var/cache/apk/*
RUN apk add --update musl python3 && rm -f /var/cache/apk/*
RUN apk add --update bash && rm -f /var/cache/apk/*
RUN apk add --update git && rm -f /var/cache/apk/*
RUN apk add --update g++ -f /var/cache/apk/*
RUN apk add --update python3-dev -f /var/cache/apk/*
RUN apk add --update g++ && rm -f /var/cache/apk/*
RUN apk add --update python3-dev && rm -f /var/cache/apk/*
RUN apk add --update linux-headers && rm -f /var/cache/apk/*
ADD actors/requirements.txt /requirements.txt
RUN pip3 install -r /requirements.txt
RUN pip3 install pytest ipython locustio
Expand Down
5 changes: 5 additions & 0 deletions actors/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,11 @@ def check_privileged():
if data.get('mem_limit') or data.get('memLimit'):
logger.debug("User is trying to set mem limit")
raise PermissionsException("Not authorized -- only admins and privileged users can set mem limit.")
if data.get('queue'):
logger.debug("User is trying to set queue")
raise PermissionsException("Not authorized -- only admins and privileged users can set queue.")


else:
logger.debug("user allowed to set privileged.")

Expand Down
10 changes: 8 additions & 2 deletions actors/channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,15 @@ def request_delete_client(self, tenant, actor_id, worker_id, client_id, secret):
class CommandChannel(Channel):
"""Work with commands on the command channel."""

def __init__(self):
def __init__(self, name='default'):
self.uri = Config.get('rabbit', 'uri')
super().__init__(name='command',
queues_list = Config.get('spawner', 'host_queues').replace(' ', '')
valid_queues = queues_list.split(',')
if name not in valid_queues:
raise Exception('Invalid Queue name.')


super().__init__(name='command_channel_{}'.format(name),
connection_type=RabbitConnection,
uri=self.uri)

Expand Down
11 changes: 10 additions & 1 deletion actors/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,16 @@ class ClientGenerator(object):

def __init__(self):
self.secret = os.environ.get('_abaco_secret')
self.ch = ClientsChannel()
ready = False
i = 0
while not ready:
try:
self.ch = ClientsChannel()
ready = True
except RuntimeError as e:
i = i + 1
if i > 10:
raise e
self.credentials = {}
for tenant in get_tenants():
self.credentials[tenant] = {'username': os.environ.get('_abaco_{}_username'.format(tenant), ''),
Expand Down
39 changes: 23 additions & 16 deletions actors/controllers.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@
message_gauges = {}
rate_gauges = {}
last_metric = {}
command_gauge = Gauge('message_count_for_command_channel',
'Number of messages currently in the Command Channel')

clients_gauge = Gauge('clients_count_for_clients_store',
'Number of clients currently in the clients_store')



try:
ACTOR_MAX_WORKERS = Config.get("spawner", "max_workers_per_actor")
except:
Expand All @@ -50,6 +50,7 @@
except:
num_init_workers = 1


class MetricsResource(Resource):
def get(self):
actor_ids = self.get_metrics()
Expand All @@ -66,12 +67,6 @@ def get_metrics(self):
in actors_store.items() if actor.get('stateless') and not actor.get('status') == 'ERROR'
]

ch = CommandChannel()
command_gauge.set(len(ch._queue._queue))
logger.debug("METRICS COMMAND CHANNEL size: {}".format(command_gauge._value._value))
ch.close()
logger.debug("ACTOR IDS: {}".format(actor_ids))

try:
if actor_ids:
# Create a gauge for each actor id
Expand Down Expand Up @@ -102,7 +97,7 @@ def check_metrics(self, actor_ids):
last_metric.update({actor_id: data})

workers = Worker.get_workers(actor_id)
actor = actor = actors_store[actor_id]
actor = actors_store[actor_id]
logger.debug('METRICS: MAX WORKERS TEST {}'.format(actor))

# If this actor has a custom max_workers, use that. Otherwise use default.
Expand All @@ -126,7 +121,7 @@ def check_metrics(self, actor_ids):
# Add a worker if message count reaches a given number
try:
logger.debug("METRICS current message count: {}".format(current_message_count))
if metrics_utils.allow_autoscaling(command_gauge._value._value, max_workers, len(workers)):
if metrics_utils.allow_autoscaling(max_workers, len(workers)):
if current_message_count >= 1:
metrics_utils.scale_up(actor_id)
logger.debug("METRICS current message count: {}".format(data[0]['value'][1]))
Expand Down Expand Up @@ -386,6 +381,12 @@ def validate_post(self):
parser = Actor.request_parser()
try:
args = parser.parse_args()
if args['queue']:
queues_list = Config.get('spawner', 'host_queues').replace(' ', '')
valid_queues = queues_list.split(',')
if args['queue'] not in valid_queues:
raise BadRequest('Invalid queue name.')

except BadRequest as e:
msg = 'Unable to process the JSON description.'
if hasattr(e, 'data'):
Expand All @@ -398,6 +399,7 @@ def validate_post(self):
def post(self):
logger.info("top of POST to register a new actor.")
args = self.validate_post()

logger.debug("validate_post() successful")
args['tenant'] = g.tenant
args['api_server'] = g.api_server
Expand Down Expand Up @@ -496,6 +498,11 @@ def put(self, actor_id):
args = self.validate_put(actor)
logger.debug("PUT args validated successfully.")
args['tenant'] = g.tenant
if args['queue']:
queues_list = Config.get('spawner', 'host_queues').replace(' ', '')
valid_queues = queues_list.split(',')
if args['queue'] not in valid_queues:
raise BadRequest('Invalid queue name.')
# user can force an update by setting the force param:
update_image = args.get('force')
if not update_image and args['image'] == previous_image:
Expand Down Expand Up @@ -531,7 +538,8 @@ def put(self, actor_id):
logger.info("updated actor {} stored in db.".format(actor_id))
if update_image:
worker_ids = [Worker.request_worker(tenant=g.tenant, actor_id=actor.db_id)]
ch = CommandChannel()
# get actor queue name
ch = CommandChannel(name=actor.queue)
ch.put_cmd(actor_id=actor.db_id, worker_ids=worker_ids, image=actor.image, tenant=args['tenant'])
ch.close()
logger.debug("put new command on command channel to update actor.")
Expand Down Expand Up @@ -1021,9 +1029,9 @@ def validate_post(self):
def post(self, actor_id):
"""Ensure a certain number of workers are running for an actor"""
logger.debug("top of POST /actors/{}/workers.".format(actor_id))
id = g.db_id
dbid = g.db_id
try:
actor = Actor.from_db(actors_store[id])
actor = Actor.from_db(actors_store[dbid])
except KeyError:
logger.debug("did not find actor: {}.".format(actor_id))
raise ResourceError("No actor found with id: {}.".format(actor_id), 404)
Expand All @@ -1033,8 +1041,7 @@ def post(self, actor_id):
if not num or num == 0:
logger.debug("did not get a num: {}.".format(actor_id))
num = 1
logger.debug("ensuring at least {} workers. actor: {}.".format(num, actor_id))
dbid = Actor.get_dbid(g.tenant, actor_id)
logger.debug("ensuring at least {} workers. actor: {}.".format(num, dbid))
try:
workers = Worker.get_workers(dbid)
except WorkerException as e:
Expand All @@ -1052,7 +1059,7 @@ def post(self, actor_id):
worker_ids = [Worker.request_worker(tenant=g.tenant,
actor_id=dbid)]
logger.info("New worker id: {}".format(worker_ids[0]))
ch = CommandChannel()
ch = CommandChannel(name=actor.queue)
ch.put_cmd(actor_id=actor.db_id,
worker_ids=worker_ids,
image=actor.image,
Expand Down
24 changes: 16 additions & 8 deletions actors/docker_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,22 +93,20 @@ def list_all_containers():
cli = docker.APIClient(base_url=dd, version="auto")
# todo -- finish

def container_running(image=None, name=None):
"""Check if there is a running container for an image.
image should be fully qualified; e.g. image='jstubbs/abaco_core'
Can pass wildcards in name using * character; e.g. name='abaco_spawner*'
def container_running(name=None):
"""Check if there is a running container whose name contains the string, `name`. Note that this function will
return True if any running container has a name which contains the input `name`.
"""
logger.debug("top of container_running().")
filters = {}
if name:
filters['name'] = name
if image:
filters['image'] = image
cli = docker.APIClient(base_url=dd, version="auto")
try:
containers = cli.containers(filters=filters)
except Exception as e:
msg = "There was an error checking container_running for image: {}. Exception: {}".format(image, e)
msg = "There was an error checking container_running for name: {}. Exception: {}".format(name, e)
logger.error(msg)
raise DockerError(msg)
logger.debug("found containers: {}".format(containers))
Expand Down Expand Up @@ -175,14 +173,24 @@ def run_container_with_docker(image,
host_config = cli.create_host_config(binds=binds, auto_remove=auto_remove)
logger.debug("binds: {}".format(binds))

# add the container to a specific docker network, if configured
netconf = None
try:
docker_network = Config.get('spawner', 'docker_network')
except Exception:
docker_network = None
if docker_network:
netconf = cli.create_networking_config({docker_network: cli.create_endpoint_config()})

# create and start the container
try:
container = cli.create_container(image=image,
environment=environment,
volumes=volumes,
host_config=host_config,
command=command,
name=name)
name=name,
networking_config=netconf)
cli.start(container=container.get('Id'))
except Exception as e:
msg = "Got exception trying to run container from image: {}. Exception: {}".format(image, e)
Expand Down
79 changes: 65 additions & 14 deletions actors/health.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ def check_workers(actor_id, ttl):
ch.close()
except Exception as e:
logger.error("Got an error trying to close the worker channel for dead worker. Exception: {}".format(e))
if not result == 'ok':
if result and not result == 'ok':
logger.error("Worker responded unexpectedly: {}, deleting worker.".format(result))
try:
rm_container(worker['cid'])
Expand Down Expand Up @@ -276,6 +276,69 @@ def check_workers(actor_id, ttl):
# else:
# logger.debug("Worker not in READY status, will postpone.")

def get_host_queues():
"""
Read host_queues string from config and parse to return a Python list.
:return: list[str]
"""
try:
host_queues_str = Config.get('spawner', 'host_queues')
return [ s.strip() for s in host_queues_str.split(',')]
except Exception as e:
msg = "Got unexpected exception attempting to parse the host_queues config. Exception: {}".format(e)
logger.error(e)
raise e

def start_spawner(queue, idx='0'):
"""
Start a spawner on this host listening to a queue, `queue`.
:param queue: (str) - the queue the spawner should listen to.
:param idx: (str) - the index to use as a suffix to the spawner container name.
:return:
"""
command = 'python3 -u /actors/spawner.py'
name = 'healthg_{}_spawner_{}'.format(queue, idx)
environment = {'AE_IMAGE': AE_IMAGE.split(':')[0],
'queue': queue
}
# check logging strategy to determine log file name:
try:
run_container_with_docker(AE_IMAGE,
command,
name=name,
environment=environment,
mounts=[],
log_file=None)
except Exception as e:
logger.critical("Could not restart spawner for queue {}. Exception: {}".format(queue, e))

def check_spawner(queue):
"""
Check the health and existence of a spawner on this host for a particular queue.
:param queue: (str) - the queue to check on.
:return:
"""
logger.debug("top of check_spawner for queue: {}".format(queue))
# spawner container names by convention should have the format <project>_<queue>_spawner_<count>; for example
# abaco_default_spawner_2.
# so, we look for container names containing a string with that format:
spawner_name_segment = '{}_spawner'.format(queue)
if not container_running(name=spawner_name_segment):
logger.critical("No spawners running for queue {}! Launching new spawner..".format(queue))
start_spawner(queue)
else:
logger.debug("spawner for queue {} already running.".format(queue))

def check_spawners():
"""
Check health of spawners running on a given host.
:return:
"""
logger.debug("top of check_spawners")
host_queues = get_host_queues()
logger.debug("checking spawners for queues: {}".format(host_queues))
for queue in host_queues:
check_spawner(queue)

def manage_workers(actor_id):
"""Scale workers for an actor if based on message queue size and policy."""
Expand All @@ -300,6 +363,7 @@ def shutdown_all_workers():

def main():
logger.info("Running abaco health checks. Now: {}".format(time.time()))
check_spawners()
try:
clean_up_ipc_dirs()
except Exception as e:
Expand All @@ -308,19 +372,6 @@ def main():
ttl = Config.get('workers', 'worker_ttl')
except Exception as e:
logger.error("Could not get worker_ttl config. Exception: {}".format(e))
if not container_running(name='spawner*'):
logger.critical("No spawners running! Launching new spawner..")
command = 'python3 -u /actors/spawner.py'
# check logging strategy to determine log file name:
try:
run_container_with_docker(AE_IMAGE,
command,
name='abaco_spawner_0',
environment={'AE_IMAGE': AE_IMAGE.split(':')[0]},
mounts=[],
log_file=None)
except Exception as e:
logger.critical("Could not restart spawner. Exception: {}".format(e))
try:
ttl = int(ttl)
except Exception as e:
Expand Down
Loading

0 comments on commit 424834d

Please sign in to comment.