-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnumpy_ringbuffer.py
199 lines (168 loc) · 6.39 KB
/
numpy_ringbuffer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
"""
Based on the work in https://github.com/eric-wieser/numpy_ringbuffer/blob/master/numpy_ringbuffer/__init__.py
"""
from collections.abc import Sequence
import numpy as np
class NP_Ring_Buffer(Sequence):
def __init__(self, capacity, dtype=float, allow_overwrite=True):
"""
Create a new ring buffer with the given capacity and element type
Parameters
----------
capacity: int
The maximum capacity of the ring buffer
dtype: data-type, optional
Desired type of buffer elements. Use a type like (float, 2) to
produce a buffer with shape (N, 2)
allow_overwrite: bool
If false, throw an IndexError when trying to append to an alread
full buffer
"""
self._arr = np.empty(capacity, dtype)
self._left_index = 0
self._right_index = 0
self._capacity = capacity
self._allow_overwrite = allow_overwrite
def _unwrap(self):
""" Copy the data from this buffer into unwrapped form """
return np.concatenate((
self._arr[self._left_index:min(self._right_index, self._capacity)],
self._arr[:max(self._right_index - self._capacity, 0)]
))
def _fix_indices(self):
"""
Enforce our invariant that 0 <= self._left_index < self._capacity
"""
if self._left_index >= self._capacity:
self._left_index -= self._capacity
self._right_index -= self._capacity
elif self._left_index < 0:
self._left_index += self._capacity
self._right_index += self._capacity
@property
def is_full(self):
""" True if there is no more space in the buffer """
return len(self) == self._capacity
# numpy compatibility
def __array__(self):
return self._unwrap()
@property
def dtype(self):
return self._arr.dtype
@property
def shape(self):
return (len(self),) + self._arr.shape[1:]
# these mirror methods from deque
@property
def maxlen(self):
return self._capacity
def append(self, value):
if self.is_full:
if not self._allow_overwrite:
raise IndexError('append to a full RingBuffer with overwrite disabled')
elif not len(self):
return
else:
self._left_index += 1
self._arr[self._right_index % self._capacity] = value
self._right_index += 1
self._fix_indices()
def appendleft(self, value):
if self.is_full:
if not self._allow_overwrite:
raise IndexError('append to a full RingBuffer with overwrite disabled')
elif not len(self):
return
else:
self._right_index -= 1
self._left_index -= 1
self._fix_indices()
self._arr[self._left_index] = value
def pop(self):
if len(self) == 0:
raise IndexError("pop from an empty RingBuffer")
self._right_index -= 1
self._fix_indices()
res = self._arr[self._right_index % self._capacity]
return res
def popleft(self):
if len(self) == 0:
raise IndexError("pop from an empty RingBuffer")
res = self._arr[self._left_index]
self._left_index += 1
self._fix_indices()
return res
def extend(self, values):
lv = len(values)
if len(self) + lv > self._capacity:
if not self._allow_overwrite:
raise IndexError('extend a RingBuffer such that it would overflow, with overwrite disabled')
elif not len(self):
return
if lv >= self._capacity:
# wipe the entire array! - this may not be threadsafe
self._arr[...] = values[-self._capacity:]
self._right_index = self._capacity
self._left_index = 0
return
ri = self._right_index % self._capacity
sl1 = np.s_[ri:min(ri + lv, self._capacity)]
sl2 = np.s_[:max(ri + lv - self._capacity, 0)]
self._arr[sl1] = values[:sl1.stop - sl1.start]
self._arr[sl2] = values[sl1.stop - sl1.start:]
self._right_index += lv
self._left_index = max(self._left_index, self._right_index - self._capacity)
self._fix_indices()
def extendleft(self, values):
lv = len(values)
if len(self) + lv > self._capacity:
if not self._allow_overwrite:
raise IndexError('extend a RingBuffer such that it would overflow, with overwrite disabled')
elif not len(self):
return
if lv >= self._capacity:
# wipe the entire array! - this may not be threadsafe
self._arr[...] = values[:self._capacity]
self._right_index = self._capacity
self._left_index = 0
return
self._left_index -= lv
self._fix_indices()
li = self._left_index
sl1 = np.s_[li:min(li + lv, self._capacity)]
sl2 = np.s_[:max(li + lv - self._capacity, 0)]
self._arr[sl1] = values[:sl1.stop - sl1.start]
self._arr[sl2] = values[sl1.stop - sl1.start:]
self._right_index = min(self._right_index, self._left_index + self._capacity)
# implement Sequence methods
def __len__(self):
return self._right_index - self._left_index
def __getitem__(self, item):
# handle simple (b[1]) and basic (b[np.array([1, 2, 3])]) fancy indexing specially
if not isinstance(item, tuple):
item_arr = np.asarray(item)
if issubclass(item_arr.dtype.type, np.integer):
item_arr = (item_arr + self._left_index) % self._capacity
return self._arr[item_arr]
# for everything else, get it right at the expense of efficiency
return self._unwrap()[item]
def __iter__(self):
# alarmingly, this is comparable in speed to using itertools.chain
return iter(self._unwrap())
# Everything else
def __repr__(self):
return '<RingBuffer of {!r}>'.format(np.asarray(self))
def mean(self):
x=self._unwrap()
return x.mean(axis=0)
if __name__ == "__main__":
x = NP_Ring_Buffer(7, (float, 3))
data = np.arange(9).reshape([3,3])
data2 = np.arange(3)+100
x.extend(data)
x.append(data2)
x.extend(data)
x.extend(-data)
print(x)
print(np.array(x))
print(x.mean())