pool_logging.py
import logging
import logging.handlers
import multiprocessing
import time

import numpy as np

# Placeholder path; point this at a writable log file before running.
log_file = 'PATH_TO_LOG_FILE/multi_log_test.log'

# Adapted from
# https://docs.python.org/3/howto/logging-cookbook.html#logging-to-a-single-file-from-multiple-processes
# with the help of
# https://stackoverflow.com/questions/48045978/how-to-log-to-single-file-with-multiprocessing-pool-apply-async
def listener_configurer():
    root = logging.getLogger()
    h = logging.FileHandler(log_file)
    f = logging.Formatter('%(asctime)s %(message)s')
    h.setFormatter(f)
    root.addHandler(h)


def listener_process(queue, configurer):
    configurer()
    while True:
        try:
            record = queue.get()
            if record is None:  # We send this as a sentinel to tell the listener to quit.
                break
            logger = logging.getLogger(record.name)
            logger.handle(record)  # No level or filter logic applied - just do it!
        except Exception:
            import sys, traceback
            print('Whoops! Problem:', file=sys.stderr)
            traceback.print_exc(file=sys.stderr)


def task_worker_configurer(queue):
    root = logging.getLogger()
    # if not len(root.handlers):
    h = logging.handlers.QueueHandler(queue)
    root.addHandler(h)
    # Send all messages, for demo; no other level or filter logic applied.
    root.setLevel(logging.DEBUG)
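

# Hedged alternative (not used above): guard against re-adding a QueueHandler
# when the configurer runs more than once in the same worker process. With a
# guard like this, the maxtasksperchild=1 workaround in main() below should not
# be needed, because a reused worker keeps a single handler instead of
# accumulating duplicates. This function is an illustrative sketch, not part of
# the original script.
def task_worker_configurer_guarded(queue):
    root = logging.getLogger()
    already_attached = any(isinstance(h, logging.handlers.QueueHandler)
                           for h in root.handlers)
    if not already_attached:
        root.addHandler(logging.handlers.QueueHandler(queue))
    root.setLevel(logging.DEBUG)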


def task_function(sleep_time, task_name, queue, configurer):
    name = multiprocessing.current_process().name
    configurer(queue)
    start_message = 'Worker {} has started task {} and will now sleep for {}s'.format(
        name, task_name, sleep_time)
    logging.info(start_message)
    time.sleep(sleep_time)
    success_message = 'Worker {} has finished task {} after sleeping for {}s'.format(
        name, task_name, sleep_time)
    logging.info(success_message)


def main():
    start_time = time.time()
    queue = multiprocessing.Manager().Queue(-1)
    listener = multiprocessing.Process(target=listener_process,
                                       args=(queue, listener_configurer))
    listener.start()

    # maxtasksperchild=1 tears down each worker once it finishes a task. Without
    # it, every time a worker picked up a new task it attached another
    # QueueHandler to its root logger, so each logging event appeared one more
    # time in the log (see the guarded configurer sketch above for an
    # alternative way to avoid the duplicates).
    pool = multiprocessing.Pool(processes=6, maxtasksperchild=1)

    job_list = [np.random.randint(8, 10) for i in range(6)]
    single_thread_time = np.sum(job_list)
    for i, sleep_time in enumerate(job_list):
        name = str(i)
        pool.apply_async(task_function,
                         args=(sleep_time, name, queue, task_worker_configurer))
    pool.close()
    pool.join()

    queue.put_nowait(None)  # Sentinel: tells the listener process to quit.
    end_time = time.time()
    print("sum of task lengths is {}s, but script execution time was {}s".format(
        single_thread_time,
        (end_time - start_time)
    ))
    listener.join()
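

# Hedged alternative sketch (not called anywhere): the standard library's
# logging.handlers.QueueListener can replace the hand-rolled listener_process.
# It drains the queue on a background thread in the parent process and forwards
# each record to the attached handlers, so no separate listener Process or None
# sentinel is needed. The function name and structure here are illustrative
# assumptions, not part of the original script.
def main_with_queue_listener():
    start_time = time.time()
    queue = multiprocessing.Manager().Queue(-1)

    handler = logging.FileHandler(log_file)
    handler.setFormatter(logging.Formatter('%(asctime)s %(message)s'))
    listener = logging.handlers.QueueListener(queue, handler)
    listener.start()

    pool = multiprocessing.Pool(processes=6, maxtasksperchild=1)
    job_list = [np.random.randint(8, 10) for i in range(6)]
    for i, sleep_time in enumerate(job_list):
        pool.apply_async(task_function,
                         args=(sleep_time, str(i), queue, task_worker_configurer))
    pool.close()
    pool.join()

    listener.stop()  # Flushes remaining records and stops the helper thread.
    print("sum of task lengths is {}s, but script execution time was {}s".format(
        np.sum(job_list), time.time() - start_time))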


if __name__ == "__main__":
    main()