forked from guahan-web/PHP-SimpleQueue
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathSimpleQueue.php
155 lines (136 loc) · 5.79 KB
/
SimpleQueue.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
<?php
/**
* @fileoverview
* This file defines the logic of the SimpleQueue object
*
* @author Garth Henson (http://www.guahanweb.com)
* @since 1.0
* @version 1.0
*/
require('SimpleQueueDB.php');
require('QueueLogger.php');
/**
* SimpleQueue
*
* @author Garth Henson (http://www.guahawneb.com)
*/
class SimpleQueue {
protected $limit = 0;
protected $retries = 3;
protected $delay = 15;
protected $table = 'simple_queue';
protected $callbacks = array();
protected $logger;
protected $dequeue_time;
protected $conn;
public function __construct() {
$this->logger = QueueLogger::getInstance();
$this->conn = SimpleQueueDB::getInstance();
}
public function setExecLimit($n) {
$this->limit = intval($n);
$this->logger->debug(sprintf('Execution limit set: %d', $n));
}
public function setRetryLimit($n) {
$this->retries = intval($n);
$this->logger->debug(sprintf('Retry limit set: %d', $n));
}
public function setRetryDelay($n) {
$this->delay = intval($n);
$this->logger->debug(sprintf('Retry delay set: %d minutes', $n));
}
public function setCallbackAction($fn, $context = 'root') {
$this->callbacks[$context] = $fn;
$this->logger->debug(sprintf('Setting callback for context [%s] to [%s]', $context, $fn));
}
public function queue($data, $context = NULL) {
$this->logger->info('QUEUING...');
$q = sprintf("INSERT INTO %s (context, request) VALUES (%s, '%s')",
$this->table,
is_null($context) ? 'NULL' : "'" . mysql_real_escape_string($context) . "'",
mysql_real_escape_string($data)
);
if (FALSE !== ($sql = mysql_query($q, $this->conn))) {
$this->logger->info(sprintf('Successfully queued data: [%s] with ID [%d]',
$data,
mysql_insert_id()
));
return TRUE;
} else {
$this->logger->debugQuery($q);
$this->logger->error(sprintf('Failed to queue data: [%s]', $data));
}
}
public function dequeue($context = NULL) {
$this->logger->info('DEQUEUING...');
$q = sprintf("SELECT id, queued, context, request, attempts, CURRENT_TIMESTAMP as ctime, CURRENT_TIMESTAMP + INTERVAL %d minute AS ntime FROM %s WHERE queued < CURRENT_TIMESTAMP AND %s ORDER BY queued",
$this->delay,
$this->table,
(NULL === $context) ? 'context IS NULL' : sprintf("context = '%s'", mysql_real_escape_string($context))
);
if ($this->limit !== 0) {
$q .= sprintf(" LIMIT %d", $this->limit);
}
if (FALSE === ($sql = mysql_query($q, $this->conn))) {
$this->logger->debugQuery($q);
$this->logger->error(sprintf('Could not retrieve queue for context [%s]', $context));
return FALSE;
}
$context = NULL === $context ? 'root' : $context;
if (!isset($this->callbacks[$context])) {
$this->logger->fatal(sprintf('Cannot process context [%s] because there is no callback defined', $context));
return FALSE;
}
$c = mysql_num_rows($sql);
if ($c > 0) {
$this->logger->debug(sprintf('Found %d items to dequeue', $c));
while ($item = mysql_fetch_assoc($sql)) {
$this->logger->debug(sprintf('Calling [%s] on data [%s]', $this->callbacks[$context], $item['request']));
// Process the user callback on each item and remove from the queue if successful
if (TRUE === call_user_func($this->callbacks[$context], $item['request'])) {
$this->logger->info(sprintf('Successfully dequeued request [%d::%s].', $item['id'], $item['request']));
$this->deleteItem($item['id']);
} else {
$this->logger->info(sprintf('Callback process failed. Requeuing request [%d::%s]', $item['id'], $item['request']));
$this->requeue($item);
}
}
} else {
$this->logger->debug(sprintf('No items to dequeue'));
}
}
public function purgeQueue() {
$q = sprintf("DELETE FROM %s WHERE attempts >= %d", $this->table, $this->retries);
if (FALSE === ($sql = @mysql_query($q, $this->conn))) {
$this->logger->debugQuery($q);
$this->logger->error('Could not purge queue');
} else {
$this->logger->info(sprintf('Queue purged successfully: deleted %d expired items', mysql_affected_rows()));
}
}
protected function requeue($item) {
$attempts = intval($item['attempts']) + 1;
$q = sprintf("UPDATE %s SET queued = CURRENT_TIMESTAMP + INTERVAL %d MINUTE, attempts = %d WHERE id = %d LIMIT 1",
$this->table,
$this->delay,
$attempts,
intval($item['id'])
);
if (FALSE === @mysql_query($q, $this->conn)) {
$this->logger->debugQuery($q);
$this->logger->error(sprintf('Could not requeue item ID [%d]', $item['id']));
} else {
$this->logger->debug(sprintf('Item [%d] requeued successfully. Item has been attempted %d time(s)', $item['id'], $attempts));
}
}
protected function deleteItem($id) {
$q = sprintf("DELETE FROM %s WHERE id = %d LIMIT 1", $this->table, intval($id));
if (FALSE === mysql_query($q, $this->conn)) {
$this->logger->debugQuery($q);
$this->logger->error(sprintf("Could not delete queue ID [%d]", intval($id)));
} else {
$this->logger->debug(sprintf('Successfully deleted ID [%d] from the queue', intval($id)));
}
}
}
?>