-
Notifications
You must be signed in to change notification settings - Fork 30
/
TaskQueueRunner.cpp
89 lines (69 loc) · 1.43 KB
/
TaskQueueRunner.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
#include "TaskQueueRunner.h"
using namespace std;
using namespace boost;
// Creates a runner.
//
// capacity  value the `pending` queue is constructed with
// batch     value the `running` queue is constructed with; it is also stored,
//           and Run() uses it as the number of tasks it moves from `pending`
//           into `running` before it starts evaluating them
//
// NOTE(review): both queues are declared in the header; presumably the
// constructor argument is their initial node count -- confirm there.
TaskQueueRunner::TaskQueueRunner(int capacity, int batch)
    : batch(batch),
      running(batch),
      pending(capacity)
{
    // all state is set up by the initialiser list above
}
// Appends `task` to the `pending` queue; it is not evaluated until Run().
//
// Run() converts every queued pointer with PTR_TO_MFN, invokes it and then
// deletes it, so the pointer handed in here must be something that macro can
// convert back and that may be released with `delete`.
void TaskQueueRunner::Enqueue(void* task)
{
    // whatever push() reports is deliberately discarded, as before
    static_cast<void>(pending.push(task));
}
// Evaluates every queued task, inline in this function, until both queues
// are drained.
//
// Up to `batch` tasks are first moved from `pending` into `running`. The
// entries of `running` are then evaluated one after another:
//   - a task that returns a non-null pointer is not finished yet; the
//     returned pointer is queued in `running` again and evaluated later;
//   - a task that returns null is finished, and the slot it occupied is given
//     to the next task waiting in `pending`, if there is one.
//
// Every task pointer is converted with PTR_TO_MFN, invoked once and then
// deleted, so ownership of everything passed to Enqueue() ends here.
void TaskQueueRunner::Run()
{
    // add the requested number of tasks from `pending` to `running`
    for (auto i = 0; i < batch; i++)
    {
        void* task = nullptr;
        if (!pending.pop(task))
        {
            break;
        }

        running.push(task);
    }

    for (;;)
    {
        void* task = nullptr;

        // Prefer a task that is already in flight, but fall back to `pending`
        // when `running` is empty. Without the fallback a `batch` of 0 (or any
        // other situation that leaves `running` empty while tasks are still
        // pending) would end the loop with tasks that were never evaluated
        // and never deleted.
        if (!running.pop(task) && !pending.pop(task))
        {
            // both queues are empty: all work is done
            break;
        }

        // a null entry has nothing to evaluate or delete, and dereferencing
        // it would be undefined behaviour; treat it as an already finished
        // task and just free its slot below
        if (task != nullptr)
        {
            // cast the task back to its original type and evaluate it
            auto func = PTR_TO_MFN(task);
            auto eval = (*func)();

            // since these pointers were created with `new std::function()`
            // we are the ones responsible to delete it after use
            delete func;

            // the evaluation returned a follow-up function pointer: put that
            // one back into the queue and keep the slot occupied
            if (eval != nullptr)
            {
                running.push(eval);
                continue;
            }
        }

        // the slot is free now; hand it to the next pending task, if any
        void* next = nullptr;
        if (pending.pop(next))
        {
            running.push(next);
        }
    }
}
// Convenience helper that scans all of `services` in one go.
//
// A temporary runner is created with room for one task per service, the task
// that `scanner.GetTask()` produces for each service is enqueued, and the
// runner is then run until every task has completed.
void TaskQueueRunner::QuickScan(ServiceScanner& scanner, Services& services)
{
    // batch value handed to the temporary runner
    const int batchSize = 65535;

    TaskQueueRunner runner(static_cast<int>(services.size()), batchSize);

    for (auto service : services)
    {
        auto task = scanner.GetTask(service);
        runner.Enqueue(task);
    }

    runner.Run();
}
// Nothing to release explicitly: the destructor performs no work of its own
// beyond destroying the members. Task pointers that are still queued are not
// touched here; they are only deleted by Run().
TaskQueueRunner::~TaskQueueRunner() = default;