-
Notifications
You must be signed in to change notification settings - Fork 14
/
Calculator.py
432 lines (392 loc) · 17.3 KB
/
Calculator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
import time
from utils import SteppedAvgLookup
from DataManager import DataManager
class Calculator(object):
    """A Calculator specifically designed to do stock market related
    calculations on stock data.

    For example, indicator data series can be calculated here for
    charting purposes.

    NOTE: To add an indicator, write two getter methods, one for a
    dictionary and another for a series, named
    'get_<indicator-name>' and 'get_<indicator-name>_series'. Then,
    in the get_indicator method, add the two methods to the correct
    mapping. Currently indicator getter functions need at least one
    argument, even if a None will be passed.

    Currently supports:
        - Standard Moving Average for a given period
        - Exponential Moving Average for a given period
        - Moving Average Convergence/Divergence for a given set of
            periods
        - generating theoretical ETF data
        - Previous High (i.e. the highest price the stock has been,
            including the current day)
    """

    # Step used by generate_theoretical_data when none is given.
    _DEFAULT_STEP = 0.00005

    # Hardcoded (pos_adj, neg_adj) pairs, keyed by upper-case target ticker.
    # Only consulted when the default step is in use.
    _DEFAULT_ADJUSTMENTS = {
        'UPRO': (0, 0),
        'TMF': (0.01, 0.05),
        'TQQQ': (0.025, 0),
        'UDOW': (0, 0.01)
    }

    def __init__(self):
        """Initializes a Calculator."""

    def get_indicator(self, indicator_code, price_lut, series=False):
        """A mapping function for indicator functions. Primarily used
        for cases where indicators are dynamic and hardcoding functions
        is impractical.

        The code has the form '<INDICATOR>' or '<INDICATOR>_<period>',
        where <period> is either a single value ('SMA_50') or several
        values joined by dashes ('MACD_12-26-9'). A single period is
        passed on as a string, several periods as a list of strings, and
        a missing period as None.

        Args:
            indicator_code: A string coding the indicator and period
            price_lut: A price lookup table for the data on which the
                indicator should be applied
            series: A value for whether or not to map to a series
                indicator function

        Returns:
            Whatever the mapped indicator function returns: a
            dictionary mapping dates to indicator values, or, when
            series is true, the list(s) produced by the series variant.

        Raises:
            KeyError: If the indicator part of the code is unknown.
        """
        # decode
        code_parts = indicator_code.split('_')
        indicator = code_parts[0]
        period = None
        if len(code_parts) > 1:
            period = code_parts[1].split('-')
            if len(period) == 1:
                period = period[0]
        # create mapping to methods
        if series:
            mapping = {
                'SMA': self.get_sma_series,
                'EMA': self.get_ema_series,
                'MACD': self.get_macd_series,
                # there is no signal-only series getter; the MACD series
                # already carries the signal line as its second list
                'MACDSIGNAL': self.get_macd_series,
                'PREVHIGH': self.get_prev_high_series
            }
        else:
            mapping = {
                'SMA': self.get_sma,
                'EMA': self.get_ema,
                'MACD': self.get_macd,
                'MACDSIGNAL': self.get_macd_signal,
                'PREVHIGH': self.get_prev_high
            }
        # call correct method
        return mapping[indicator](period, price_lut)

    @staticmethod
    def _parse_period(period):
        """Converts a period to an int and rejects non-positive values."""
        period = int(period)
        if period < 1:
            raise ValueError(
                'period must be a positive number of days, got {}'.format(
                    period))
        return period

    @staticmethod
    def _sma_values(period, prices):
        """Returns the SMA for each position of an ordered price list."""
        averages = []
        for end in range(1, len(prices) + 1):
            # until a full period is available, average what exists so far
            window = prices[max(0, end - period):end]
            averages.append(sum(window) / len(window))
        return averages

    def get_sma(self, period, price_lut):
        """Calculates the Standard Moving Average for a given period
        and returns a dictionary of SMA values.

        Args:
            period: A value representing a number of days
            price_lut: A price LUT, i.e. a dictionary mapping dates to
                prices

        Returns:
            A dictionary with dates mapping to SMA values

        Raises:
            ValueError: If the period is not a positive number.
        """
        return dict(zip(sorted(price_lut),
                        self.get_sma_series(period, price_lut)))

    def get_sma_series(self, period, price_lut):
        """Calculates the Standard Moving Average for a given period
        and returns a list of SMA values.

        For the first (period - 1) dates, where a full window is not yet
        available, the average of all prices so far is used.

        Args:
            period: A value representing a number of days
            price_lut: A price LUT, i.e. a dictionary mapping dates to
                prices

        Returns:
            A list with SMA values corresponding to ordered dates in
            the provided price LUT

        Raises:
            ValueError: If the period is not a positive number.
        """
        period = self._parse_period(period)
        prices = [price_lut[date] for date in sorted(price_lut)]
        return self._sma_values(period, prices)

    def get_ema(self, period, price_lut):
        """Calculates the Exponential Moving Average for a given
        period and returns a dictionary of EMA values.

        Args:
            period: A value representing a number of days
            price_lut: A price LUT, i.e. a dictionary mapping dates to
                prices

        Returns:
            A dictionary with dates mapping to EMA values

        Raises:
            ValueError: If the period is not a positive number.
        """
        return dict(zip(sorted(price_lut),
                        self.get_ema_series(period, price_lut)))

    def get_ema_series(self, period, price_lut):
        """Calculates the Exponential Moving Average for a given
        period and returns a list of EMA values.

        The first `period` values are seeded with the SMA of the prices
        seen so far; every later value is
        price * multiplier + previous EMA * (1 - multiplier), with
        multiplier = 2 / (period + 1).

        Args:
            period: A value representing a number of days
            price_lut: A price LUT, i.e. a dictionary mapping dates to
                prices

        Returns:
            A list with EMA values corresponding to the ordered dates
            in the provided price LUT

        Raises:
            ValueError: If the period is not a positive number.
        """
        period = self._parse_period(period)
        prices = [price_lut[date] for date in sorted(price_lut)]
        ema = self._sma_values(period, prices[:period])
        # 2.0 keeps this a true division even without Python 3 semantics
        multiplier = 2.0 / (period + 1)
        for price in prices[period:]:
            ema.append(float(price) * multiplier
                       + ema[-1] * (1 - multiplier))
        return ema

    def get_macd(self, periods, price_lut):
        """Calculates the Moving Average Convergence/Divergence for a
        given set of periods and returns a dictionary mapping dates to
        MACD.

        Args:
            periods: A set of values representing the days for each
                MACD period, i.e. [short, long, exponential/signal].
                Only the short and long periods are used here.
            price_lut: A set of values on which to perform the MACD
                calculations

        Returns:
            A dictionary mapping dates to MACD values, i.e. the short
            EMA minus the long EMA on each date
        """
        macd_short = self.get_ema(periods[0], price_lut)
        macd_long = self.get_ema(periods[1], price_lut)
        return {date: macd_short[date] - macd_long[date]
                for date in price_lut}

    def get_macd_signal(self, periods, price_lut):
        """Calculates the signal line for the Moving Average
        Convergence/Divergence for a given set of periods and returns a
        dictionary mapping dates to signal line values.

        Args:
            periods: A set of values representing the days for each
                MACD period, i.e. [short, long, exponential/signal]
            price_lut: A set of values on which to perform the MACD
                calculations

        Returns:
            A dictionary mapping dates to MACD signal values
        """
        # the signal line is the EMA of the MACD line itself
        return self.get_ema(periods[2], self.get_macd(periods, price_lut))

    def get_macd_series(self, periods, price_lut):
        """Calculates the Moving Average Convergence/Divergence for a
        given set of periods and returns lists for MACD, signal, and
        histogram.

        Args:
            periods: A set of values representing the days for each
                MACD period, i.e. [short, long, exponential/signal]
            price_lut: A set of values on which to perform the MACD
                calculations

        Returns:
            A set of sets of values for the MACD, signal line, and MACD
            histogram at each point for the given values, i.e. a set in
            the form [[MACD], [signal line], [MACD histogram]]
        """
        macd_lut = self.get_macd(periods, price_lut)
        macd = [macd_lut[date] for date in sorted(macd_lut)]
        # signal is needed for the histogram
        signal = self.get_ema_series(periods[2], macd_lut)
        histogram = [value - sig for value, sig in zip(macd, signal)]
        return [macd, signal, histogram]

    def get_prev_high(self, period, price_lut):
        """Calculates the previous high value for every point in the
        given LUT.

        Args:
            period: A placeholder, just pass None for now
            price_lut: A set of values on which to perform the previous
                high calculation

        Returns:
            A dictionary of dates mapping to the highest value seen up
            to and including that date (empty for an empty LUT)
        """
        return dict(zip(sorted(price_lut),
                        self.get_prev_high_series(period, price_lut)))

    def get_prev_high_series(self, period, price_lut):
        """Calculates the previous high value for every point in the
        given LUT.

        Args:
            period: A placeholder, just pass None for now
            price_lut: A set of values on which to perform the previous
                high calculation

        Returns:
            A list of running-high values corresponding to the ordered
            dates in the provided LUT (empty for an empty LUT)
        """
        prev_high = []
        for date in sorted(price_lut):
            price = price_lut[date]
            if prev_high:
                price = max(price, prev_high[-1])
            prev_high.append(price)
        return prev_high

    def _resolve_adjustments(self, ticker_tgt, step, pos_adj, neg_adj):
        """Fills in any adjustment left as None with its default value."""
        if step == self._DEFAULT_STEP:
            defaults = self._DEFAULT_ADJUSTMENTS.get(ticker_tgt.upper(),
                                                     (0, 0))
        else:
            # the hardcoded per-ticker values are only applied together
            # with the default step
            defaults = (0, 0)
        if pos_adj is None:
            pos_adj = defaults[0]
        if neg_adj is None:
            neg_adj = defaults[1]
        return pos_adj, neg_adj

    def generate_theoretical_data(self, ticker_tgt, ticker_src,
                                  step=_DEFAULT_STEP, pos_adj=None,
                                  neg_adj=None):
        """Generates theoretical data for a stock based on another
        stock.

        Given two tickers, a granularity/precision step, and manual
        offset/adjustments, generates more data for the first stock
        (gen) to match the length of data in the second stock (src).
        The generation is based on averages in existing real data and
        assumes an existing correlation between two stocks (e.g. UPRO
        and SPY supposedly have a correlation, or leverage factor of 3)

        NOTE: The code requires every date of the target data to exist
        in the source data, and the backwards generation indexes the
        source dates as if the target dates were their most recent
        tail. Target data must not be empty.

        Args:
            ticker_tgt: A ticker of the stock for which data should be
                generated, i.e. the target for the generation
            ticker_src: A ticker of the stock to be used as the data
                source to aid in data generation.
                NOTE: This implies the source data should be longer
                than the data for the stock for which the generation
                occurs
            step: A value corresponding to a level of precision, or the
                number of averages calculated and then used to generate
                the data. NOTE: precision != accuracy and a default
                value of 0.00005 is used if one is not given, based on
                testing done on different values
            pos_adj: A value to be used when adjusting movements in the
                positive direction, i.e. a higher value will lead to
                more pronounced positive moves (default: None, if None
                a hardcoded default value will be used depending on
                the ticker when the default step is in use, otherwise
                0)
            neg_adj: A value to be used when adjusting movements in the
                negative direction, i.e. a higher value will lead to
                more pronounced negative moves (default: None, if None
                a hardcoded default value will be used depending on
                the ticker when the default step is in use, otherwise
                0)

        Returns:
            A tuple of price LUTs, one LUT containing real data
            appended to a part of the generated data, the other
            containing a full set of generated data. The former is
            intended to be used in backtesting strategies, while the
            latter is intended to be used for verifying generation
            accuracy against existing real data.
        """
        db = DataManager()
        # get prices for tickers
        price_lut_tgt = db.build_price_lut(ticker_tgt)
        price_lut_src = db.build_price_lut(ticker_src)
        # NOTE: reading previously generated '--GEN-PART'/'--GEN-FULL' data
        # back from disk is disabled, as it didn't respond to changes
        # sorted dates needed later
        src_dates = sorted(price_lut_src.keys())
        gen_dates = sorted(price_lut_tgt.keys())
        # part of data will be real data
        price_lut_gen_part = price_lut_tgt.copy()
        # fully generated data needs a real point as an anchor
        price_lut_gen_full = {gen_dates[0]: price_lut_tgt[gen_dates[0]]}
        # resolve each adjustment on its own so neither can stay None
        pos_adj, neg_adj = self._resolve_adjustments(
            ticker_tgt, step, pos_adj, neg_adj)

        # calculate % movements and leverage ratio, to use for the SA-LUT
        moves = {}
        ratios = {}
        for prev_date, date in zip(gen_dates, gen_dates[1:]):
            change_src = price_lut_src[date] / price_lut_src[prev_date] - 1
            change_gen = price_lut_tgt[date] / price_lut_tgt[prev_date] - 1
            moves[date] = change_src
            # a flat source day has no defined ratio; record it as 0.0
            if change_src == 0:
                ratios[date] = 0.0
            else:
                ratios[date] = change_gen / change_src
        sa_lut = SteppedAvgLookup(step,
                                  [moves[d] for d in gen_dates[1:]],
                                  [ratios[d] for d in gen_dates[1:]])

        def growth(move):
            """Returns the generated price factor for a source move."""
            adj = pos_adj if move >= 0 else neg_adj
            return move * (sa_lut.get(move) + adj) + 1

        # generate data going forward from gen data's anchor point
        for prev_date, date in zip(gen_dates, gen_dates[1:]):
            price_lut_gen_full[date] = (price_lut_gen_full[prev_date]
                                        * growth(moves[date]))
        # generate data going backwards from gen data's anchor point
        for i in range(len(src_dates) - len(gen_dates) - 1, -1, -1):
            move = (price_lut_src[src_dates[i + 1]]
                    / price_lut_src[src_dates[i]]
                    - 1)
            gen_price = price_lut_gen_full[src_dates[i + 1]] / growth(move)
            price_lut_gen_full[src_dates[i]] = gen_price
            price_lut_gen_part[src_dates[i]] = gen_price

        def to_rows(price_lut):
            """Builds write_stock_data rows with only the price filled."""
            return [[date, '-', '-', '-', str(price_lut[date]), '-']
                    for date in src_dates]

        # save data to disk for faster retrieval next time
        db.write_stock_data(ticker_tgt + '--GEN-FULL',
                            to_rows(price_lut_gen_full),
                            False)
        db.write_stock_data(ticker_tgt + '--GEN-PART',
                            to_rows(price_lut_gen_part),
                            False)
        return (price_lut_gen_part, price_lut_gen_full)