-
Notifications
You must be signed in to change notification settings - Fork 4
/
test-beat-infrastructure.py
44 lines (31 loc) · 1.1 KB
/
test-beat-infrastructure.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import os
import re
from urlparse import urlparse
def get_env(src):
    """Parse ``export NAME=value`` lines from shell source into a dict.

    Each line of *src* is matched from its first character against
    ``export <name>=<value>``; the value may optionally be wrapped in
    double quotes, which are not included in the result. Lines that do not
    match are skipped, which includes indented exports and exports whose
    value is empty. When a name is exported more than once, the last
    occurrence wins.

    Args:
        src: Text containing zero or more ``export`` lines.

    Returns:
        A dict mapping each exported name to its value, both as strings.
    """
    # Raw string: ``\s`` must reach the regex engine untouched instead of
    # being treated as an (invalid) string escape sequence.
    export_re = re.compile(r'export\s+([^=]+)="?([^"]+)"?')
    ret = {}
    for line in src.splitlines():
        mat = export_re.match(line)
        if mat:
            name, value = mat.groups()
            ret[name] = value
    return ret
def test_db_connection(host):
    """Check that the host can reach the database server on TCP port 5432.

    The database host name is read from the ``DATABASE_HOST`` export in
    ``/webapps/firecares/bin/postactivate``; the commands are run under sudo.
    """
    with host.sudo():
        # run_expect (rather than run) so that an unreadable or missing
        # postactivate fails here on the exit status instead of surfacing
        # later as a KeyError on an empty environment.
        postactivate = host.run_expect(
            [0], 'cat /webapps/firecares/bin/postactivate')
        env = get_env(postactivate.stdout)
        assert 'DATABASE_HOST' in env, \
            'DATABASE_HOST is not exported in postactivate'
        db = env['DATABASE_HOST']
        # The test passes only if nc exits with status 0.
        host.run_expect([0], 'nc -w 2 -z {} 5432'.format(db))
def test_rabbit_connection(host):
    """Check that the host can reach the message broker on TCP port 5672.

    The broker host name is the hostname component of the ``BROKER_URL``
    export in ``/webapps/firecares/bin/postactivate``; the commands are run
    under sudo.
    """
    with host.sudo():
        # run_expect (rather than run) so that an unreadable or missing
        # postactivate fails here on the exit status instead of surfacing
        # later as a KeyError on an empty environment.
        postactivate = host.run_expect(
            [0], 'cat /webapps/firecares/bin/postactivate')
        env = get_env(postactivate.stdout)
        assert 'BROKER_URL' in env, \
            'BROKER_URL is not exported in postactivate'
        rabbit = urlparse(env['BROKER_URL']).hostname
        # hostname is None when the URL has no network location; without
        # this check the literal string "None" would be passed to nc.
        assert rabbit, 'BROKER_URL has no hostname: {}'.format(
            env['BROKER_URL'])
        # The test passes only if nc exits with status 0.
        host.run_expect([0], 'nc -w 2 -z {} 5672'.format(rabbit))
def test_nginx_not_running_and_disabled(host):
    """Assert that the ``nginx`` service is neither running nor enabled."""
    nginx_service = host.service("nginx")
    running = nginx_service.is_running
    assert not running
    enabled = nginx_service.is_enabled
    assert not enabled
def test_celery_running(host):
    """Assert, under sudo, that the supervisor program ``celery`` is running."""
    with host.sudo():
        celery = host.supervisor('celery')
        assert celery.is_running
def test_gunicorn_not_running(host):
    """Assert, under sudo, that no process has the command name ``gunicorn``."""
    with host.sudo():
        gunicorn_procs = host.process.filter(comm='gunicorn')
        assert gunicorn_procs == []