From 4c5a7c44bb1b12dd3724a50569b4d9c45b85ca20 Mon Sep 17 00:00:00 2001 From: Dan Cech Date: Wed, 6 Dec 2017 11:41:23 -0500 Subject: [PATCH] Tag federation & py3 fixes (#2128) * support passing seriesByTag calls to finders that support it * remove unused noPrefetch arg * switch from StringIO to BytesIO --- webapp/graphite/finders/remote.py | 68 ++++++ webapp/graphite/finders/utils.py | 8 + webapp/graphite/render/datalib.py | 4 +- webapp/graphite/render/evaluator.py | 22 +- webapp/graphite/render/functions.py | 23 +- webapp/graphite/render/glyph.py | 23 +- webapp/graphite/render/grammar.py | 11 +- webapp/graphite/render/views.py | 6 +- webapp/graphite/storage.py | 194 ++++++++++++++- webapp/graphite/tags/views.py | 4 +- webapp/graphite/util.py | 8 +- webapp/tests/test_finders_remote.py | 170 +++++++++++++- webapp/tests/test_functions.py | 116 +++------ webapp/tests/test_readers_remote.py | 12 +- webapp/tests/test_render.py | 15 ++ webapp/tests/test_storage.py | 353 ++++++++++++++++++++++++++-- 16 files changed, 849 insertions(+), 188 deletions(-) diff --git a/webapp/graphite/finders/remote.py b/webapp/graphite/finders/remote.py index 600265e15..b29d43359 100644 --- a/webapp/graphite/finders/remote.py +++ b/webapp/graphite/finders/remote.py @@ -49,6 +49,7 @@ def __init__(self, host): self.url = parsed['url'] self.params = parsed['params'] self.last_failure = 0 + self.tags = not self.params.get('noTags') @property def disabled(self): @@ -178,6 +179,73 @@ def get_index(self, requestContext): return results + def auto_complete_tags(self, exprs, tagPrefix=None, limit=None, requestContext=None): + """ + Return auto-complete suggestions for tags based on the matches for the specified expressions, optionally filtered by tag prefix + """ + if limit is None: + limit = settings.TAGDB_AUTOCOMPLETE_LIMIT + + fields = [ + ('tagPrefix', tagPrefix or ''), + ('limit', str(limit)), + ] + for expr in exprs: + fields.append(('expr', expr)) + + result = self.request( + '/tags/autoComplete/tags', + fields, + headers=requestContext.get('forwardHeaders') if requestContext else None, + timeout=settings.REMOTE_FIND_TIMEOUT) + try: + reader = codecs.getreader('utf-8') + results = json.load(reader(result)) + except Exception as err: + self.fail() + log.exception( + "RemoteFinder[%s] Error decoding autocomplete tags response from %s: %s" % + (self.host, result.url_full, err)) + raise Exception("Error decoding autocomplete tags response from %s: %s" % (result.url_full, err)) + finally: + result.release_conn() + + return results + + def auto_complete_values(self, exprs, tag, valuePrefix=None, limit=None, requestContext=None): + """ + Return auto-complete suggestions for tags and values based on the matches for the specified expressions, optionally filtered by tag and/or value prefix + """ + if limit is None: + limit = settings.TAGDB_AUTOCOMPLETE_LIMIT + + fields = [ + ('tag', tag or ''), + ('valuePrefix', valuePrefix or ''), + ('limit', str(limit)), + ] + for expr in exprs: + fields.append(('expr', expr)) + + result = self.request( + '/tags/autoComplete/values', + fields, + headers=requestContext.get('forwardHeaders') if requestContext else None, + timeout=settings.REMOTE_FIND_TIMEOUT) + try: + reader = codecs.getreader('utf-8') + results = json.load(reader(result)) + except Exception as err: + self.fail() + log.exception( + "RemoteFinder[%s] Error decoding autocomplete values response from %s: %s" % + (self.host, result.url_full, err)) + raise Exception("Error decoding autocomplete values response from %s: %s" % 
(result.url_full, err)) + finally: + result.release_conn() + + return results + def request(self, path, fields=None, headers=None, timeout=None): url = "%s%s" % (self.url, path) url_full = "%s?%s" % (url, urlencode(fields)) diff --git a/webapp/graphite/finders/utils.py b/webapp/graphite/finders/utils.py index 623a5dc6d..9ce719d4a 100644 --- a/webapp/graphite/finders/utils.py +++ b/webapp/graphite/finders/utils.py @@ -43,6 +43,8 @@ class BaseFinder(object): local = True # set to True if this finder shouldn't be used disabled = False + # set to True if this finder supports seriesByTag + tags = False def __init__(self): """Initialize the finder.""" @@ -147,3 +149,9 @@ def fetch(self, patterns, start_time, end_time, now=None, requestContext=None): }) return result + + def auto_complete_tags(self, exprs, tagPrefix=None, limit=None, requestContext=None): + return [] + + def auto_complete_values(self, exprs, tag, valuePrefix=None, limit=None, requestContext=None): + return [] diff --git a/webapp/graphite/render/datalib.py b/webapp/graphite/render/datalib.py index 6c44a8833..0b5602ea0 100755 --- a/webapp/graphite/render/datalib.py +++ b/webapp/graphite/render/datalib.py @@ -25,7 +25,7 @@ class TimeSeries(list): - def __init__(self, name, start, end, step, values, consolidate='average', tags=None, xFilesFactor=None): + def __init__(self, name, start, end, step, values, consolidate='average', tags=None, xFilesFactor=None, pathExpression=None): list.__init__(self, values) self.name = name self.start = start @@ -34,7 +34,7 @@ def __init__(self, name, start, end, step, values, consolidate='average', tags=N self.consolidationFunc = consolidate self.valuesPerPoint = 1 self.options = {} - self.pathExpression = name + self.pathExpression = pathExpression or name self.xFilesFactor = xFilesFactor if xFilesFactor is not None else settings.DEFAULT_XFILES_FACTOR if tags: diff --git a/webapp/graphite/render/evaluator.py b/webapp/graphite/render/evaluator.py index cd0722258..b41feaa2c 100644 --- a/webapp/graphite/render/evaluator.py +++ b/webapp/graphite/render/evaluator.py @@ -1,18 +1,16 @@ import re +import six from graphite.render.grammar import grammar from graphite.render.datalib import fetchData, TimeSeries, prefetchData -from graphite.storage import STORE -import six -def evaluateTarget(requestContext, targets, noPrefetch=False): +def evaluateTarget(requestContext, targets): if not isinstance(targets, list): targets = [targets] - if not noPrefetch: - pathExpressions = extractPathExpressions(requestContext, targets) - prefetchData(requestContext, pathExpressions) + pathExpressions = extractPathExpressions(requestContext, targets) + prefetchData(requestContext, pathExpressions) seriesList = [] @@ -78,6 +76,9 @@ def evaluateTokens(requestContext, tokens, replacements=None, pipedArg=None): # as tokens.template. 
this generally happens if you try to pass non-numeric/string args
       raise ValueError("invalid template() syntax, only string/numeric arguments are allowed")
 
+    if tokens.call.funcname == 'seriesByTag':
+      return fetchData(requestContext, tokens.call.raw)
+
     func = SeriesFunctions[tokens.call.funcname]
     rawArgs = tokens.call.args or []
     if pipedArg is not None:
@@ -145,14 +146,9 @@ def extractPathExpression(requestContext, tokens, replacements=None):
         expression = expression.replace('$'+name, str(replacements[name]))
     pathExpressions.add(expression)
   elif tokens.call:
-    # if we're prefetching seriesByTag, look up the matching series and prefetch those
+    # if we're prefetching seriesByTag, pass the entire call back as a path expression
     if tokens.call.funcname == 'seriesByTag':
-      if STORE.tagdb:
-        for series in STORE.tagdb.find_series(
-          tuple([t.string[1:-1] for t in tokens.call.args if t.string]),
-          requestContext=requestContext,
-        ):
-          pathExpressions.add(series)
+      pathExpressions.add(tokens.call.raw)
     else:
       for a in tokens.call.args:
         extractPathExpression(requestContext, a, replacements)
diff --git a/webapp/graphite/render/functions.py b/webapp/graphite/render/functions.py
index a77ad32fd..7487cc535 100644
--- a/webapp/graphite/render/functions.py
+++ b/webapp/graphite/render/functions.py
@@ -4303,28 +4303,7 @@ def seriesByTag(requestContext, *tagExpressions):
   See :ref:`querying tagged series <querying-tagged-series>` for more detail.
 
   """
-
-  if STORE.tagdb is None:
-    log.info('seriesByTag called but no TagDB configured')
-    return []
-
-  taggedSeries = STORE.tagdb.find_series(tagExpressions, requestContext=requestContext)
-  if not taggedSeries:
-    return []
-
-  taggedSeriesQuery = 'group(' + ','.join(taggedSeries) + ')'
-
-  log.debug('taggedSeriesQuery %s' % taggedSeriesQuery)
-
-  seriesList = evaluateTarget(requestContext, taggedSeriesQuery, noPrefetch=True)
-
-  pathExpr = 'seriesByTag(%s)' % ','.join(['"%s"' % expr for expr in tagExpressions])
-  for series in seriesList:
-    series.pathExpression = pathExpr
-
-  log.debug('seriesByTag found [%s]' % ', '.join([series.name for series in seriesList]))
-
-  return seriesList
+  # the handling of seriesByTag is implemented in STORE.fetch
 
 def groupByTags(requestContext, seriesList, callback, *tags):
   """
diff --git a/webapp/graphite/render/glyph.py b/webapp/graphite/render/glyph.py
index 4b45f3ad6..4985cca7b 100644
--- a/webapp/graphite/render/glyph.py
+++ b/webapp/graphite/render/glyph.py
@@ -12,7 +12,7 @@
    See the License for the specific language governing permissions and
    limitations under the License."""
 
-import math, itertools, re, sys
+import math, itertools, re
 from datetime import datetime, timedelta
 from six.moves import range, zip
 from six.moves.urllib.parse import unquote_plus
@@ -21,20 +21,13 @@
 import pytz
 
 from graphite.render.datalib import TimeSeries
-from graphite.util import json
+from graphite.util import json, BytesIO
 
 try:
   import cairocffi as cairo
 except ImportError:
   import cairo
 
-# BytesIO is needed on py3 as StringIO does not operate on byte input anymore
-# We could use BytesIO on py2 as well but it is slower than StringIO
-if sys.version_info >= (3, 0):
-  from io import BytesIO as StringIO
-else:
-  from cStringIO import StringIO
-
 INFINITY = float('inf')
 
 colorAliases = {
@@ -585,10 +578,10 @@ def setupCairo(self,outputFormat='png'):
     if outputFormat == 'png':
       self.surface = cairo.ImageSurface(cairo.FORMAT_ARGB32, self.width, self.height)
     elif outputFormat == 'svg':
-      self.surfaceData = StringIO.StringIO()
+      self.surfaceData = BytesIO()
       self.surface = cairo.SVGSurface(self.surfaceData, self.width, self.height)
     elif outputFormat == 'pdf':
-      self.surfaceData = StringIO.StringIO()
+      self.surfaceData = BytesIO()
       self.surface = cairo.PDFSurface(self.surfaceData, self.width, self.height)
       res_x, res_y = self.surface.get_fallback_resolution()
       self.width = float(self.width / res_x) * 72
@@ -889,7 +882,7 @@ def output(self, fileObj):
       metaData = { }
 
       self.surface.finish()
-      svgData = self.surfaceData.getvalue()
+      svgData = str(self.surfaceData.getvalue())
       self.surfaceData.close()
 
       svgData = svgData.replace('pt"', 'px"', 2) # we expect height/width in pixels, not points
@@ -912,13 +905,13 @@ def onHeaderPath(match):
       svgData += "</g>"
       svgData = svgData.replace(' data-header="true"','')
 
-      fileObj.write(svgData)
-      fileObj.write("""<script>
+      fileObj.write(svgData.encode('utf-8'))
+      fileObj.write(("""<script>
   <![CDATA[
     metadata = %s
  ]]>
 </script>
-</svg>""" % json.dumps(metaData))
+</svg>""" % json.dumps(metaData)).encode('utf-8'))
 
 
 class LineGraph(Graph):
diff --git a/webapp/graphite/render/grammar.py b/webapp/graphite/render/grammar.py
index 3f56374b1..cc931a6cf 100644
--- a/webapp/graphite/render/grammar.py
+++ b/webapp/graphite/render/grammar.py
@@ -2,7 +2,7 @@
   Forward, Combine, Optional, Word, Literal, CaselessKeyword,
   CaselessLiteral, Group, FollowedBy, LineEnd, OneOrMore, ZeroOrMore,
   alphas, alphanums, printables, delimitedList, quotedString, Regex,
-  __version__, Suppress
+  __version__, Suppress, Empty
 )
 
 grammar = Forward()
@@ -71,14 +71,19 @@
 args = delimitedList(~kwarg + arg) # lookahead to prevent failing on equals
 kwargs = delimitedList(kwarg)
 
+def setRaw(s, loc, toks):
+  toks[0].raw = s[toks[0].start:toks[0].end]
+
 call = Group(
+  Empty().setParseAction(lambda s, l, t: l)('start') +
   funcname + leftParen +
   Optional(
     args + Optional(
       comma + kwargs
     )
-  ) + rightParen
-)('call')
+  ) + rightParen +
+  Empty().leaveWhitespace().setParseAction(lambda s, l, t: l)('end')
+).setParseAction(setRaw)('call')
 
 # Metric pattern (aka. 
pathExpression) validMetricChars = ''.join((set(printables) - set(symbols))) diff --git a/webapp/graphite/render/views.py b/webapp/graphite/render/views.py index 237f626d4..4bd42249e 100755 --- a/webapp/graphite/render/views.py +++ b/webapp/graphite/render/views.py @@ -24,7 +24,7 @@ from graphite.compat import HttpResponse from graphite.user_util import getProfileByUsername -from graphite.util import json, unpickle, pickle, msgpack, StringIO +from graphite.util import json, unpickle, pickle, msgpack, BytesIO from graphite.storage import extractForwardHeaders from graphite.logger import log from graphite.render.evaluator import evaluateTarget @@ -515,7 +515,7 @@ def delegateRendering(graphType, graphOptions, headers=None): def renderLocalView(request): try: start = time() - reqParams = StringIO(request.body) + reqParams = BytesIO(request.body) graphType = reqParams.readline().strip() optionsPickle = reqParams.read() reqParams.close() @@ -569,7 +569,7 @@ def renderMyGraphView(request,username,graphName): def doImageRender(graphClass, graphOptions): - pngData = StringIO() + pngData = BytesIO() t = time() img = graphClass(**graphOptions) img.output(pngData) diff --git a/webapp/graphite/storage.py b/webapp/graphite/storage.py index 323e3fc73..d48b468be 100755 --- a/webapp/graphite/storage.py +++ b/webapp/graphite/storage.py @@ -5,6 +5,7 @@ import types from collections import defaultdict +from copy import deepcopy from django.conf import settings from django.core.cache import cache @@ -21,6 +22,7 @@ from graphite.finders.utils import FindQuery, BaseFinder from graphite.readers import MultiReader from graphite.worker_pool.pool import get_pool, pool_exec, Job, PoolTimeoutError +from graphite.render.grammar import grammar def get_finders(finder_path): @@ -61,8 +63,8 @@ def __init__(self, finders=None, tagdb=None): self.finders = finders if tagdb is None: - tagdb = settings.TAGDB or 'graphite.tags.localdatabase.LocalDatabaseTagDB' - self.tagdb = get_tagdb(tagdb) + tagdb = get_tagdb(settings.TAGDB or 'graphite.tags.localdatabase.LocalDatabaseTagDB') + self.tagdb = tagdb def get_finders(self, local=False): for finder in self.finders: @@ -85,7 +87,7 @@ def pool_exec(self, jobs, timeout): def fetch(self, patterns, startTime, endTime, now, requestContext): # deduplicate patterns - patterns = list(set(patterns)) + patterns = sorted(set(patterns)) if not patterns: return [] @@ -93,10 +95,21 @@ def fetch(self, patterns, startTime, endTime, now, requestContext): log.debug( 'graphite.storage.Store.fetch :: Starting fetch on all backends') - jobs = [ - Job(finder.fetch, patterns, startTime, endTime, now=now, requestContext=requestContext) - for finder in self.get_finders(requestContext.get('localOnly')) - ] + jobs = [] + tag_patterns = None + pattern_aliases = defaultdict(list) + for finder in self.get_finders(requestContext.get('localOnly')): + # if the finder supports tags, just pass the patterns through + if getattr(finder, 'tags', False): + jobs.append(Job(finder.fetch, patterns, startTime, endTime, now=now, requestContext=requestContext)) + continue + + # if we haven't resolved the seriesByTag calls, build resolved patterns and translation table + if tag_patterns is None: + tag_patterns, pattern_aliases = self._tag_patterns(patterns, requestContext) + + # dispatch resolved patterns to finder + jobs.append(Job(finder.fetch, tag_patterns, startTime, endTime, now=now, requestContext=requestContext)) results = [] @@ -120,11 +133,50 @@ def fetch(self, patterns, startTime, endTime, now, requestContext): 
log.info("Timed out in fetch after %fs" % (time.time() - start)) if errors == done: + if errors == 1: + raise Exception("Fetch for %s failed: %s" % (str(patterns), str(job.exception))) raise Exception('All fetches failed for %s' % (str(patterns))) + # translate path expressions for responses from resolved seriesByTag patterns + for result in results: + if result['name'] == result['pathExpression'] and result['pathExpression'] in pattern_aliases: + for pathExpr in pattern_aliases[result['pathExpression']]: + newresult = deepcopy(result) + newresult['pathExpression'] = pathExpr + results.append(newresult) + log.debug("Got all fetch results for %s in %fs" % (str(patterns), time.time() - start)) return results + def _tag_patterns(self, patterns, requestContext): + tag_patterns = [] + pattern_aliases = defaultdict(list) + + for pattern in patterns: + # if pattern isn't a seriesByTag call, just add it to the list + if not pattern.startswith('seriesByTag('): + tag_patterns.append(pattern) + continue + + # perform the tagdb lookup + exprs = tuple([ + t.string[1:-1] + for t in grammar.parseString(pattern).expression.call.args + if t.string + ]) + taggedSeries = self.tagdb.find_series(exprs, requestContext=requestContext) + if not taggedSeries: + continue + + # add to translation table for path matching + for series in taggedSeries: + pattern_aliases[series].append(pattern) + + # add to list of resolved patterns + tag_patterns.extend(taggedSeries) + + return sorted(set(tag_patterns)), pattern_aliases + def get_index(self, requestContext=None): log.debug('graphite.storage.Store.get_index :: Starting get_index on all backends') @@ -158,6 +210,8 @@ def get_index(self, requestContext=None): log.info("Timed out in get_index after %fs" % (time.time() - start)) if errors == done: + if errors == 1: + raise Exception("get_index failed: %s" % (str(job.exception))) raise Exception('All index lookups failed') log.debug("Got all index results in %fs" % (time.time() - start)) @@ -222,6 +276,8 @@ def _find(self, query): log.info("Timed out in find after %fs" % (time.time() - start)) if errors == done: + if errors == 1: + raise Exception("Find for %s failed: %s" % (str(query), str(job.exception))) raise Exception('All finds failed for %s' % (str(query))) log.debug("Got all find results for %s in %fs" % (str(query), time.time() - start)) @@ -339,6 +395,130 @@ def distance_to_requested_interval(node): reader = MultiReader(minimal_node_set) return LeafNode(path, reader) + def tagdb_auto_complete_tags(self, exprs, tagPrefix=None, limit=None, requestContext=None): + log.debug( + 'graphite.storage.Store.auto_complete_tags :: Starting lookup on all backends') + + if requestContext is None: + requestContext = {} + + jobs = [] + use_tagdb = False + for finder in self.get_finders(requestContext.get('localOnly')): + if getattr(finder, 'tags', False): + jobs.append(Job(finder.auto_complete_tags, exprs, tagPrefix=tagPrefix, limit=limit, requestContext=requestContext)) + else: + use_tagdb = True + + if not jobs: + if not use_tagdb: + return [] + + return self.tagdb.auto_complete_tags(exprs, tagPrefix=tagPrefix, limit=limit, requestContext=requestContext) + + # start finder jobs + jobs = self.pool_exec(jobs, settings.REMOTE_FIND_TIMEOUT) + + results = set() + + # if we're using the local tagdb then execute it (in the main thread so that LocalDatabaseTagDB will work) + if use_tagdb: + results.update(self.tagdb.auto_complete_tags(exprs, tagPrefix=tagPrefix, limit=limit, requestContext=requestContext)) + + done = 0 + errors = 0 + + 
# Start fetches + start = time.time() + try: + for job in jobs: + done += 1 + + if job.exception: + errors += 1 + log.info("Autocomplete tags for %s %s failed after %fs: %s" % (str(exprs), tagPrefix or '', time.time() - start, str(job.exception))) + continue + + log.debug("Got an autocomplete result for %s %s after %fs" % (str(exprs), tagPrefix or '', time.time() - start)) + results.update(job.result) + except PoolTimeoutError: + raise Exception("Timed out in autocomplete tags for %s %s after %fs" % (str(exprs), tagPrefix or '', time.time() - start)) + + if errors == done: + if errors == 1: + raise Exception("Autocomplete tags for %s %s failed: %s" % (str(exprs), tagPrefix or '', str(job.exception))) + raise Exception('All autocomplete tag requests failed for %s %s' % (str(exprs), tagPrefix or '')) + + # sort & limit results + results = sorted(results) + if limit: + results = results[:int(limit)] + + log.debug("Got all autocomplete tag results for %s %s in %fs" % (str(exprs), tagPrefix or '', time.time() - start)) + return results + + def tagdb_auto_complete_values(self, exprs, tag, valuePrefix=None, limit=None, requestContext=None): + log.debug( + 'graphite.storage.Store.auto_complete_values :: Starting lookup on all backends') + + if requestContext is None: + requestContext = {} + + jobs = [] + use_tagdb = False + for finder in self.get_finders(requestContext.get('localOnly')): + if getattr(finder, 'tags', False): + jobs.append(Job(finder.auto_complete_values, exprs, tag, valuePrefix=valuePrefix, limit=limit, requestContext=requestContext)) + else: + use_tagdb = True + + if not jobs: + if not use_tagdb: + return [] + + return self.tagdb.auto_complete_values(exprs, tag, valuePrefix=valuePrefix, limit=limit, requestContext=requestContext) + + # start finder jobs + jobs = self.pool_exec(jobs, settings.REMOTE_FIND_TIMEOUT) + + results = set() + + # if we're using the local tagdb then execute it (in the main thread so that LocalDatabaseTagDB will work) + if use_tagdb: + results.update(self.tagdb.auto_complete_values(exprs, tag, valuePrefix=valuePrefix, limit=limit, requestContext=requestContext)) + + done = 0 + errors = 0 + + # Start fetches + start = time.time() + try: + for job in jobs: + done += 1 + + if job.exception: + errors += 1 + log.info("Autocomplete values for %s %s %s failed after %fs: %s" % (str(exprs), tag, valuePrefix or '', time.time() - start, str(job.exception))) + continue + + log.debug("Got an autocomplete result for %s %s %s after %fs" % (str(exprs), tag, valuePrefix or '', time.time() - start)) + results.update(job.result) + except PoolTimeoutError: + raise Exception("Timed out in autocomplete values for %s %s %s after %fs" % (str(exprs), tag, valuePrefix or '', time.time() - start)) + + if errors == done: + if errors == 1: + raise Exception("Autocomplete values for %s %s %s failed: %s" % (str(exprs), tag, valuePrefix or '', str(job.exception))) + raise Exception('All autocomplete value requests failed for %s %s %s' % (str(exprs), tag, valuePrefix or '')) + + # sort & limit results + results = sorted(results) + if limit: + results = results[:int(limit)] + + log.debug("Got all autocomplete value results for %s %s %s in %fs" % (str(exprs), tag, valuePrefix or '', time.time() - start)) + return results + def extractForwardHeaders(request): headers = {} diff --git a/webapp/graphite/tags/views.py b/webapp/graphite/tags/views.py index 72d65321b..cda817de4 100644 --- a/webapp/graphite/tags/views.py +++ b/webapp/graphite/tags/views.py @@ -153,7 +153,7 @@ def 
autoCompleteTags(request, queryParams): elif len(queryParams.getlist('expr[]')) > 0: exprs = queryParams.getlist('expr[]') - return STORE.tagdb.auto_complete_tags( + return STORE.tagdb_auto_complete_tags( exprs, tagPrefix=queryParams.get('tagPrefix'), limit=queryParams.get('limit'), @@ -177,7 +177,7 @@ def autoCompleteValues(request, queryParams): if not tag: raise HttpError('no tag specified', status=400) - return STORE.tagdb.auto_complete_values( + return STORE.tagdb_auto_complete_values( exprs, tag, valuePrefix=queryParams.get('valuePrefix'), diff --git a/webapp/graphite/util.py b/webapp/graphite/util.py index 19855b58f..dee2e9491 100644 --- a/webapp/graphite/util.py +++ b/webapp/graphite/util.py @@ -36,11 +36,11 @@ if sys.version_info >= (3, 0): PY3 = True import pickle - from io import BytesIO as StringIO + from io import BytesIO else: PY3 = False import cPickle as pickle - from cStringIO import StringIO + from cStringIO import StringIO as BytesIO # use https://github.com/msgpack/msgpack-python if available try: @@ -166,7 +166,7 @@ def find_class(cls, module, name): @classmethod def loads(cls, pickle_string): - pickle_obj = pickle.Unpickler(StringIO(pickle_string)) + pickle_obj = pickle.Unpickler(BytesIO(pickle_string)) pickle_obj.find_global = cls.find_class return pickle_obj.load() @@ -200,7 +200,7 @@ def find_class(self, module, name): class unpickle(object): @staticmethod def loads(pickle_string): - return SafeUnpickler(StringIO(pickle_string)).load() + return SafeUnpickler(BytesIO(pickle_string)).load() @staticmethod def load(file): diff --git a/webapp/tests/test_finders_remote.py b/webapp/tests/test_finders_remote.py index bb37ca544..ccac99db8 100644 --- a/webapp/tests/test_finders_remote.py +++ b/webapp/tests/test_finders_remote.py @@ -9,7 +9,7 @@ from graphite.finders.remote import RemoteFinder from graphite.finders.utils import FindQuery from graphite.node import BranchNode, LeafNode -from graphite.util import json, pickle, StringIO, msgpack +from graphite.util import json, pickle, BytesIO, msgpack from .base import TestCase @@ -78,7 +78,7 @@ def test_find_nodes(self, http_request): 'is_leaf': True, }, ] - responseObject = HTTPResponse(body=StringIO(pickle.dumps(data)), status=200, preload_content=False) + responseObject = HTTPResponse(body=BytesIO(pickle.dumps(data)), status=200, preload_content=False) http_request.return_value = responseObject query = FindQuery('a.b.c', startTime, endTime) @@ -126,7 +126,7 @@ def test_find_nodes(self, http_request): }, ] responseObject = HTTPResponse( - body=StringIO(msgpack.dumps(data)), + body=BytesIO(msgpack.dumps(data)), status=200, preload_content=False, headers={'Content-Type': 'application/x-msgpack'} @@ -164,7 +164,7 @@ def test_find_nodes(self, http_request): self.assertEqual(nodes[1].path, 'a.b.c.d') # non-pickle response - responseObject = HTTPResponse(body=StringIO(b'error'), status=200, preload_content=False) + responseObject = HTTPResponse(body=BytesIO(b'error'), status=200, preload_content=False) http_request.return_value = responseObject result = finder.find_nodes(query) @@ -236,7 +236,7 @@ def test_RemoteFinder_fetch(self, http_request): 'name': 'a.b.c.d', }, ] - responseObject = HTTPResponse(body=StringIO(pickle.dumps(data)), status=200, preload_content=False) + responseObject = HTTPResponse(body=BytesIO(pickle.dumps(data)), status=200, preload_content=False) http_request.return_value = responseObject result = finder.fetch(['a.b.c.d'], startTime, endTime) @@ -278,7 +278,7 @@ def test_get_index(self, http_request): 'a.b.c', 
'a.b.c.d', ] - responseObject = HTTPResponse(body=StringIO(json.dumps(data).encode('ascii')), status=200, preload_content=False) + responseObject = HTTPResponse(body=BytesIO(json.dumps(data).encode('utf-8')), status=200, preload_content=False) http_request.return_value = responseObject result = finder.get_index({}) @@ -304,8 +304,164 @@ def test_get_index(self, http_request): self.assertEqual(result[1], 'a.b.c.d') # non-json response - responseObject = HTTPResponse(body=StringIO(b'error'), status=200, preload_content=False) + responseObject = HTTPResponse(body=BytesIO(b'error'), status=200, preload_content=False) http_request.return_value = responseObject with self.assertRaisesRegexp(Exception, 'Error decoding index response from http://[^ ]+: .+'): result = finder.get_index({}) + + @patch('urllib3.PoolManager.request') + @override_settings( + INTRACLUSTER_HTTPS=False, + REMOTE_STORE_USE_POST=True, + REMOTE_FIND_TIMEOUT=10, + TAGDB_AUTOCOMPLETE_LIMIT=100) + def test_auto_complete_tags(self, http_request): + finder = RemoteFinder('127.0.0.1') + + data = [ + 'tag1', + 'tag2', + ] + + responseObject = HTTPResponse(body=BytesIO(json.dumps(data).encode('utf-8')), status=200, preload_content=False) + http_request.return_value = responseObject + + result = finder.auto_complete_tags(['name=test'], 'tag') + + self.assertIsInstance(result, list) + + self.assertEqual(http_request.call_args[0], ( + 'POST', + 'http://127.0.0.1/tags/autoComplete/tags', + )) + self.assertEqual(http_request.call_args[1], { + 'fields': [ + ('tagPrefix', 'tag'), + ('limit', '100'), + ('expr', 'name=test'), + ], + 'headers': None, + 'preload_content': False, + 'timeout': 10, + }) + + self.assertEqual(len(result), 2) + + self.assertEqual(result[0], 'tag1') + self.assertEqual(result[1], 'tag2') + + # explicit limit & forward headers + responseObject = HTTPResponse(body=BytesIO(json.dumps(data).encode('utf-8')), status=200, preload_content=False) + http_request.return_value = responseObject + + result = finder.auto_complete_tags(['name=test', 'tag3=value3'], 'tag', limit=5, requestContext={'forwardHeaders': {'X-Test': 'test'}}) + + self.assertIsInstance(result, list) + + self.assertEqual(http_request.call_args[0], ( + 'POST', + 'http://127.0.0.1/tags/autoComplete/tags', + )) + self.assertEqual(http_request.call_args[1], { + 'fields': [ + ('tagPrefix', 'tag'), + ('limit', '5'), + ('expr', 'name=test'), + ('expr', 'tag3=value3'), + ], + 'headers': {'X-Test': 'test'}, + 'preload_content': False, + 'timeout': 10, + }) + + self.assertEqual(len(result), 2) + + self.assertEqual(result[0], 'tag1') + self.assertEqual(result[1], 'tag2') + + # non-json response + responseObject = HTTPResponse(body=BytesIO(b'error'), status=200, preload_content=False) + http_request.return_value = responseObject + + with self.assertRaisesRegexp(Exception, 'Error decoding autocomplete tags response from http://[^ ]+: .+'): + result = finder.auto_complete_tags(['name=test'], 'tag') + + @patch('urllib3.PoolManager.request') + @override_settings( + INTRACLUSTER_HTTPS=False, + REMOTE_STORE_USE_POST=True, + REMOTE_FIND_TIMEOUT=10, + TAGDB_AUTOCOMPLETE_LIMIT=100) + def test_auto_complete_values(self, http_request): + finder = RemoteFinder('127.0.0.1') + + data = [ + 'value1', + 'value2', + ] + + responseObject = HTTPResponse(body=BytesIO(json.dumps(data).encode('utf-8')), status=200, preload_content=False) + http_request.return_value = responseObject + + result = finder.auto_complete_values(['name=test'], 'tag1', 'value') + + self.assertIsInstance(result, list) + 
+ self.assertEqual(http_request.call_args[0], ( + 'POST', + 'http://127.0.0.1/tags/autoComplete/values', + )) + self.assertEqual(http_request.call_args[1], { + 'fields': [ + ('tag', 'tag1'), + ('valuePrefix', 'value'), + ('limit', '100'), + ('expr', 'name=test'), + ], + 'headers': None, + 'preload_content': False, + 'timeout': 10, + }) + + self.assertEqual(len(result), 2) + + self.assertEqual(result[0], 'value1') + self.assertEqual(result[1], 'value2') + + # explicit limit & forward headers + responseObject = HTTPResponse(body=BytesIO(json.dumps(data).encode('utf-8')), status=200, preload_content=False) + http_request.return_value = responseObject + + result = finder.auto_complete_values(['name=test', 'tag3=value3'], 'tag1', 'value', limit=5, requestContext={'forwardHeaders': {'X-Test': 'test'}}) + + self.assertIsInstance(result, list) + + self.assertEqual(http_request.call_args[0], ( + 'POST', + 'http://127.0.0.1/tags/autoComplete/values', + )) + self.assertEqual(http_request.call_args[1], { + 'fields': [ + ('tag', 'tag1'), + ('valuePrefix', 'value'), + ('limit', '5'), + ('expr', 'name=test'), + ('expr', 'tag3=value3'), + ], + 'headers': {'X-Test': 'test'}, + 'preload_content': False, + 'timeout': 10, + }) + + self.assertEqual(len(result), 2) + + self.assertEqual(result[0], 'value1') + self.assertEqual(result[1], 'value2') + + # non-json response + responseObject = HTTPResponse(body=BytesIO(b'error'), status=200, preload_content=False) + http_request.return_value = responseObject + + with self.assertRaisesRegexp(Exception, 'Error decoding autocomplete values response from http://[^ ]+: .+'): + result = finder.auto_complete_values(['name=test'], 'tag1', 'value') diff --git a/webapp/tests/test_functions.py b/webapp/tests/test_functions.py index 1d04d0cd1..8c0c9fbee 100644 --- a/webapp/tests/test_functions.py +++ b/webapp/tests/test_functions.py @@ -871,7 +871,7 @@ def test_delay(self): gotList = functions.delay({}, source, delay) self.assertEqual(len(gotList), len(expectedList)) for got, expected in zip(gotList, expectedList): - self.assertListEqual(got, expected) + self.assertEqual(got, expected) def test_asPercent_error(self): seriesList = self._gen_series_list_with_data( @@ -4195,7 +4195,7 @@ def test_movingWindow_invalidFunc(self): data=list(range(20, 25)) ) - def mock_evaluateTarget(requestContext, targets, noPrefetch=False): + def mock_evaluateTarget(requestContext, targets): return self._gen_series_list_with_data( key='collectd.test-db0.load.value', start=10, @@ -4215,7 +4215,7 @@ def test_movingWindow_xFilesFactor(self): data=list(range(0, 10)) ) - def mock_evaluateTarget(requestContext, targets, noPrefetch=False): + def mock_evaluateTarget(requestContext, targets): return self._gen_series_list_with_data( key='collectd.test-db0.load.value', start=10, @@ -4249,7 +4249,7 @@ def test_movingMedian_evaluateTarget_returns_none(self): data=list(range(10, 25)) ) - def mock_evaluateTarget(requestContext, targets, noPrefetch=False): + def mock_evaluateTarget(requestContext, targets): return self._gen_series_list_with_data( key='collectd.test-db0.load.value', start=10, @@ -4279,7 +4279,7 @@ def test_movingMedian_evaluateTarget_returns_half_none(self): data=list(range(10, 110)) ) - def mock_evaluateTarget(requestContext, targets, noPrefetch=False): + def mock_evaluateTarget(requestContext, targets): return self._gen_series_list_with_data( key='collectd.test-db0.load.value', start=10, @@ -4309,7 +4309,7 @@ def test_movingMedian_evaluateTarget_returns_empty_list(self): data=list(range(10, 110)) ) 
- def mock_evaluateTarget(requestContext, targets, noPrefetch=False): + def mock_evaluateTarget(requestContext, targets): return [] expectedResults = [] @@ -4332,7 +4332,7 @@ def test_movingMedian_integerWindowSize(self): data=list(range(10, 110)) ) - def mock_evaluateTarget(requestContext, targets, noPrefetch=False): + def mock_evaluateTarget(requestContext, targets): return self._gen_series_list_with_data( key='collectd.test-db0.load.value', start=600, @@ -4362,7 +4362,7 @@ def test_movingMedian_stringWindowSize(self): data=list(range(10, 610)) ) - def mock_evaluateTarget(requestContext, targets, noPrefetch=False): + def mock_evaluateTarget(requestContext, targets): return self._gen_series_list_with_data( key='collectd.test-db0.load.value', start=600, @@ -4395,7 +4395,7 @@ def test_movingAverage_evaluateTarget_returns_none(self): data=list(range(0, 25)) ) - def mock_evaluateTarget(requestContext, targets, noPrefetch=False): + def mock_evaluateTarget(requestContext, targets): return self._gen_series_list_with_data( key='collectd.test-db0.load.value', start=10, @@ -4425,7 +4425,7 @@ def test_movingAverage_evaluateTarget_returns_half_none(self): data=list(range(0, 10)) ) - def mock_evaluateTarget(requestContext, targets, noPrefetch=False): + def mock_evaluateTarget(requestContext, targets): return self._gen_series_list_with_data( key='collectd.test-db0.load.value', start=10, @@ -4456,7 +4456,7 @@ def test_movingAverage_evaluateTarget_returns_empty_list(self): data=list(range(10, 110)) ) - def mock_evaluateTarget(requestContext, targets, noPrefetch=False): + def mock_evaluateTarget(requestContext, targets): return [] expectedResults = [] @@ -4479,7 +4479,7 @@ def test_movingAverage_integerWindowSize(self): data=list(range(10, 110)) ) - def mock_evaluateTarget(requestContext, targets, noPrefetch=False): + def mock_evaluateTarget(requestContext, targets): return self._gen_series_list_with_data( key='collectd.test-db0.load.value', start=600, @@ -4513,7 +4513,7 @@ def test_movingAverage_stringWindowSize(self): data=list(range(10, 110)) ) - def mock_evaluateTarget(requestContext, targets, noPrefetch=False): + def mock_evaluateTarget(requestContext, targets): return self._gen_series_list_with_data( key='collectd.test-db0.load.value', start=600, @@ -4553,7 +4553,7 @@ def test_movingMin_evaluateTarget_returns_none(self): data=list(range(start, end)) ) - def mock_evaluateTarget(requestContext, targets, noPrefetch=False): + def mock_evaluateTarget(requestContext, targets): seriesList = [ TimeSeries('collectd.test-db0.load.value', 10, 25, 1, [None, None, None, None, None, None, None, None, None, None, None, None, None, None, None]) ] @@ -4583,7 +4583,7 @@ def test_movingMin_evaluateTarget_returns_half_none(self): data=[2, 1] * 5 ) - def mock_evaluateTarget(requestContext, targets, noPrefetch=False): + def mock_evaluateTarget(requestContext, targets): seriesList = [ TimeSeries('collectd.test-db0.load.value', 10, 30, 1, [None] * 10 + [2, 1] * 5) ] @@ -4613,7 +4613,7 @@ def test_movingMin_evaluateTarget_returns_empty_list(self): data=list(range(10, 10 + 100)) ) - def mock_evaluateTarget(requestContext, targets, noPrefetch=False): + def mock_evaluateTarget(requestContext, targets): return [] expectedResults = [] @@ -4636,7 +4636,7 @@ def test_movingMin_integerWindowSize(self): data=[10, 1] * 5 ) - def mock_evaluateTarget(requestContext, targets, noPrefetch=False): + def mock_evaluateTarget(requestContext, targets): return self._gen_series_list_with_data( key='collectd.test-db0.load.value', start=10, @@ 
-4666,7 +4666,7 @@ def test_movingMin_stringWindowSize(self): data=[10, 1] * 50 ) - def mock_evaluateTarget(requestContext, targets, noPrefetch=False): + def mock_evaluateTarget(requestContext, targets): return self._gen_series_list_with_data( key='collectd.test-db0.load.value', start=600, @@ -4701,7 +4701,7 @@ def test_movingMax_evaluateTarget_returns_none(self): data=list(range(start, end)) ) - def mock_evaluateTarget(requestContext, targets, noPrefetch=False): + def mock_evaluateTarget(requestContext, targets): seriesList = [ TimeSeries('collectd.test-db0.load.value', 10, 25, 1, [None, None, None, None, None, None, None, None, None, None, None, None, None, None, None]) ] @@ -4731,7 +4731,7 @@ def test_movingMax_evaluateTarget_returns_half_none(self): data=[1, 2] * 5 ) - def mock_evaluateTarget(requestContext, targets, noPrefetch=False): + def mock_evaluateTarget(requestContext, targets): seriesList = [ TimeSeries('collectd.test-db0.load.value', 10, 30, 1, [None] * 10 + [1, 2] * 5) ] @@ -4761,7 +4761,7 @@ def test_movingMax_evaluateTarget_returns_empty_list(self): data=list(range(10, 10 + 100)) ) - def mock_evaluateTarget(requestContext, targets, noPrefetch=False): + def mock_evaluateTarget(requestContext, targets): return [] expectedResults = [] @@ -4784,7 +4784,7 @@ def test_movingMax_integerWindowSize(self): data=[1, 2] * 5 ) - def mock_evaluateTarget(requestContext, targets, noPrefetch=False): + def mock_evaluateTarget(requestContext, targets): return self._gen_series_list_with_data( key='collectd.test-db0.load.value', start=10, @@ -4814,7 +4814,7 @@ def test_movingMax_stringWindowSize(self): data=[1, 10] * 50 ) - def mock_evaluateTarget(requestContext, targets, noPrefetch=False): + def mock_evaluateTarget(requestContext, targets): return self._gen_series_list_with_data( key='collectd.test-db0.load.value', start=600, @@ -4862,7 +4862,7 @@ def test_movingSum_evaluateTarget_returns_none(self): data=list(range(start, end)) ) - def mock_evaluateTarget(requestContext, targets, noPrefetch=False): + def mock_evaluateTarget(requestContext, targets): seriesList = [ TimeSeries('collectd.test-db0.load.value', 10, 25, 1, [None, None, None, None, None, None, None, None, None, None, None, None, None, None, None]) ] @@ -4892,7 +4892,7 @@ def test_movingSum_evaluateTarget_returns_half_none(self): data=list(range(0, 10)) ) - def mock_evaluateTarget(requestContext, targets, noPrefetch=False): + def mock_evaluateTarget(requestContext, targets): seriesList = [ TimeSeries('collectd.test-db0.load.value', 10, 30, 1, [None] * 10 + list(range(0, 10))) ] @@ -4922,7 +4922,7 @@ def test_movingSum_evaluateTarget_returns_empty_list(self): data=list(range(10, 10 + 100)) ) - def mock_evaluateTarget(requestContext, targets, noPrefetch=False): + def mock_evaluateTarget(requestContext, targets): return [] expectedResults = [] @@ -4945,7 +4945,7 @@ def test_movingSum_integerWindowSize(self): data=[1] * 100 ) - def mock_evaluateTarget(requestContext, targets, noPrefetch=False): + def mock_evaluateTarget(requestContext, targets): return self._gen_series_list_with_data(key='collectd.test-db0.load.value', start=600, end=700, step=1, data=[1] * 100) expectedResults = [ @@ -4970,7 +4970,7 @@ def test_movingSum_stringWindowSize(self): data=[1] * 100 ) - def mock_evaluateTarget(requestContext, targets, noPrefetch=False): + def mock_evaluateTarget(requestContext, targets): return self._gen_series_list_with_data(key='collectd.test-db0.load.value', start=600, end=700, step=1, data=[1] * 100) expectedResults = [ @@ -4998,7 +4998,7 
@@ def gen_seriesList(start=0): seriesList = gen_seriesList(10) - def mock_evaluateTarget(requestContext, targets, noPrefetch=False): + def mock_evaluateTarget(requestContext, targets): return gen_seriesList() expectedResults = [ @@ -5036,7 +5036,7 @@ def gen_seriesList(start=0, points=10): seriesList = gen_seriesList(start_time, points) - def mock_evaluateTarget(requestContext, targets, noPrefetch=False): + def mock_evaluateTarget(requestContext, targets): return gen_seriesList(start_time-week_seconds, (week_seconds/step)+points) expectedResults = [ @@ -5075,7 +5075,7 @@ def gen_seriesList(start=0, points=10): seriesList = gen_seriesList(start_time, points) - def mock_evaluateTarget(requestContext, targets, noPrefetch=False): + def mock_evaluateTarget(requestContext, targets): return gen_seriesList(start_time-week_seconds, (week_seconds/step)+points) expectedResults = [ @@ -5116,7 +5116,7 @@ def gen_seriesList(start=0, points=10): seriesList = gen_seriesList(start_time, points) - def mock_evaluateTarget(requestContext, targets, noPrefetch=False): + def mock_evaluateTarget(requestContext, targets): return gen_seriesList(start_time-week_seconds, (week_seconds/step)+points) expectedResults = [ @@ -5683,7 +5683,7 @@ def test_exponentialMovingAverage_integerWindowSize(self): data=[14.5, 15.5, 16.5, 17.5, 18.5, 19.5, 20.5, 21.5, 22.5, 23.5, 24.5, 25.5, 26.5, 27.5, 28.5, 29.5, 30.5, 31.5, 32.5, 33.5, 34.5, 35.5, 36.5, 37.5, 38.5, 39.5, 40.5, 41.5, 42.5, 43.5, 44.5] ) - def mock_evaluateTarget(requestContext, targets, noPrefetch=False): + def mock_evaluateTarget(requestContext, targets): return seriesList with patch('graphite.render.functions.evaluateTarget', mock_evaluateTarget): @@ -5701,7 +5701,7 @@ def test_exponentialMovingAverage_stringWindowSize(self): data=[14.5, 15.5, 16.5, 17.5, 18.5, 19.5, 20.5, 21.5, 22.5, 23.5, 24.5, 25.5, 26.5, 27.5, 28.5, 29.5, 30.5, 31.5, 32.5, 33.5, 34.5, 35.5, 36.5, 37.5, 38.5, 39.5, 40.5, 41.5, 42.5, 43.5, 44.5] ) - def mock_evaluateTarget(requestContext, targets, noPrefetch=False): + def mock_evaluateTarget(requestContext, targets): return seriesList with patch('graphite.render.functions.evaluateTarget', mock_evaluateTarget): @@ -5714,7 +5714,7 @@ def test_exponentialMovingAverage_evaluateTarget_returns_empty_list(self): seriesList = self._gen_series_list_with_data(start=600, end=700, data=list(range(0, 100))) expectedResults = [] - def mock_evaluateTarget(requestContext, targets, noPrefetch=False): + def mock_evaluateTarget(requestContext, targets): return [] with patch('graphite.render.functions.evaluateTarget', mock_evaluateTarget): @@ -5732,7 +5732,7 @@ def test_exponentialMovingAverage_evaluateTarget_returns_half_none(self): data=[0, 0.0, 0.181818, 0.512397, 0.964688, 1.516563, None, 2.149915, 2.849931, 3.604489, 4.403673] ) - def mock_evaluateTarget(requestContext, targets, noPrefetch=False): + def mock_evaluateTarget(requestContext, targets): return self._gen_series_list_with_data(key='collectd.test-db0.load.value',start=10, end=30, data=([None] * 10 + list(range(0, 5)) + [None] + list(range(5, 9)))) with patch('graphite.render.functions.evaluateTarget', mock_evaluateTarget): @@ -5966,50 +5966,6 @@ def mock_data_fetcher(reqCtx, path_expression): self.assertEqual(result, expectedResult) - @patch('graphite.render.evaluator.prefetchData', lambda *_: None) - def test_seriesByTag(self): - class MockTagDB(object): - def find_series(self, tagExpressions, requestContext=None): - if tagExpressions == ('name=disk.bytes_used', 'server=server1'): - return 
['disk.bytes_used;server=server1'] - - if tagExpressions == ('name=disk.bytes_used', 'server=server2'): - return [] - - raise Exception('Unexpected find_series call with tagExpressions: %s' % str(tagExpressions)) - - - def mock_data_fetcher(reqCtx, path_expression): - if path_expression != 'disk.bytes_used;server=server1': - raise Exception('Unexpected fetchData call with pathExpression: %s' % path_expression) - - return self._gen_series_list_with_data( - key=['disk.bytes_used;server=server1'], - start=0, - end=3, - data=[[10, 20, 30]] - ) - - request_context = self._build_requestContext(0, 3) - - with patch('graphite.render.evaluator.fetchData', mock_data_fetcher): - with patch('graphite.storage.STORE.tagdb', None): - query = 'seriesByTag("name=disk.bytes_used","server=server1")' - result = evaluateTarget(request_context, query) - self.assertEqual(result, []) - - with patch('graphite.storage.STORE.tagdb', MockTagDB()): - query = 'seriesByTag("name=disk.bytes_used","server=server1")' - result = evaluateTarget(request_context, query) - self.assertEqual(result, [ - TimeSeries('disk.bytes_used;server=server1',0,3,1,[10, 20, 30]), - ]) - self.assertEqual(result[0].pathExpression, query) - - query = 'seriesByTag("name=disk.bytes_used","server=server2")' - result = evaluateTarget(request_context, query) - self.assertEqual(result, []) - def test_groupByTags(self): class MockTagDB(object): @staticmethod diff --git a/webapp/tests/test_readers_remote.py b/webapp/tests/test_readers_remote.py index de25894aa..3e9bc2d6b 100644 --- a/webapp/tests/test_readers_remote.py +++ b/webapp/tests/test_readers_remote.py @@ -6,7 +6,7 @@ from graphite.finders.remote import RemoteFinder from graphite.readers.remote import RemoteReader -from graphite.util import pickle, StringIO, msgpack +from graphite.util import pickle, BytesIO, msgpack from graphite.wsgi import application # NOQA makes sure we have a working WSGI app @@ -64,7 +64,7 @@ def test_RemoteReader_fetch_multi(self, http_request): 'name': 'a.b.c.d' } ] - responseObject = HTTPResponse(body=StringIO(pickle.dumps(data)), status=200, preload_content=False) + responseObject = HTTPResponse(body=BytesIO(pickle.dumps(data)), status=200, preload_content=False) http_request.return_value = responseObject result = reader.fetch_multi(startTime, endTime) @@ -108,7 +108,7 @@ def test_RemoteReader_fetch_multi(self, http_request): } ] responseObject = HTTPResponse( - body=StringIO(msgpack.dumps(data)), + body=BytesIO(msgpack.dumps(data)), status=200, preload_content=False, headers={'Content-Type': 'application/x-msgpack'} @@ -145,14 +145,14 @@ def test_RemoteReader_fetch_multi(self, http_request): }) # non-pickle response - responseObject = HTTPResponse(body=StringIO(b'error'), status=200, preload_content=False) + responseObject = HTTPResponse(body=BytesIO(b'error'), status=200, preload_content=False) http_request.return_value = responseObject with self.assertRaisesRegexp(Exception, 'Error decoding render response from http://[^ ]+: .+'): reader.fetch(startTime, endTime) # non-200 response - responseObject = HTTPResponse(body=StringIO(b'error'), status=500, preload_content=False) + responseObject = HTTPResponse(body=BytesIO(b'error'), status=500, preload_content=False) http_request.return_value = responseObject with self.assertRaisesRegexp(Exception, 'Error response 500 from http://[^ ]+'): @@ -203,7 +203,7 @@ def test_RemoteReader_fetch(self, http_request): 'name': 'a.b.c.d' } ] - responseObject = HTTPResponse(body=StringIO(pickle.dumps(data)), status=200, 
preload_content=False) + responseObject = HTTPResponse(body=BytesIO(pickle.dumps(data)), status=200, preload_content=False) http_request.return_value = responseObject result = reader.fetch(startTime, endTime) expected_response = ((1496262000, 1496262060, 60), [1.0, 0.0, 1.0, 0.0, 1.0]) diff --git a/webapp/tests/test_render.py b/webapp/tests/test_render.py index 9f6e7d6fc..1c3fe78cf 100644 --- a/webapp/tests/test_render.py +++ b/webapp/tests/test_render.py @@ -205,6 +205,21 @@ def test_render_view(self): self.assertEqual(response['Content-Type'], 'image/png') self.assertTrue(response.has_header('Expires')) + # png format returns image/png + response = self.client.get(url, {'target': 'test', 'format': 'png'}) + self.assertEqual(response['Content-Type'], 'image/png') + self.assertTrue(response.has_header('Expires')) + + # svg format returns image/svg+xml + response = self.client.get(url, {'target': 'test', 'format': 'svg'}) + self.assertEqual(response['Content-Type'], 'image/svg+xml') + self.assertTrue(response.has_header('Expires')) + + # pdf format returns application/x-pdf + response = self.client.get(url, {'target': 'test', 'format': 'pdf'}) + self.assertEqual(response['Content-Type'], 'application/x-pdf') + self.assertTrue(response.has_header('Expires')) + # Verify graphType=pie returns response = self.client.get(url, {'target': 'a:50', 'graphType': 'pie'}) self.assertEqual(response['Content-Type'], 'image/png') diff --git a/webapp/tests/test_storage.py b/webapp/tests/test_storage.py index 7e05ac216..d0daf3959 100644 --- a/webapp/tests/test_storage.py +++ b/webapp/tests/test_storage.py @@ -2,7 +2,7 @@ import time from django.test import override_settings -from mock import patch +from mock import patch, Mock from .base import TestCase @@ -10,10 +10,12 @@ from graphite.intervals import Interval, IntervalSet from graphite.node import LeafNode, BranchNode from graphite.readers.utils import BaseReader -from graphite.storage import Store, extractForwardHeaders, get_finders +from graphite.storage import Store, extractForwardHeaders, get_finders, get_tagdb from graphite.tags.localdatabase import LocalDatabaseTagDB from graphite.worker_pool.pool import PoolTimeoutError - +from graphite.render.datalib import TimeSeries +from graphite.render.evaluator import evaluateTarget +from graphite.util import epoch_to_dt class StorageTest(TestCase): @@ -25,7 +27,7 @@ def test_fetch(self): store = Store( finders=[disabled_finder, legacy_finder, test_finder, remote_finder], - tagdb='graphite.tags.localdatabase.LocalDatabaseTagDB' + tagdb=get_tagdb('graphite.tags.localdatabase.LocalDatabaseTagDB') ) # tagb is properly initialized @@ -64,11 +66,11 @@ def mock_pool_exec(pool, jobs, timeout): raise PoolTimeoutError() with patch('graphite.storage.pool_exec', mock_pool_exec): - with patch('graphite.storage.log.debug') as log_debug: + with patch('graphite.storage.log.info') as log_info: with self.assertRaisesRegexp(Exception, 'All fetches failed for \[\'a\'\]'): list(store.fetch(['a'], 1, 2, 3, {})) - self.assertEqual(log_debug.call_count, 1) - self.assertRegexpMatches(log_debug.call_args[0][0], 'Timed out in fetch after [-.e0-9]+s') + self.assertEqual(log_info.call_count, 1) + self.assertRegexpMatches(log_info.call_args[0][0], 'Timed out in fetch after [-.e0-9]+s') def test_fetch_all_failed(self): # all finds failed @@ -76,11 +78,21 @@ def test_fetch_all_failed(self): finders=[TestFinder()] ) - with patch('graphite.storage.log.debug') as log_debug: + with patch('graphite.storage.log.info') as log_info: + with 
self.assertRaisesRegexp(Exception, 'Fetch for \[\'a\'\] failed: TestFinder.find_nodes'): + list(store.fetch(['a'], 1, 2, 3, {})) + self.assertEqual(log_info.call_count, 1) + self.assertRegexpMatches(log_info.call_args[0][0], 'Fetch for \[\'a\'\] failed after [-.e0-9]+s: TestFinder.find_nodes') + + store = Store( + finders=[TestFinder(), TestFinder()] + ) + + with patch('graphite.storage.log.info') as log_info: with self.assertRaisesRegexp(Exception, 'All fetches failed for \[\'a\'\]'): list(store.fetch(['a'], 1, 2, 3, {})) - self.assertEqual(log_debug.call_count, 1) - self.assertRegexpMatches(log_debug.call_args[0][0], 'Fetch for \[\'a\'\] failed after [-.e0-9]+s: TestFinder.find_nodes') + self.assertEqual(log_info.call_count, 2) + self.assertRegexpMatches(log_info.call_args[0][0], 'Fetch for \[\'a\'\] failed after [-.e0-9]+s: TestFinder.find_nodes') def test_find(self): disabled_finder = DisabledFinder() @@ -90,7 +102,7 @@ def test_find(self): store = Store( finders=[disabled_finder, legacy_finder, test_finder, remote_finder], - tagdb='graphite.tags.localdatabase.LocalDatabaseTagDB' + tagdb=get_tagdb('graphite.tags.localdatabase.LocalDatabaseTagDB') ) # find nodes @@ -137,11 +149,11 @@ def mock_pool_exec(pool, jobs, timeout): raise PoolTimeoutError() with patch('graphite.storage.pool_exec', mock_pool_exec): - with patch('graphite.storage.log.debug') as log_debug: + with patch('graphite.storage.log.info') as log_info: with self.assertRaisesRegexp(Exception, 'All finds failed for '): list(store.find('a')) - self.assertEqual(log_debug.call_count, 1) - self.assertRegexpMatches(log_debug.call_args[0][0], 'Timed out in find after [-.e0-9]+s') + self.assertEqual(log_info.call_count, 1) + self.assertRegexpMatches(log_info.call_args[0][0], 'Timed out in find after [-.e0-9]+s') def test_find_all_failed(self): # all finds failed @@ -149,11 +161,21 @@ def test_find_all_failed(self): finders=[TestFinder()] ) - with patch('graphite.storage.log.debug') as log_debug: + with patch('graphite.storage.log.info') as log_info: + with self.assertRaisesRegexp(Exception, 'Find for failed: TestFinder.find_nodes'): + list(store.find('a')) + self.assertEqual(log_info.call_count, 1) + self.assertRegexpMatches(log_info.call_args[0][0], 'Find for failed after [-.e0-9]+s: TestFinder.find_nodes') + + store = Store( + finders=[TestFinder(), TestFinder()] + ) + + with patch('graphite.storage.log.info') as log_info: with self.assertRaisesRegexp(Exception, 'All finds failed for '): list(store.find('a')) - self.assertEqual(log_debug.call_count, 1) - self.assertRegexpMatches(log_debug.call_args[0][0], 'Find for failed after [-.e0-9]+s: TestFinder.find_nodes') + self.assertEqual(log_info.call_count, 2) + self.assertRegexpMatches(log_info.call_args[0][0], 'Find for failed after [-.e0-9]+s: TestFinder.find_nodes') @override_settings(REMOTE_STORE_FORWARD_HEADERS=['X-Test1', 'X-Test2']) def test_extractForwardHeaders(self): @@ -174,7 +196,7 @@ def test_get_index(self): store = Store( finders=[disabled_finder, legacy_finder, test_finder, remote_finder], - tagdb='graphite.tags.localdatabase.LocalDatabaseTagDB' + tagdb=get_tagdb('graphite.tags.localdatabase.LocalDatabaseTagDB') ) # get index @@ -195,11 +217,11 @@ def mock_pool_exec(pool, jobs, timeout): raise PoolTimeoutError() with patch('graphite.storage.pool_exec', mock_pool_exec): - with patch('graphite.storage.log.debug') as log_debug: + with patch('graphite.storage.log.info') as log_info: with self.assertRaisesRegexp(Exception, 'All index lookups failed'): store.get_index() - 
self.assertEqual(log_debug.call_count, 1) - self.assertRegexpMatches(log_debug.call_args[0][0], 'Timed out in get_index after [-.e0-9]+s') + self.assertEqual(log_info.call_count, 1) + self.assertRegexpMatches(log_info.call_args[0][0], 'Timed out in get_index after [-.e0-9]+s') def test_get_index_all_failed(self): # all finders failed @@ -207,11 +229,294 @@ def test_get_index_all_failed(self): finders=[TestFinder()] ) - with patch('graphite.storage.log.debug') as log_debug: + with patch('graphite.storage.log.info') as log_info: + with self.assertRaisesRegexp(Exception, 'get_index failed: TestFinder.find_nodes'): + store.get_index() + self.assertEqual(log_info.call_count, 1) + self.assertRegexpMatches(log_info.call_args[0][0], 'get_index failed after [-.e0-9]+s: TestFinder.find_nodes') + + store = Store( + finders=[TestFinder(), TestFinder()] + ) + + with patch('graphite.storage.log.info') as log_info: with self.assertRaisesRegexp(Exception, 'All index lookups failed'): store.get_index() - self.assertEqual(log_debug.call_count, 1) - self.assertRegexpMatches(log_debug.call_args[0][0], 'Find for failed after [-.e0-9]+s: TestFinder.find_nodes') + self.assertEqual(log_info.call_count, 2) + self.assertRegexpMatches(log_info.call_args[0][0], 'get_index failed after [-.e0-9]+s: TestFinder.find_nodes') + + @override_settings(USE_WORKER_POOL=False) + def test_fetch_tag_support(self): + class TestFinderTags(BaseFinder): + tags = True + + def find_nodes(self, query): + pass + + def fetch(self, patterns, start_time, end_time, now=None, requestContext=None): + if patterns != ['seriesByTag("hello=tiger")', 'seriesByTag("name=notags")', 'seriesByTag("name=testtags")', 'testtags;hello=tiger']: + raise Exception('Unexpected patterns %s' % str(patterns)) + + return [ + { + 'pathExpression': 'testtags;hello=tiger', + 'name': 'testtags;hello=tiger', + 'time_info': (0, 60, 1), + 'values': [], + }, + { + 'pathExpression': 'seriesByTag("hello=tiger")', + 'name': 'testtags;hello=tiger', + 'time_info': (0, 60, 1), + 'values': [], + }, + { + 'pathExpression': 'seriesByTag("name=testtags")', + 'name': 'testtags;hello=tiger', + 'time_info': (0, 60, 1), + 'values': [], + }, + ] + + tagdb = Mock() + + store = Store( + finders=[TestFinderTags()], + tagdb=tagdb + ) + + request_context = { + 'startTime': epoch_to_dt(0), + 'endTime': epoch_to_dt(60), + 'now': epoch_to_dt(60), + } + + with patch('graphite.render.datalib.STORE', store): + results = evaluateTarget(request_context, ['testtags;hello=tiger', 'seriesByTag("hello=tiger")', 'seriesByTag("name=testtags")', 'seriesByTag("name=notags")']) + self.assertEqual(results, [ + TimeSeries('testtags;hello=tiger', 0, 60, 1, []), + TimeSeries('testtags;hello=tiger', 0, 60, 1, [], pathExpression='seriesByTag("hello=tiger")'), + TimeSeries('testtags;hello=tiger', 0, 60, 1, [], pathExpression='seriesByTag("name=testtags")'), + ]) + + @override_settings(USE_WORKER_POOL=True) + def test_fetch_no_tag_support(self): + class TestFinderNoTags(BaseFinder): + tags = False + + def find_nodes(self, query): + pass + + def fetch(self, patterns, start_time, end_time, now=None, requestContext=None): + if patterns != ['notags;hello=tiger']: + raise Exception('Unexpected patterns %s' % str(patterns)) + + return [ + { + 'pathExpression': 'notags;hello=tiger', + 'name': 'notags;hello=tiger', + 'time_info': (0, 60, 1), + 'values': [], + } + ] + + tagdb = Mock() + + def mockFindSeries(exprs, requestContext=None): + self.assertEqual(requestContext, request_context) + if exprs == ('hello=tiger',) or exprs 
+                return ['notags;hello=tiger']
+            if exprs == ('name=testtags',):
+                return []
+            raise Exception('Unexpected exprs %s' % str(exprs))
+
+        tagdb.find_series.side_effect = mockFindSeries
+
+        store = Store(
+            finders=[TestFinderNoTags()],
+            tagdb=tagdb
+        )
+
+        with patch('graphite.render.datalib.STORE', store):
+            request_context = {
+                'startTime': epoch_to_dt(0),
+                'endTime': epoch_to_dt(60),
+                'now': epoch_to_dt(60),
+            }
+
+            results = evaluateTarget(request_context, ['notags;hello=tiger', 'seriesByTag("hello=tiger")', 'seriesByTag("name=testtags")', 'seriesByTag("name=notags")'])
+            self.assertEqual(tagdb.find_series.call_count, 3)
+            self.assertEqual(results, [
+                TimeSeries('notags;hello=tiger', 0, 60, 1, []),
+                TimeSeries('notags;hello=tiger', 0, 60, 1, [], pathExpression='seriesByTag("hello=tiger")'),
+                TimeSeries('notags;hello=tiger', 0, 60, 1, [], pathExpression='seriesByTag("name=notags")'),
+            ])
+
+    def test_autocomplete(self):
+        test = self
+
+        class TestFinderTags(BaseFinder):
+            tags = True
+
+            def __init__(self, request_limit=100, request_context=None):
+                self.limit = request_limit
+                self.context = request_context or {}
+
+            def find_nodes(self, query):
+                pass
+
+            def auto_complete_tags(self, exprs, tagPrefix=None, limit=None, requestContext=None):
+                test.assertEqual(exprs, ['tag1=value1'])
+                test.assertEqual(tagPrefix, 'test')
+                test.assertEqual(limit, self.limit)
+                test.assertEqual(requestContext, self.context)
+                return ['testtags']
+
+            def auto_complete_values(self, exprs, tag, valuePrefix=None, limit=None, requestContext=None):
+                test.assertEqual(exprs, ['tag1=value1'])
+                test.assertEqual(tag, 'tag2')
+                test.assertEqual(valuePrefix, 'test')
+                test.assertEqual(limit, self.limit)
+                test.assertEqual(requestContext, self.context)
+                return ['testtags']
+
+
+        class TestFinderNoTags(BaseFinder):
+            tags = False
+
+            def find_nodes(self, query):
+                pass
+
+
+        class TestFinderTagsException(TestFinderTags):
+            def auto_complete_tags(self, exprs, tagPrefix=None, limit=None, requestContext=None):
+                raise Exception('TestFinderTagsException.auto_complete_tags')
+
+            def auto_complete_values(self, exprs, tag, valuePrefix=None, limit=None, requestContext=None):
+                raise Exception('TestFinderTagsException.auto_complete_values')
+
+
+        class TestFinderTagsTimeout(TestFinderTags):
+            def auto_complete_tags(self, exprs, tagPrefix=None, limit=None, requestContext=None):
+                time.sleep(0.1)
+                return ['testtags']
+
+            def auto_complete_values(self, exprs, tag, valuePrefix=None, limit=None, requestContext=None):
+                time.sleep(0.1)
+                return ['testtags']
+
+
+        def mockStore(finders, request_limit=100, request_context=None):
+            tagdb = Mock()
+
+            def mockAutoCompleteTags(exprs, tagPrefix=None, limit=None, requestContext=None):
+                self.assertEqual(exprs, ['tag1=value1'])
+                self.assertEqual(tagPrefix, 'test')
+                self.assertEqual(limit, request_limit)
+                self.assertEqual(requestContext, request_context or {})
+                return ['testnotags']
+
+            tagdb.auto_complete_tags.side_effect = mockAutoCompleteTags
+
+            def mockAutoCompleteValues(exprs, tag, valuePrefix=None, limit=None, requestContext=None):
+                self.assertEqual(exprs, ['tag1=value1'])
+                self.assertEqual(tag, 'tag2')
+                self.assertEqual(valuePrefix, 'test')
+                self.assertEqual(limit, request_limit)
+                self.assertEqual(requestContext, request_context or {})
+                return ['testnotags']
+
+            tagdb.auto_complete_values.side_effect = mockAutoCompleteValues
+
+            return Store(
+                finders=finders,
+                tagdb=tagdb,
+            )
+
+        request_context = {}
+
+        # test with both tag-enabled and non-tag-enabled finders
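+        # (TestFinderTags answers from its own auto_complete_* methods and
+        # yields 'testtags'; TestFinderNoTags is covered by the mocked tagdb,
+        # which yields 'testnotags'; the assertions below expect the merged,
+        # sorted union of the two result sets)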
+        store = mockStore([TestFinderTags(), TestFinderNoTags()])
+
+        result = store.tagdb_auto_complete_tags(['tag1=value1'], 'test', 100, request_context)
+        self.assertEqual(store.tagdb.auto_complete_tags.call_count, 1)
+        self.assertEqual(result, ['testnotags', 'testtags'])
+
+        result = store.tagdb_auto_complete_values(['tag1=value1'], 'tag2', 'test', 100, request_context)
+        self.assertEqual(store.tagdb.auto_complete_values.call_count, 1)
+        self.assertEqual(result, ['testnotags', 'testtags'])
+
+        # test with no limit & no requestContext
+        store = mockStore([TestFinderTags(None, {}), TestFinderNoTags()], None, {})
+
+        result = store.tagdb_auto_complete_tags(['tag1=value1'], 'test')
+        self.assertEqual(store.tagdb.auto_complete_tags.call_count, 1)
+        self.assertEqual(result, ['testnotags', 'testtags'])
+
+        result = store.tagdb_auto_complete_values(['tag1=value1'], 'tag2', 'test')
+        self.assertEqual(store.tagdb.auto_complete_values.call_count, 1)
+        self.assertEqual(result, ['testnotags', 'testtags'])
+
+        # test with only tag-enabled finder
+        store = mockStore([TestFinderTags()])
+
+        result = store.tagdb_auto_complete_tags(['tag1=value1'], 'test', 100, request_context)
+        self.assertEqual(store.tagdb.auto_complete_tags.call_count, 0)
+        self.assertEqual(result, ['testtags'])
+
+        result = store.tagdb_auto_complete_values(['tag1=value1'], 'tag2', 'test', 100, request_context)
+        self.assertEqual(store.tagdb.auto_complete_values.call_count, 0)
+        self.assertEqual(result, ['testtags'])
+
+        # test with only non-tag-enabled finder
+        store = mockStore([TestFinderNoTags()])
+
+        result = store.tagdb_auto_complete_tags(['tag1=value1'], 'test', 100, request_context)
+        self.assertEqual(store.tagdb.auto_complete_tags.call_count, 1)
+        self.assertEqual(result, ['testnotags'])
+
+        result = store.tagdb_auto_complete_values(['tag1=value1'], 'tag2', 'test', 100, request_context)
+        self.assertEqual(store.tagdb.auto_complete_values.call_count, 1)
+        self.assertEqual(result, ['testnotags'])
+
+        # test with no finders
+        store = mockStore([])
+
+        result = store.tagdb_auto_complete_tags(['tag1=value1'], 'test', 100, request_context)
+        self.assertEqual(store.tagdb.auto_complete_tags.call_count, 0)
+        self.assertEqual(result, [])
+
+        result = store.tagdb_auto_complete_values(['tag1=value1'], 'tag2', 'test', 100, request_context)
+        self.assertEqual(store.tagdb.auto_complete_values.call_count, 0)
+        self.assertEqual(result, [])
+
+        # test exception handling with one finder
+        store = mockStore([TestFinderTagsException()])
+
+        with self.assertRaisesRegexp(Exception, 'Autocomplete tags for \[\'tag1=value1\'\] test failed: TestFinderTagsException\.auto_complete_tags'):
+            store.tagdb_auto_complete_tags(['tag1=value1'], 'test', 100, request_context)
+
+        with self.assertRaisesRegexp(Exception, 'Autocomplete values for \[\'tag1=value1\'\] tag2 test failed: TestFinderTagsException\.auto_complete_values'):
+            store.tagdb_auto_complete_values(['tag1=value1'], 'tag2', 'test', 100, request_context)
+
+        # test exception handling with more than one finder
+        store = mockStore([TestFinderTagsException(), TestFinderTagsException()])
+
+        with self.assertRaisesRegexp(Exception, 'All autocomplete tag requests failed for \[\'tag1=value1\'\] test'):
+            store.tagdb_auto_complete_tags(['tag1=value1'], 'test', 100, request_context)
+
+        with self.assertRaisesRegexp(Exception, 'All autocomplete value requests failed for \[\'tag1=value1\'\] tag2 test'):
+            store.tagdb_auto_complete_values(['tag1=value1'], 'tag2', 'test', 100, request_context)
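+
+        # with a single finder the finder's own exception propagates; with
+        # more than one, the per-finder failures are rolled up into the
+        # 'All autocomplete ... requests failed' errors asserted above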
+ + # test pool timeout handling + store = mockStore([TestFinderTagsTimeout()]) + + with self.settings(USE_WORKER_POOL=True, REMOTE_FIND_TIMEOUT=0): + with self.assertRaisesRegexp(Exception, 'Timed out in autocomplete tags for \[\'tag1=value1\'\] test after [-.e0-9]+s'): + store.tagdb_auto_complete_tags(['tag1=value1'], 'test', 100, request_context) + + with self.assertRaisesRegexp(Exception, 'Timed out in autocomplete values for \[\'tag1=value1\'\] tag2 test after [-.e0-9]+s'): + store.tagdb_auto_complete_values(['tag1=value1'], 'tag2', 'test', 100, request_context) class DisabledFinder(object):