-
Notifications
You must be signed in to change notification settings - Fork 0
/
backup.py
97 lines (83 loc) · 3.36 KB
/
backup.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import subprocess, os
import boto3
from datetime import datetime
from botocore.exceptions import ClientError
from configparser import ConfigParser, NoOptionError, NoSectionError
from boto3_type_annotations.s3 import Client
import newrelic.agent
# Register this process with the New Relic agent using a 10.0 registration
# timeout (presumably seconds — confirm against the agent docs). The returned
# application object is what run() passes to newrelic.agent.BackgroundTask.
application = newrelic.agent.register_application(timeout=10.0)
class ConfigError(Exception):
    """Error raised when a configuration value cannot be retrieved.

    Attributes:
        message: Human-readable description of what went wrong.
    """

    def __init__(self, message):
        # Forward the message to Exception so that ``args`` and ``str()``
        # are populated even when the error is built as
        # ``ConfigError(message=...)``.
        super().__init__(message)
        self.message = message
class BackupCreationError(Exception):
    """Error raised when the backup archive could not be produced.

    Attributes:
        message: Human-readable description of what went wrong.
    """

    def __init__(self, message):
        # Forward the message to Exception so that ``args`` and ``str()``
        # are populated even when the error is built as
        # ``BackupCreationError(message=...)``.
        super().__init__(message)
        self.message = message
def current_path() -> str:
    """Return the symlink-resolved directory that contains this module."""
    module_file = os.path.realpath(__file__)
    return os.path.dirname(module_file)
class Config:
    """Thin wrapper around a ``ConfigParser`` loaded from this module's directory."""

    def __init__(self, filename: str = "secrets.conf"):
        """Parse *filename*, resolved relative to ``current_path()``.

        ``ConfigParser.read`` silently skips files it cannot open, so a
        missing file yields an empty configuration here; the problem only
        surfaces later as a ``ConfigError`` from ``get``.
        """
        conf = ConfigParser()
        conf.read(os.path.join(current_path(), filename))
        self.conf = conf

    def get_conf(self) -> ConfigParser:
        """Return the underlying ``ConfigParser`` instance."""
        return self.conf

    def get(self, section: str, option: str) -> str:
        """Return the value stored under *option* in *section*.

        Raises:
            ConfigError: If either argument is ``None``, if the section or
                option does not exist, or if the parser fails for any other
                reason. The original parser exception is attached as
                ``__cause__``.
        """
        if section is None or option is None:
            raise ConfigError("Please provide both SECTION and OPTION")
        try:
            return self.conf.get(section, option)
        except NoOptionError as e:
            raise ConfigError(
                f"There is no option {option} in section {section}"
            ) from e
        except NoSectionError as e:
            raise ConfigError(
                f"There is no section {section} in the configuration provided"
            ) from e
        except Exception as e:
            # Catch-all kept on purpose: callers only ever expect ConfigError.
            raise ConfigError(f"An unknown error occured. {e}") from e
@newrelic.agent.function_trace(name="create_backup", group="function")
def create_backup(config: Config) -> str:
    """Archive the configured Docker volume into a dated ``.tar.gz`` file.

    Runs a throwaway ``ubuntu`` container that mounts the volume at
    ``/dbdata`` and this module's directory at ``/backup``, then tars the
    former into the latter.

    Args:
        config: Configuration object; the ``filename`` and ``volume_name``
            options of its ``backup_data`` section are read.

    Returns:
        The archive's file name (not a full path). The file itself is
        written into the directory returned by ``current_path()``.

    Raises:
        BackupCreationError: If the docker command exits with a non-zero
            status; the message carries the command's stderr.
    """
    timestamp = datetime.now().strftime("%Y-%m-%d")
    filename_base = config.get("backup_data", "filename")
    filename = f"{filename_base}-{timestamp}.tar.gz"
    volume_name = config.get("backup_data", "volume_name")
    path_name = current_path()

    # Build argv as a list rather than splitting a formatted string, so a
    # volume name or directory containing whitespace stays a single argument.
    command = [
        "docker",
        "run",
        "--rm",
        "--volume",
        f"{volume_name}:/dbdata",
        "--volume",
        f"{path_name}:/backup",
        "ubuntu",
        "tar",
        "czf",
        f"/backup/{filename}",
        "/dbdata",
    ]
    process = subprocess.run(
        command, stderr=subprocess.PIPE, stdout=subprocess.PIPE
    )
    if process.returncode != 0:
        # errors="replace" keeps a non-UTF-8 byte in stderr from masking the
        # real failure with a UnicodeDecodeError.
        stderr_text = process.stderr.decode("utf-8", errors="replace")
        raise BackupCreationError(
            f"Backup could not be created.\n{stderr_text}"
        )
    return filename
@newrelic.agent.function_trace(name="s3_upload", group="function")
def upload_file(file_name: str, bucket_name: str, s3_client: Client) -> bool:
    """Upload a file from this module's directory to an S3 bucket.

    Args:
        file_name: Name of the file inside ``current_path()``; also used
            unchanged as the S3 object key.
        bucket_name: Destination bucket.
        s3_client: Client whose ``upload_file`` method performs the transfer.

    Returns:
        ``True`` if the upload completed, ``False`` if S3 reported a failure
        (the error is printed).
    """
    # boto3's managed transfer re-raises a failed upload as
    # S3UploadFailedError, which is not a ClientError subclass; catch both so
    # a failed upload yields False instead of an unhandled exception.
    from boto3.exceptions import S3UploadFailedError

    local_path = os.path.join(current_path(), file_name)
    try:
        s3_client.upload_file(local_path, bucket_name, file_name)
    except (ClientError, S3UploadFailedError) as e:
        print(e)
        return False
    return True
def get_s3_client(conf: Config) -> Client:
    """Build a boto3 S3 client from the ``aws_s3_credentials`` config section.

    Reads the ``access_key_id`` and ``secret_key`` options, in that order,
    and passes them to ``boto3.client``.
    """
    credentials_section = "aws_s3_credentials"
    key_id = conf.get(credentials_section, "access_key_id")
    secret = conf.get(credentials_section, "secret_key")
    return boto3.client(
        "s3",
        aws_access_key_id=key_id,
        aws_secret_access_key=secret,
    )
def run():
    """Create a backup, upload it to S3, and remove the local copy on success.

    The whole cycle runs inside a New Relic ``BackgroundTask`` named
    ``gpnn-ai-mail-backup``. If the upload reports failure, the local archive
    is left in place.
    """
    with newrelic.agent.BackgroundTask(
        application, name="gpnn-ai-mail-backup", group="Task"
    ):
        conf = Config()
        backup_filename = create_backup(conf)
        upload_success = upload_file(
            file_name=backup_filename,
            bucket_name=conf.get("aws_s3_bucket", "name"),
            s3_client=get_s3_client(conf),
        )
        if upload_success:
            backup_path = os.path.join(current_path(), backup_filename)
            # Delete through the standard library instead of a whitespace-split
            # "rm" command line, which broke on paths containing spaces.
            # Cleanup stays best-effort: a failure is reported, not raised.
            try:
                os.remove(backup_path)
            except OSError as e:
                print(e)
# Script entry point: perform one backup cycle, then report completion.
if __name__ == "__main__":
    run()
    print("Done!")