Merge pull request #2258 from deniszh/backport/1.1.x/pr-2221_pr-2221_pr-2234_pr-2234_pr-2244_pr-2252_pr-2254_pr-2254_pr-2257

[1.1.x] support for storing tagged series in hashed filenames | lint fix | add optional keepStep parameter to aggregateLine | don't modify series | w/g/f/remote.py: add missing local field to auto_complete_{t
deniszh authored Mar 16, 2018
2 parents 0c3802a + 9ccd2a0 commit 94d4200
Showing 12 changed files with 263 additions and 182 deletions.
32 changes: 16 additions & 16 deletions webapp/graphite/finders/ceres.py
@@ -25,15 +25,17 @@ def __init__(self, directory=None):
self.tree = CeresTree(directory)

def find_nodes(self, query):

# translate query pattern if it is tagged
tagged = not query.pattern.startswith('_tagged.') and ';' in query.pattern
if tagged:
# tagged series are stored in ceres using encoded names, so to retrieve them we need to encode the
# query pattern using the same scheme used in carbon when they are written.
variants = [TaggedSeries.encode(query.pattern)]
# tagged series are stored in ceres using encoded names, so to retrieve them we need to
# encode the query pattern using the same scheme used in carbon when they are written.
variants = [
TaggedSeries.encode(query.pattern, hash_only=True),
TaggedSeries.encode(query.pattern, hash_only=False),
]
else:
variants = extract_variants(query.pattern)
variants = extract_variants(query.pattern)

for variant in variants:
for fs_path in glob(self.tree.getFilesystemPath(variant)):
@@ -42,14 +44,12 @@ def find_nodes(self, query):
if CeresNode.isNodeDir(fs_path):
ceres_node = self.tree.getNode(metric_path)

if ceres_node.hasDataForInterval(
query.startTime, query.endTime):
real_metric_path = get_real_metric_path(
fs_path, metric_path)
if ceres_node.hasDataForInterval(query.startTime, query.endTime):
real_metric_path = get_real_metric_path(fs_path, metric_path)
reader = CeresReader(ceres_node, real_metric_path)
# if we're finding by tag, return the proper metric path
if tagged:
metric_path = query.pattern
metric_path = query.pattern
yield LeafNode(metric_path, reader)

elif os.path.isdir(fs_path):
@@ -59,12 +59,12 @@ def get_index(self, requestContext):
matches = []

for root, _, files in walk(settings.CERES_DIR):
root = root.replace(settings.CERES_DIR, '')
for filename in files:
if filename == '.ceres-node':
matches.append(root)
root = root.replace(settings.CERES_DIR, '')
for filename in files:
if filename == '.ceres-node':
matches.append(root)

return sorted([
m.replace('/', '.').lstrip('.')
for m in matches
m.replace('/', '.').lstrip('.')
for m in matches
])
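The net effect of the ceres.py change is that a tagged query is now resolved against two possible on-disk encodings, the new hashed layout and the older readable one, so series written either way remain findable. A minimal sketch of that fallback, with the encoded variants taken as inputs (the helper itself is illustrative and not part of the commit):

    from glob import glob

    def tagged_filesystem_paths(tree, encoded_variants):
        # encoded_variants stands in for TaggedSeries.encode(pattern, hash_only=True)
        # and TaggedSeries.encode(pattern, hash_only=False), tried in that order.
        for variant in encoded_variants:
            for fs_path in glob(tree.getFilesystemPath(variant)):
                yield fs_path

Whichever encoding actually exists on disk produces the node; the others simply glob to nothing.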
2 changes: 2 additions & 0 deletions webapp/graphite/finders/remote.py
@@ -189,6 +189,7 @@ def auto_complete_tags(self, exprs, tagPrefix=None, limit=None, requestContext=N
fields = [
('tagPrefix', tagPrefix or ''),
('limit', str(limit)),
('local', self.params.get('local', '1')),
]
for expr in exprs:
fields.append(('expr', expr))
@@ -223,6 +224,7 @@ def auto_complete_values(self, exprs, tag, valuePrefix=None, limit=None, request
('tag', tag or ''),
('valuePrefix', valuePrefix or ''),
('limit', str(limit)),
('local', self.params.get('local', '1')),
]
for expr in exprs:
fields.append(('expr', expr))
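The new ('local', ...) field in both remote.py hunks is just another query-string parameter sent to the peer's tag auto-complete endpoints; it tells the remote host to answer from its own store rather than fanning the request out again. Roughly, the request ends up looking like this (the host name is made up; the endpoint path follows the Graphite tags HTTP API):

    from urllib.parse import urlencode

    fields = [
        ('tagPrefix', ''),
        ('limit', '100'),
        ('local', '1'),  # added by this commit: do not re-distribute the query
        ('expr', 'name=~cpu.*'),
    ]
    print('http://graphite-peer:8080/tags/autoComplete/tags?' + urlencode(fields))
    # -> http://graphite-peer:8080/tags/autoComplete/tags?tagPrefix=&limit=100&local=1&expr=...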
177 changes: 97 additions & 80 deletions webapp/graphite/finders/standard.py
@@ -1,6 +1,6 @@
import bisect
import fnmatch
import operator
import os
from os.path import isdir, isfile, join, basename, splitext
from django.conf import settings

@@ -18,7 +18,7 @@
from graphite.finders.utils import BaseFinder
from graphite.tags.utils import TaggedSeries

from . import fs_to_metric, get_real_metric_path, match_entries
from . import fs_to_metric, get_real_metric_path, match_entries, expand_braces


class StandardFinder(BaseFinder):
@@ -34,36 +34,52 @@ def find_nodes(self, query):
# translate query pattern if it is tagged
tagged = not query.pattern.startswith('_tagged.') and ';' in query.pattern
if tagged:
# tagged series are stored in whisper using encoded names, so to retrieve them we need to encode the
# query pattern using the same scheme used in carbon when they are written.
clean_pattern = TaggedSeries.encode(query.pattern)
# tagged series are stored in whisper using encoded names, so to retrieve them we need to
# encode the query pattern using the same scheme used in carbon when they are written.
encoded_paths = [
TaggedSeries.encode(query.pattern, sep=os.sep, hash_only=True),
TaggedSeries.encode(query.pattern, sep=os.sep, hash_only=False),
]

pattern_parts = clean_pattern.split('.')

for root_dir in self.directories:
for absolute_path in self._find_paths(root_dir, pattern_parts):
if basename(absolute_path).startswith('.'):
if tagged:
relative_paths = []
for pattern in encoded_paths:
entries = [
pattern + '.wsp',
pattern + '.wsp.gz',
pattern + '.rrd',
]
for entry in entries:
if isfile(join(root_dir, entry)):
relative_paths.append(entry)
else:
relative_paths = self._find_paths(root_dir, pattern_parts)

for relative_path in relative_paths:
if basename(relative_path).startswith('.'):
continue

if self.DATASOURCE_DELIMITER in basename(absolute_path):
(absolute_path, datasource_pattern) = absolute_path.rsplit(
if self.DATASOURCE_DELIMITER in basename(relative_path):
(relative_path, datasource_pattern) = relative_path.rsplit(
self.DATASOURCE_DELIMITER, 1)
else:
datasource_pattern = None

relative_path = absolute_path[len(root_dir):].lstrip('/')
absolute_path = join(root_dir, relative_path)
metric_path = fs_to_metric(relative_path)
real_metric_path = get_real_metric_path(absolute_path, metric_path)

metric_path_parts = metric_path.split('.')
for field_index in find_escaped_pattern_fields(query.pattern):
metric_path_parts[field_index] = pattern_parts[field_index].replace(
'\\', '')
metric_path = '.'.join(metric_path_parts)

# if we're finding by tag, return the proper metric path
if tagged:
metric_path = query.pattern
else:
metric_path_parts = metric_path.split('.')
for field_index in find_escaped_pattern_fields(query.pattern):
metric_path_parts[field_index] = pattern_parts[field_index].replace('\\', '')
metric_path = '.'.join(metric_path_parts)

# Now we construct and yield an appropriate Node object
if isdir(absolute_path):
@@ -74,8 +90,7 @@ def find_nodes(self, query):
yield LeafNode(metric_path, reader)

elif absolute_path.endswith('.wsp.gz') and GzippedWhisperReader.supported:
reader = GzippedWhisperReader(
absolute_path, real_metric_path)
reader = GzippedWhisperReader(absolute_path, real_metric_path)
yield LeafNode(metric_path, reader)

elif absolute_path.endswith('.rrd') and RRDReader.supported:
@@ -91,73 +106,75 @@ def find_nodes(self, query):
def _find_paths(self, current_dir, patterns):
"""Recursively generates absolute paths whose components underneath current_dir
match the corresponding pattern in patterns"""
pattern = patterns[0]
raw_pattern = patterns[0]
patterns = patterns[1:]

has_wildcard = pattern.find(
'{') > -1 or pattern.find('[') > -1 or pattern.find('*') > -1 or pattern.find('?') > -1

matching_subdirs = []
files = []
if has_wildcard: # this avoids os.listdir() for performance
subdirs = []
try:
for x in scandir(current_dir):
if x.is_file():
files.append(x.name)
if x.is_dir():
subdirs.append(x.name)
except OSError as e:
log.exception(e)

if pattern == "**":
matching_subdirs = map(operator.itemgetter(0), walk(current_dir))

# if this is a terminal globstar, add a pattern for all files in subdirs
if not patterns:
patterns = ["*"]
else:
matching_subdirs = match_entries(subdirs, pattern)
elif isdir(join(current_dir, pattern)):
matching_subdirs.append(pattern)

# the last pattern may apply to RRD data sources
if len(patterns) == 1 and RRDReader.supported:
if not has_wildcard:
entries = [
pattern + ".rrd",
]
rrd_files = [entry for entry in entries if isfile(join(current_dir, entry))]
else:
rrd_files = match_entries(files, pattern + ".rrd")

if rrd_files: # let's assume it does
datasource_pattern = patterns[0]

for rrd_file in rrd_files:
absolute_path = join(current_dir, rrd_file)
yield absolute_path + self.DATASOURCE_DELIMITER + datasource_pattern
for pattern in expand_braces(raw_pattern):
has_wildcard = pattern.find('[') > -1 or pattern.find('*') > -1 or pattern.find('?') > -1

matching_subdirs = []
files = []
if has_wildcard: # this avoids os.listdir() for performance
subdirs = []
try:
for x in scandir(current_dir):
if x.is_file():
files.append(x.name)
if x.is_dir():
subdirs.append(x.name)
except OSError as e:
log.exception(e)

if pattern == "**":
matching_subdirs = map(
lambda item: item[0][len(current_dir) + 1:],
walk(current_dir)
)

# if this is a terminal globstar, add a pattern for all files in subdirs
if not patterns:
patterns = ["*"]
else:
matching_subdirs = match_entries(subdirs, pattern)
elif isdir(join(current_dir, pattern)):
matching_subdirs.append(pattern)

# the last pattern may apply to RRD data sources
if len(patterns) == 1 and RRDReader.supported:
if not has_wildcard:
entries = [
pattern + ".rrd",
]
rrd_files = [entry for entry in entries if isfile(join(current_dir, entry))]
else:
rrd_files = match_entries(files, pattern + ".rrd")

if rrd_files: # let's assume it does
datasource_pattern = patterns[0]

for rrd_file in rrd_files:
yield rrd_file + self.DATASOURCE_DELIMITER + datasource_pattern

if patterns: # we've still got more directories to traverse
for subdir in matching_subdirs:
absolute_path = join(current_dir, subdir)
for match in self._find_paths(absolute_path, patterns):
yield join(subdir, match)

else: # we've got the last pattern
if not has_wildcard:
entries = [
pattern + '.wsp',
pattern + '.wsp.gz',
pattern + '.rrd',
]
matching_files = [entry for entry in entries if isfile(join(current_dir, entry))]
else:
matching_files = match_entries(files, pattern + '.*')

if patterns: # we've still got more directories to traverse
for subdir in matching_subdirs:
absolute_path = join(current_dir, subdir)
for match in self._find_paths(absolute_path, patterns):
for match in matching_files + matching_subdirs:
yield match

else: # we've got the last pattern
if not has_wildcard:
entries = [
pattern + '.wsp',
pattern + '.wsp.gz',
pattern + '.rrd',
]
matching_files = [entry for entry in entries if isfile(join(current_dir, entry))]
else:
matching_files = match_entries(files, pattern + '.*')

for base_name in matching_files + matching_subdirs:
yield join(current_dir, base_name)

def get_index(self, requestContext):
matches = []

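In the rewritten _find_paths above, brace alternatives are no longer handled by the wildcard matcher itself: each pattern component is first run through expand_braces, and every expansion is then matched as an ordinary glob (only [, * and ? count as wildcards). A self-contained sketch of what that expansion step does (an illustration, not the expand_braces helper imported at the top of the file):

    import re

    def expand_braces_sketch(pattern):
        """Expand 'servers.{web,db}.cpu' into ['servers.web.cpu', 'servers.db.cpu']."""
        match = re.search(r'\{([^{}]*)\}', pattern)
        if not match:
            return [pattern]
        head, tail = pattern[:match.start()], pattern[match.end():]
        expanded = []
        for option in match.group(1).split(','):
            expanded.extend(expand_braces_sketch(head + option + tail))
        return expanded

    print(expand_braces_sketch('servers.{web,db}.cpu-{0,1}'))
    # ['servers.web.cpu-0', 'servers.web.cpu-1', 'servers.db.cpu-0', 'servers.db.cpu-1']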
5 changes: 5 additions & 0 deletions webapp/graphite/render/datalib.py
@@ -148,6 +148,11 @@ def copy(self, name=None, start=None, end=None, step=None, values=None, consolid
)


def datapoints(self):
timestamps = range(int(self.start), int(self.end) + 1, int(self.step * self.valuesPerPoint))
return list(zip(self, timestamps))


# Data retrieval API
@logtime
def fetchData(requestContext, pathExpr, timer=None):
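The added datapoints() method pairs every value in the series with the timestamp it covers, using the consolidated step (step * valuesPerPoint). A quick worked example with assumed numbers, start=60, end=240, step=60, valuesPerPoint=1:

    values = [1, 2, 3]
    start, end, step, values_per_point = 60, 240, 60, 1

    timestamps = range(int(start), int(end) + 1, int(step * values_per_point))
    # range(60, 241, 60) -> 60, 120, 180, 240; zip stops at the shorter sequence,
    # so the trailing timestamp 240 is simply dropped.
    print(list(zip(values, timestamps)))  # [(1, 60), (2, 120), (3, 180)]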
20 changes: 14 additions & 6 deletions webapp/graphite/render/functions.py
@@ -4078,11 +4078,14 @@ def constantLine(requestContext, value):
Param('value', ParamTypes.float, required=True),
]

def aggregateLine(requestContext, seriesList, func='average'):
def aggregateLine(requestContext, seriesList, func='average', keepStep=False):
"""
Takes a metric or wildcard seriesList and draws a horizontal line
based on the function applied to each series.
If the optional keepStep parameter is set to True, the result will
have the same time period and step as the source series.
Note: By default, the graphite renderer consolidates data points by
averaging data points over time. If you are using the 'min' or 'max'
function for aggregateLine, this can cause an unusual gap in the
@@ -4110,16 +4113,21 @@ def aggregateLine(requestContext, seriesList, func='average'):
else:
name = 'aggregateLine(%s, None)' % (series.name)

[series] = constantLine(requestContext, value)
series.name = name
series.pathExpression = name
results.append(series)
if keepStep:
aggSeries = series.copy(name=name, values=[value] * len(series))
else:
[aggSeries] = constantLine(requestContext, value)
aggSeries.name = name
aggSeries.pathExpression = name

results.append(aggSeries)
return results

aggregateLine.group = 'Calculate'
aggregateLine.params = [
Param('seriesList', ParamTypes.seriesList, required=True),
Param('func', ParamTypes.aggFunc, default='average', options=aggFuncNames),
Param('keepStep', ParamTypes.boolean, default=False),
]
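As a usage note, keepStep only changes the shape of the returned line, not the aggregated value: the default path still builds a constantLine-style series spanning the requested time range, while keepStep=True copies the source series so the flat line has the same start, end and step as the data it summarizes. Hypothetical render targets (the metric name is made up):

    # Default behaviour: a constant series over the requested time range.
    target_default = "aggregateLine(server.cpu.load, 'average')"

    # keepStep=True: the flat line reuses the source series' time period and step,
    # so it aligns point-for-point with server.cpu.load.
    target_keep_step = "aggregateLine(server.cpu.load, 'average', keepStep=True)"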

def verticalLine(requestContext, ts, label=None, color=None):
Expand Down Expand Up @@ -4848,7 +4856,7 @@ def hitcount(requestContext, seriesList, intervalString, alignToInterval = False

for series in seriesList:
step = int(series.step)
bucket_count = int(math.ceil(float(series.end - series.start) // interval))
bucket_count = int(math.ceil(float(series.end - series.start) / interval))
buckets = [[] for _ in range(bucket_count)]
newStart = int(series.end - bucket_count * interval)
