-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmanage.py
executable file
·149 lines (124 loc) · 4.45 KB
/
manage.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import os
from datetime import datetime
from docopt import docopt
from sqlalchemy import text
from srht.app import app, db
from srht.config import _cfg
from srht.objects import Job, Upload, User
from srht.tasks import GenerateImageThumbnail, Task
def do_task(arguments):
    """Make up to ``<count>`` attempts at running queued tasks.

    ``arguments["<count>"]`` is converted with ``int`` (so a non-numeric
    value raises ``ValueError``). On every attempt the next task is fetched
    with ``Task.get_next_task()``; a falsy result is skipped, anything else
    has its ``run()`` method called. Each attempt counts toward the limit
    whether or not a task was available.
    """
    attempts = int(arguments["<count>"])
    for _ in range(attempts):
        pending = Task.get_next_task()
        if not pending:
            continue
        pending.run()
def queue_task_for_missing_thumbnails(arguments):
    """Queue a ``GenerateImageThumbnail`` task for each upload lacking a thumbnail.

    ``arguments`` is not used; it is accepted so the function can be called
    the same way as the other command handlers.
    """
    # ``== None`` is deliberate: the comparison is part of a SQLAlchemy
    # filter expression, not a plain Python identity check.
    missing = Upload.query.filter(Upload.thumbnail == None).all()  # noqa: E711
    for item in missing:
        GenerateImageThumbnail(item.id).queue()
def apply_migrations(arguments):
    """Execute every ``.sql`` script in the configured migrations folder.

    The folder path is read from the ``migrations`` configuration key; if
    that value is falsy the function does nothing. Scripts are run in sorted
    filename order and each one is committed on its own, so scripts that
    completed before a failure stay applied.

    ``arguments`` is not used; it is accepted so the function can be called
    the same way as the other command handlers.

    Any exception (unreadable folder or file, failing SQL) rolls back the
    current session, is reported on stdout, and stops the run. It is not
    re-raised.
    """
    folder_path = _cfg("migrations")
    if not folder_path:
        return

    try:
        # os.listdir() returns names in arbitrary order; sort them so the
        # migrations are applied in a deterministic, name-defined sequence.
        for filename in sorted(os.listdir(folder_path)):
            if not filename.endswith(".sql"):
                continue

            file_path = os.path.join(folder_path, filename)
            with open(file_path, "r") as file:
                sql_script = file.read()

            db.session.execute(text(sql_script))
            db.session.commit()
            print(f"Executed {filename}")
    except Exception as e:
        # Command-line boundary: undo the failed statement and report it
        # instead of crashing with a traceback.
        db.session.rollback()
        print(f"An error occurred: {e}")
def remove_admin(arguments):
    """Clear the admin flag of the user named by ``arguments["<name>"]``.

    Prints ``Not a valid user`` and changes nothing when no user with that
    username is found.
    """
    wanted = arguments["<name>"]
    account = User.query.filter(User.username == wanted).first()
    if not account:
        print("Not a valid user")
        return

    account.admin = False
    db.session.commit()
def make_admin(arguments):
    """Set the admin flag of the user named by ``arguments["<name>"]``.

    Prints ``Not a valid user`` and changes nothing when no user with that
    username is found.
    """
    wanted = arguments["<name>"]
    account = User.query.filter(User.username == wanted).first()
    if not account:
        print("Not a valid user")
        return

    account.admin = True
    db.session.commit()
def list_admin(arguments):
    """Print the username of every user whose admin flag is set, one per line.

    ``arguments`` is not used; it is accepted so the function can be called
    the same way as the other command handlers.
    """
    for admin in User.query.filter(User.admin):
        print(admin.username)
def approve_user(arguments):
    """Approve the user named by ``arguments["<name>"]``.

    Sets ``approved`` to ``True`` and ``approvalDate`` to the current local
    time, then commits. Prints ``Not a valid user`` and changes nothing when
    no user with that username is found.
    """
    wanted = arguments["<name>"]
    account = User.query.filter(User.username == wanted).first()
    if not account:
        print("Not a valid user")
        return

    account.approved = True
    account.approvalDate = datetime.now()
    db.session.commit()
def create_user(arguments):
    """Create a user that is approved from the start.

    Builds a ``User`` from ``arguments["<name>"]``, ``arguments["<email>"]``
    and ``arguments["<password>"]`` (in that order), marks it approved with
    the current local time as approval date, then adds it to the session and
    commits. Errors raised while constructing or committing the user are not
    caught here.
    """
    u = User(arguments["<name>"], arguments["<email>"], arguments["<password>"])
    if u:
        u.approved = True
        u.approvalDate = datetime.now()
        db.session.add(u)
        db.session.commit()
        print("User created")
    else:
        # NOTE(review): a freshly constructed object is truthy unless User
        # defines __bool__/__len__, so this branch is probably unreachable.
        # Confirm before relying on it for error reporting.
        print("Couldn't create the user")
def reset_password(arguments):
    """Set a new password for the user named by ``arguments["<name>"]``.

    The new password comes from ``arguments["<password>"]`` and must be
    5 to 256 characters long (inclusive); otherwise a message is printed and
    nothing is changed. Prints ``Not a valid user`` when no user with that
    username is found.
    """
    account = User.query.filter(User.username == arguments["<name>"]).first()
    if not account:
        print("Not a valid user")
        return

    new_password = arguments["<password>"]
    if not 5 <= len(new_password) <= 256:
        print("Password must be between 5 and 256 characters.")
        return

    account.set_password(new_password)
    db.session.commit()
# Usage text handed to docopt in the __main__ block. Whitespace inside the
# string is significant: docopt only treats the indented lines that follow
# "Usage:" as usage patterns, and it separates an option from its description
# by two or more spaces.
interface = """
Command line admin interface

Usage:
  manage.py admin promote <name>
  manage.py admin demote <name>
  manage.py admin list
  manage.py user approve <name>
  manage.py user create <name> <password> <email>
  manage.py user reset_password <name> <password>
  manage.py database migrate
  manage.py task run <count>
  manage.py thumbnails queue
  manage.py thumbnails recreate <url> #TODO

Options:
  -h --help  Show this screen.
"""
if __name__ == "__main__":
    # (command group, action, handler) triples, tested in this order; the
    # first pair whose two docopt flags are both set selects the handler.
    commands = (
        ("admin", "promote", make_admin),
        ("admin", "demote", remove_admin),
        ("admin", "list", list_admin),
        ("user", "approve", approve_user),
        ("user", "create", create_user),
        ("user", "reset_password", reset_password),
        ("database", "migrate", apply_migrations),
        ("task", "run", do_task),
        ("thumbnails", "queue", queue_task_for_missing_thumbnails),
    )

    # Parsing and the handler both run inside the application context.
    with app.app_context():
        arguments = docopt(interface, version="1")
        for group, action, handler in commands:
            if arguments[group] and arguments[action]:
                handler(arguments)
                break