diff --git a/.travis.yml b/.travis.yml
index 85f3f8b37..7eae4bb29 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -7,6 +7,7 @@ env:
   - TOXENV=py27-django19-pyparsing2
   - TOXENV=py27-django110-pyparsing2
   - TOXENV=py27-django111-pyparsing2-rrdtool
+  - TOXENV=py27-django111-pyparsing2-msgpack
   - TOXENV=py27-django111-pyparsing2-mysql TEST_MYSQL_PASSWORD=graphite
   - TOXENV=py27-django111-pyparsing2-postgresql TEST_POSTGRESQL_PASSWORD=graphite
   - TOXENV=docs
diff --git a/docs/config-local-settings.rst b/docs/config-local-settings.rst
index 148a8cc59..f149ce1fa 100644
--- a/docs/config-local-settings.rst
+++ b/docs/config-local-settings.rst
@@ -11,19 +11,19 @@ Config File Location
 General Settings
 ----------------
 URL_PREFIX
-  `Default: /` 
+  `Default: /`
 
   Set the URL_PREFIX when deploying graphite-web to a non-root location.
 
 SECRET_KEY
   `Default: UNSAFE_DEFAULT`
-  
+
   This key is used for salting of hashes used in auth tokens, CSRF middleware, cookie storage, etc. This should be set identically among all nodes if used behind a load balancer.
 
 ALLOWED_HOSTS
   `Default: *`
-  
-  In Django 1.5+ set the list of hosts from where your graphite instances is accessible. 
+
+  In Django 1.5+ set the list of hosts from which your graphite instance is accessible.
   See: https://docs.djangoproject.com/en/dev/ref/settings/#std:setting-ALLOWED_HOSTS
 
 TIME_ZONE
@@ -33,7 +33,7 @@ TIME_ZONE
 
 DATE_FORMAT
   `Default: %m/%d`
-  
+
   Set the default short date format. See strftime(3) for supported sequences.
 
 DOCUMENTATION_URL
@@ -75,9 +75,9 @@ MEMCACHE_KEY_PREFIX
 
 MEMCACHE_OPTIONS
   `Default: {}`
-  
-  Accepted options depend on the Memcached implementation and the Django version. 
-  Until Django 1.10, options are used only for pylibmc. 
+
+  Accepted options depend on the Memcached implementation and the Django version.
+  Until Django 1.10, options are used only for pylibmc.
   Starting from 1.11, options are used for both python-memcached and pylibmc.
 
 DEFAULT_CACHE_DURATION
@@ -99,12 +99,12 @@ DEFAULT_CACHE_POLICY
 
 AUTO_REFRESH_INTERVAL
   `Default: 60`
-  
+
   Interval for the Auto-Refresh feature in the Composer, measured in seconds.
 
 MAX_TAG_LENGTH
   `Default: 50`
-  
+
   Graphite uses Django Tagging to support tags in Events. By default each tag is limited to 50 characters.
 
 Filesystem Paths
@@ -383,30 +383,31 @@ If you're using the default SQLite database, your webserver will need permission
 
 Cluster Configuration
 ---------------------
-These settings configure the Graphite webapp for clustered use. When ``CLUSTER_SERVERS`` is set, metric browse and render requests will cause the webapp to query other webapps in CLUSTER_SERVERS for matching metrics. Graphite will use only one successfully matching response to render data. This means that metrics may only live on a single server in the cluster unless the data is consistent on both sources (e.g. with shared SAN storage). Duplicate metric data existing in multiple locations will *not* be combined.
+These settings configure the Graphite webapp for clustered use. When ``CLUSTER_SERVERS`` is set, metric browse and render requests will cause the webapp to query other webapps in CLUSTER_SERVERS for matching metrics. Graphite can either merge responses or choose the best response if more than one cluster server returns the same series.
 
 CLUSTER_SERVERS
   `Default: []`
 
-  The list of IP addresses and ports of remote Graphite webapps in a cluster. Each of these servers should have local access to metric data to serve. The first server to return a match for a query will be used to serve that data. Ex: ["10.0.2.2:80", "10.0.2.3:80"]
+  The list of IP addresses and ports of remote Graphite webapps in a cluster. Each of these servers should have local access to metric data to serve. Ex: ["10.0.2.2:80", "http://10.0.2.3:80?format=pickle&local=1"]
+
+  Cluster server definitions can optionally include a protocol (http:// or https://) and/or additional config parameters.
+
+  The `format` parameter can be set to `pickle` (the default) or `msgpack` to control the encoding used for intra-cluster find and render requests.
+
+  The `local` parameter can be set to `1` (the default) or `0` to control whether cluster servers should only return results from local finders, or fan the request out to their remote finders.
 
 USE_WORKER_POOL
   `Default: True`
-  
-  Creates a pool of worker threads to which tasks can be dispatched. This makes sense if there are multiple CLUSTER_SERVERS because then the communication with them can be parallelized
-  The number of threads is equal to: POOL_WORKERS_PER_BACKEND * len(CLUSTER_SERVERS) + POOL_WORKERS
-  
+
+  Creates a pool of worker threads to which tasks can be dispatched. This makes sense if there are multiple CLUSTER_SERVERS and/or STORAGE_FINDERS because then the communication with them can be parallelized.
+  The number of threads is equal to: min(number of finders, POOL_MAX_WORKERS)
+
   Be careful when increasing the number of threads, in particular if you start multiple graphite-web processes (with uwsgi or similar) as this will increase memory consumption (and number of connections to memcached).
-
-POOL_WORKERS_PER_BACKEND
-  `Default: 1`
-  
-  The number of worker threads that should be created per backend server. It makes sense to have more than one thread per backend server if the graphite-web web server itself is multi threaded and can handle multiple incoming requests at once.
 
-POOL_WORKERS
-  `Default: 1`
-  
-  A baseline number of workers that should always be created, no matter how many cluster servers are configured. These are used for other tasks that can be off-loaded from the request handling threads.
+POOL_MAX_WORKERS
+  `Default: 10`
+
+  The maximum number of worker threads that should be created.
 
 REMOTE_FETCH_TIMEOUT
   `Default: 6`
@@ -427,22 +428,22 @@ FIND_CACHE_DURATION
   `Default: 300`
 
   Time to cache remote metric find results in seconds.
-  
+
 MAX_FETCH_RETRIES
   `Default: 2`
-  
+
   Number of retries for a specific remote data fetch.
 
 FIND_TOLERANCE
   `Default: FIND_TOLERANCE = 2 * FIND_CACHE_DURATION`
-  
+
   If the query doesn't fall entirely within the FIND_TOLERANCE window we disregard the window. This prevents unnecessary remote fetches caused when carbon's cache skews node.intervals, giving the appearance remote systems have data we don't have locally, which we probably do.
 
 REMOTE_STORE_MERGE_RESULTS
   `Default: True`
-  
-  During a rebalance of a consistent hash cluster, after a partition event on a replication > 1 cluster or in other cases we might receive multiple TimeSeries data for a metric key. 
+
+  During a rebalance of a consistent hash cluster, after a partition event on a replication > 1 cluster or in other cases we might receive multiple TimeSeries data for a metric key.
   Merge them together rather than choosing the "most complete" one (pre-0.9.14 behaviour).
 
 REMOTE_STORE_USE_POST
@@ -455,15 +456,9 @@ REMOTE_STORE_FORWARD_HEADERS
   Provide a list of HTTP headers that you want forwarded on from this host when making a request to a remote webapp server in CLUSTER_SERVERS.
-REMOTE_PREFETCH_DATA
-  `Default: False`
-
-  If enabled it will fetch all metrics using a single http request per remote server instead of one http request per target, per remote server.
-  This is especially useful when generating graphs with more than 4-5 targets or if there's significant latency between this server and the backends.
-
 REMOTE_EXCLUDE_LOCAL
   `Default: False`
-  
+
   Try to detect when a cluster server is localhost and don't forward queries.
 
 REMOTE_RENDERING
@@ -506,7 +501,7 @@ CARBON_METRIC_PREFIX:
 INTRACLUSTER_HTTPS
   `Default: False`
 
-  This setting controls whether https is used to communicate between cluster members.
+  This setting controls whether https is used to communicate between cluster members that don't have an explicit protocol specified.
 
 
 Additional Django Settings
@@ -520,4 +515,4 @@ To manipulate these settings, ensure ``app_settings.py`` is imported as such:
 
   from graphite.app_settings import *
 
 The most common settings to manipulate are ``INSTALLED_APPS``, ``MIDDLEWARE``, and ``AUTHENTICATION_BACKENDS``.
-  
+
diff --git a/tox.ini b/tox.ini
index 06ffe324d..62c77448c 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,6 +1,6 @@
 [tox]
 envlist =
-   py27-django1{8,9,10,11}-pyparsing2{,-mysql,-postgresql,-rrdtool},
+   py27-django1{8,9,10,11}-pyparsing2{,-mysql,-postgresql,-rrdtool,-msgpack},
    lint,
    docs
 
 [testenv]
@@ -38,6 +38,7 @@ deps =
     mysql: mysqlclient
     postgresql: psycopg2
     rrdtool: rrdtool
+    msgpack: msgpack-python
 
 [testenv:docs]
 basepython = python2.7
diff --git a/webapp/graphite/finders/remote.py b/webapp/graphite/finders/remote.py
index 20407e152..71e894f94 100644
--- a/webapp/graphite/finders/remote.py
+++ b/webapp/graphite/finders/remote.py
@@ -1,6 +1,8 @@
+import codecs
 import time
 
 from urllib import urlencode
+from urlparse import urlsplit, parse_qs
 
 from django.conf import settings
 from django.core.cache import cache
@@ -10,7 +12,7 @@ from graphite.logger import log
 from graphite.node import LeafNode, BranchNode
 from graphite.render.hashing import compactHash
 
-from graphite.util import unpickle, logtime, is_local_interface, json
+from graphite.util import unpickle, logtime, is_local_interface, json, msgpack
 
 from graphite.finders.utils import BaseFinder
 from graphite.readers.remote import RemoteReader
@@ -23,13 +25,30 @@ class RemoteFinder(BaseFinder):
    def factory(cls):
        finders = []
        for host in settings.CLUSTER_SERVERS:
-           if settings.REMOTE_EXCLUDE_LOCAL and is_local_interface(host):
+           if settings.REMOTE_EXCLUDE_LOCAL and is_local_interface(cls.parse_host(host)['host']):
                continue
            finders.append(cls(host))
        return finders
 
+    @staticmethod
+    def parse_host(host):
+        if host.startswith('http://') or host.startswith('https://'):
+            parsed = urlsplit(host)
+        else:
+            scheme = 'https' if settings.INTRACLUSTER_HTTPS else 'http'
+            parsed = urlsplit(scheme + '://' + host)
+
+        return {
+            'host': parsed.netloc,
+            'url': '%s://%s%s' % (parsed.scheme, parsed.netloc, parsed.path),
+            'params': {key: value[-1] for (key, value) in parse_qs(parsed.query).items()},
+        }
+
    def __init__(self, host):
-        self.host = host
+        parsed = self.parse_host(host)
+        self.host = parsed['host']
+        self.url = parsed['url']
+        self.params = parsed['params']
        self.last_failure = 0
 
    @property
@@ -71,8 +90,8 @@ def find_nodes(self, query, timer=None):
 
        url = '/metrics/find/'
        query_params = [
-            ('local', '1'),
-            ('format', 'pickle'),
+            ('local', self.params.get('local', '1')),
+            ('format', self.params.get('format', 'pickle')),
            ('query', query.pattern),
        ]
        if query.startTime:
@@ -88,13 +107,18 @@ def find_nodes(self, query, timer=None):
            timeout=settings.REMOTE_FIND_TIMEOUT)
 
        try:
-            results = unpickle.loads(result.data)
+            if result.getheader('content-type') == 'application/x-msgpack':
+                results = msgpack.load(result)
+            else:
+                results = unpickle.load(result)
        except Exception as err:
            self.fail()
            log.exception(
                "RemoteFinder[%s] Error decoding find response from %s: %s" %
                (self.host, result.url_full, err))
            raise Exception("Error decoding find response from %s: %s" % (result.url_full, err))
+        finally:
+            result.release_conn()
 
        cache.set(cacheKey, results, settings.FIND_CACHE_DURATION)
 
@@ -134,25 +158,27 @@ def get_index(self, requestContext):
        result = self.request(
            url,
            fields=[
-                ('local', '1'),
+                ('local', self.params.get('local', '1')),
            ],
            headers=headers,
            timeout=settings.REMOTE_FIND_TIMEOUT)
 
        try:
-            results = json.loads(result.data)
+            reader = codecs.getreader('utf-8')
+            results = json.load(reader(result))
        except Exception as err:
            self.fail()
            log.exception(
                "RemoteFinder[%s] Error decoding index response from %s: %s" %
                (self.host, result.url_full, err))
            raise Exception("Error decoding index response from %s: %s" % (result.url_full, err))
+        finally:
+            result.release_conn()
 
        return results
 
-    def request(self, url, fields=None, headers=None, timeout=None):
-        url = "%s://%s%s" % (
-            'https' if settings.INTRACLUSTER_HTTPS else 'http', self.host, url)
+    def request(self, path, fields=None, headers=None, timeout=None):
+        url = "%s%s" % (self.url, path)
        url_full = "%s?%s" % (url, urlencode(fields))
 
        try:
@@ -161,13 +187,15 @@ def request(self, url, fields=None, headers=None, timeout=None):
                url,
                fields=fields,
                headers=headers,
-                timeout=timeout)
+                timeout=timeout,
+                preload_content=False)
        except BaseException as err:
            self.fail()
            log.exception("RemoteFinder[%s] Error requesting %s: %s" %
                          (self.host, url_full, err))
            raise Exception("Error requesting %s: %s" % (url_full, err))
 
        if result.status != 200:
+            result.release_conn()
            self.fail()
            log.exception(
                "RemoteFinder[%s] Error response %d from %s" % (self.host, result.status, url_full))
diff --git a/webapp/graphite/metrics/views.py b/webapp/graphite/metrics/views.py
index 7dfa38ed7..8a161550a 100644
--- a/webapp/graphite/metrics/views.py
+++ b/webapp/graphite/metrics/views.py
@@ -23,13 +23,7 @@ from graphite.render.attime import parseATTime
 from graphite.storage import STORE, extractForwardHeaders
 from graphite.user_util import getProfile
-from graphite.util import epoch
-from graphite.util import json
-
-try:
-    import cPickle as pickle
-except ImportError:
-    import pickle
+from graphite.util import epoch, json, pickle, msgpack
 
 
 def index_json(request):
@@ -141,6 +135,10 @@ def find_view(request):
        content = pickle_nodes(matches)
        response = HttpResponse(content, content_type='application/pickle')
 
+    elif format == 'msgpack':
+        content = msgpack_nodes(matches)
+        response = HttpResponse(content, content_type='application/x-msgpack')
+
    elif format == 'json':
        content = json_nodes(matches)
        response = json_response_for(request, content, jsonp=jsonp)
@@ -331,6 +329,19 @@ def pickle_nodes(nodes):
    return pickle.dumps(nodes_info, protocol=-1)
 
 
+def msgpack_nodes(nodes):
+    nodes_info = []
+
+    for node in nodes:
+        info = dict(path=node.path, is_leaf=node.is_leaf)
+        if node.is_leaf:
+            info['intervals'] = [interval.tuple for interval in node.intervals]
+
+        nodes_info.append(info)
+
+    return msgpack.dumps(nodes_info)
+
+
 def json_nodes(nodes):
    nodes_info = []
diff --git a/webapp/graphite/readers/remote.py b/webapp/graphite/readers/remote.py
index 1353ee130..5ec2e85e9 100644
--- a/webapp/graphite/readers/remote.py
+++ b/webapp/graphite/readers/remote.py
@@ -4,7 +4,7 @@
 from graphite.logger import log
 
 from graphite.readers.utils import BaseReader
-from graphite.util import unpickle
+from graphite.util import unpickle, msgpack
 
 
 class RemoteReader(BaseReader):
@@ -39,8 +39,8 @@ def fetch_multi(self, startTime, endTime, now=None, requestContext=None):
            return []
 
        query_params = [
-            ('format', 'pickle'),
-            ('local', '1'),
+            ('format', self.finder.params.get('format', 'pickle')),
+            ('local', self.finder.params.get('local', '1')),
            ('noCache', '1'),
            ('from', int(startTime)),
            ('until', int(endTime))
@@ -75,13 +75,18 @@ def fetch_multi(self, startTime, endTime, now=None, requestContext=None):
            retries += 1
 
        try:
-            data = unpickle.loads(result.data)
+            if result.getheader('content-type') == 'application/x-msgpack':
+                data = msgpack.load(result)
+            else:
+                data = unpickle.load(result)
        except Exception as err:
            self.finder.fail()
            log.exception(
                "RemoteReader[%s] Error decoding render response from %s: %s" %
                (self.finder.host, result.url_full, err))
            raise Exception("Error decoding render response from %s: %s" % (result.url_full, err))
+        finally:
+            result.release_conn()
 
        return [
            {
diff --git a/webapp/graphite/render/views.py b/webapp/graphite/render/views.py
index 56d1ced4e..14be8636b 100755
--- a/webapp/graphite/render/views.py
+++ b/webapp/graphite/render/views.py
@@ -22,16 +22,10 @@
 from urllib import urlencode
 from urlparse import urlsplit, urlunsplit
 from cgi import parse_qs
-from cStringIO import StringIO
-
-try:
-    import cPickle as pickle
-except ImportError:
-    import pickle
 
 from graphite.compat import HttpResponse
 from graphite.user_util import getProfileByUsername
-from graphite.util import json, unpickle
+from graphite.util import json, unpickle, pickle, msgpack, StringIO
 from graphite.storage import extractForwardHeaders
 from graphite.logger import log
 from graphite.render.evaluator import evaluateTarget
@@ -138,6 +132,8 @@ def renderView(request):
            response = renderViewRaw(requestOptions, data)
        elif format == 'pickle':
            response = renderViewPickle(requestOptions, data)
+        elif format == 'msgpack':
+            response = renderViewMsgPack(requestOptions, data)
 
    # if response wasn't generated above, render a graph image
    if not response:
@@ -323,6 +319,13 @@ def renderViewPickle(requestOptions, data):
    return response
 
 
+def renderViewMsgPack(requestOptions, data):
+    response = HttpResponse(content_type='application/x-msgpack')
+    seriesInfo = [series.getInfo() for series in data]
+    msgpack.dump(seriesInfo, response)
+    return response
+
+
 def parseOptions(request):
    queryParams = request.GET.copy()
    queryParams.update(request.POST)
diff --git a/webapp/graphite/umsgpack.py b/webapp/graphite/umsgpack.py
new file mode 100644
index 000000000..cd7a2037e
--- /dev/null
+++ b/webapp/graphite/umsgpack.py
@@ -0,0 +1,1057 @@
+# u-msgpack-python v2.4.1 - v at sergeev.io
+# https://github.com/vsergeev/u-msgpack-python
+#
+# u-msgpack-python is a lightweight MessagePack serializer and deserializer
+# module, compatible with both Python 2 and 3, as well as CPython and PyPy
+# implementations of Python. u-msgpack-python is fully compliant with the
+# latest MessagePack specification (https://github.com/msgpack/msgpack/blob/master/spec.md). In
+# particular, it supports the new binary, UTF-8 string, and application ext
+# types.
+#
+# MIT License
+#
+# Copyright (c) 2013-2016 vsergeev / Ivan (Vanya) A. Sergeev
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+#
+"""
+u-msgpack-python v2.4.1 - v at sergeev.io
+https://github.com/vsergeev/u-msgpack-python
+
+u-msgpack-python is a lightweight MessagePack serializer and deserializer
+module, compatible with both Python 2 and 3, as well as CPython and PyPy
+implementations of Python. u-msgpack-python is fully compliant with the
+latest MessagePack specification (https://github.com/msgpack/msgpack/blob/master/spec.md). In
+particular, it supports the new binary, UTF-8 string, and application ext
+types.
+
+License: MIT
+"""
+import struct
+import collections
+import sys
+import io
+
+__version__ = "2.4.1"
+"Module version string"
+
+version = (2, 4, 1)
+"Module version tuple"
+
+
+##############################################################################
+# Ext Class
+##############################################################################
+
+# Extension type for application-defined types and data
+class Ext:
+    """
+    The Ext class facilitates creating a serializable extension object to store
+    an application-defined type and data byte array.
+    """
+
+    def __init__(self, type, data):
+        """
+        Construct a new Ext object.
+
+        Args:
+            type: application-defined type integer from 0 to 127
+            data: application-defined data byte array
+
+        Raises:
+            TypeError:
+                Specified ext type is outside of 0 to 127 range.
+
+        Example:
+        >>> foo = umsgpack.Ext(0x05, b"\x01\x02\x03")
+        >>> umsgpack.packb({u"special stuff": foo, u"awesome": True})
+        '\x82\xa7awesome\xc3\xadspecial stuff\xc7\x03\x05\x01\x02\x03'
+        >>> bar = umsgpack.unpackb(_)
+        >>> print(bar["special stuff"])
+        Ext Object (Type: 0x05, Data: 01 02 03)
+        >>>
+        """
+        # Application ext type should be 0 <= type <= 127
+        if not isinstance(type, int) or not (type >= 0 and type <= 127):
+            raise TypeError("ext type out of range")
+        # Check data is type bytes
+        elif sys.version_info[0] == 3 and not isinstance(data, bytes):
+            raise TypeError("ext data is not type \'bytes\'")
+        elif sys.version_info[0] == 2 and not isinstance(data, str):
+            raise TypeError("ext data is not type \'str\'")
+        self.type = type
+        self.data = data
+
+    def __eq__(self, other):
+        """
+        Compare this Ext object with another for equality.
+        """
+        return (isinstance(other, self.__class__) and
+                self.type == other.type and
+                self.data == other.data)
+
+    def __ne__(self, other):
+        """
+        Compare this Ext object with another for inequality.
+        """
+        return not self.__eq__(other)
+
+    def __str__(self):
+        """
+        String representation of this Ext object.
+        """
+        s = "Ext Object (Type: 0x%02x, Data: " % self.type
+        s += " ".join(["0x%02x" % ord(self.data[i:i + 1])
+                       for i in xrange(min(len(self.data), 8))])
+        if len(self.data) > 8:
+            s += " ..."
+        s += ")"
+        return s
+
+    def __hash__(self):
+        """
+        Provide a hash of this Ext object.
+        """
+        return hash((self.type, self.data))
+
+
+class InvalidString(bytes):
+    """Subclass of bytes to hold invalid UTF-8 strings."""
+    pass
+
+##############################################################################
+# Exceptions
+##############################################################################
+
+
+# Base Exception classes
+class PackException(Exception):
+    "Base class for exceptions encountered during packing."
+    pass
+
+
+class UnpackException(Exception):
+    "Base class for exceptions encountered during unpacking."
+    pass
+
+
+# Packing error
+class UnsupportedTypeException(PackException):
+    "Object type not supported for packing."
+    pass
+
+
+# Unpacking error
+class InsufficientDataException(UnpackException):
+    "Insufficient data to unpack the serialized object."
+    pass
+
+
+class InvalidStringException(UnpackException):
+    "Invalid UTF-8 string encountered during unpacking."
+    pass
+
+
+class ReservedCodeException(UnpackException):
+    "Reserved code encountered during unpacking."
+    pass
+
+
+class UnhashableKeyException(UnpackException):
+    """
+    Unhashable key encountered during map unpacking.
+    The serialized map cannot be deserialized into a Python dictionary.
+    """
+    pass
+
+
+class DuplicateKeyException(UnpackException):
+    "Duplicate key encountered during map unpacking."
+    pass
+
+
+# Backwards compatibility
+KeyNotPrimitiveException = UnhashableKeyException
+KeyDuplicateException = DuplicateKeyException
+
+#############################################################################
+# Exported Functions and Globals
+#############################################################################
+
+# Exported functions and variables, set up in __init()
+pack = None
+packb = None
+unpack = None
+unpackb = None
+dump = None
+dumps = None
+load = None
+loads = None
+
+compatibility = False
+"""
+Compatibility mode boolean.
+
+When compatibility mode is enabled, u-msgpack-python will serialize both
+unicode strings and bytes into the old "raw" msgpack type, and deserialize the
+"raw" msgpack type into bytes. This provides backwards compatibility with the
+old MessagePack specification.
+
+Example:
+>>> umsgpack.compatibility = True
+>>>
+>>> umsgpack.packb([u"some string", b"some bytes"])
+b'\x92\xabsome string\xaasome bytes'
+>>> umsgpack.unpackb(_)
+[b'some string', b'some bytes']
+>>>
+"""
+
+##############################################################################
+# Packing
+##############################################################################
+
+# You may notice struct.pack("B", obj) instead of the simpler chr(obj) in the
+# code below. This is to allow for seamless Python 2 and 3 compatibility, as
+# chr(obj) has a str return type instead of bytes in Python 3, and
+# struct.pack(...) has the right return type in both versions.
+
+
+def _pack_integer(obj, fp, options):
+    if obj < 0:
+        if obj >= -32:
+            fp.write(struct.pack("b", obj))
+        elif obj >= -2**(8 - 1):
+            fp.write(b"\xd0" + struct.pack("b", obj))
+        elif obj >= -2**(16 - 1):
+            fp.write(b"\xd1" + struct.pack(">h", obj))
+        elif obj >= -2**(32 - 1):
+            fp.write(b"\xd2" + struct.pack(">i", obj))
+        elif obj >= -2**(64 - 1):
+            fp.write(b"\xd3" + struct.pack(">q", obj))
+        else:
+            raise UnsupportedTypeException("huge signed int")
+    else:
+        if obj <= 127:
+            fp.write(struct.pack("B", obj))
+        elif obj <= 2**8 - 1:
+            fp.write(b"\xcc" + struct.pack("B", obj))
+        elif obj <= 2**16 - 1:
+            fp.write(b"\xcd" + struct.pack(">H", obj))
+        elif obj <= 2**32 - 1:
+            fp.write(b"\xce" + struct.pack(">I", obj))
+        elif obj <= 2**64 - 1:
+            fp.write(b"\xcf" + struct.pack(">Q", obj))
+        else:
+            raise UnsupportedTypeException("huge unsigned int")
+
+
+def _pack_nil(obj, fp, options):
+    fp.write(b"\xc0")
+
+
+def _pack_boolean(obj, fp, options):
+    fp.write(b"\xc3" if obj else b"\xc2")
+
+
+def _pack_float(obj, fp, options):
+    float_precision = options.get('force_float_precision', _float_precision)
+
+    if float_precision == "double":
+        fp.write(b"\xcb" + struct.pack(">d", obj))
+    elif float_precision == "single":
+        fp.write(b"\xca" + struct.pack(">f", obj))
+    else:
+        raise ValueError("invalid float precision")
+
+
+def _pack_string(obj, fp, options):
+    obj = obj.encode('utf-8')
+    if len(obj) <= 31:
+        fp.write(struct.pack("B", 0xa0 | len(obj)) + obj)
+    elif len(obj) <= 2**8 - 1:
+        fp.write(b"\xd9" + struct.pack("B", len(obj)) + obj)
+    elif len(obj) <= 2**16 - 1:
+        fp.write(b"\xda" + struct.pack(">H", len(obj)) + obj)
+    elif len(obj) <= 2**32 - 1:
+        fp.write(b"\xdb" + struct.pack(">I", len(obj)) + obj)
+    else:
+        raise UnsupportedTypeException("huge string")
+
+
+def _pack_binary(obj, fp, options):
+    if len(obj) <= 2**8 - 1:
+        fp.write(b"\xc4" + struct.pack("B", len(obj)) + obj)
+    elif len(obj) <= 2**16 - 1:
+        fp.write(b"\xc5" + struct.pack(">H", len(obj)) + obj)
+    elif len(obj) <= 2**32 - 1:
+        fp.write(b"\xc6" + struct.pack(">I", len(obj)) + obj)
+    else:
+        raise UnsupportedTypeException("huge binary string")
+
+
+def _pack_oldspec_raw(obj, fp, options):
+    if len(obj) <= 31:
+        fp.write(struct.pack("B", 0xa0 | len(obj)) + obj)
+    elif len(obj) <= 2**16 - 1:
+        fp.write(b"\xda" + struct.pack(">H", len(obj)) + obj)
+    elif len(obj) <= 2**32 - 1:
+        fp.write(b"\xdb" + struct.pack(">I", len(obj)) + obj)
+    else:
+        raise UnsupportedTypeException("huge raw string")
+
+
+def _pack_ext(obj, fp, options):
+    if len(obj.data) == 1:
+        fp.write(b"\xd4" + struct.pack("B", obj.type & 0xff) + obj.data)
+    elif len(obj.data) == 2:
+        fp.write(b"\xd5" + struct.pack("B", obj.type & 0xff) + obj.data)
+    elif len(obj.data) == 4:
+        fp.write(b"\xd6" + struct.pack("B", obj.type & 0xff) + obj.data)
+    elif len(obj.data) == 8:
+        fp.write(b"\xd7" + struct.pack("B", obj.type & 0xff) + obj.data)
+    elif len(obj.data) == 16:
+        fp.write(b"\xd8" + struct.pack("B", obj.type & 0xff) + obj.data)
+    elif len(obj.data) <= 2**8 - 1:
+        fp.write(b"\xc7" +
+                 struct.pack("BB", len(obj.data), obj.type & 0xff) + obj.data)
+    elif len(obj.data) <= 2**16 - 1:
+        fp.write(b"\xc8" +
+                 struct.pack(">HB", len(obj.data), obj.type & 0xff) + obj.data)
+    elif len(obj.data) <= 2**32 - 1:
+        fp.write(b"\xc9" +
+                 struct.pack(">IB", len(obj.data), obj.type & 0xff) + obj.data)
+    else:
+        raise UnsupportedTypeException("huge ext data")
+
+
+def _pack_array(obj, fp, options):
+    if len(obj) <= 15:
+        fp.write(struct.pack("B", 0x90 | len(obj)))
+    elif len(obj) <= 2**16 - 1:
+        fp.write(b"\xdc" + struct.pack(">H", len(obj)))
+    elif len(obj) <= 2**32 - 1:
+        fp.write(b"\xdd" + struct.pack(">I", len(obj)))
+    else:
+        raise UnsupportedTypeException("huge array")
+
+    for e in obj:
+        pack(e, fp, **options)
+
+
+def _pack_map(obj, fp, options):
+    if len(obj) <= 15:
+        fp.write(struct.pack("B", 0x80 | len(obj)))
+    elif len(obj) <= 2**16 - 1:
+        fp.write(b"\xde" + struct.pack(">H", len(obj)))
+    elif len(obj) <= 2**32 - 1:
+        fp.write(b"\xdf" + struct.pack(">I", len(obj)))
+    else:
+        raise UnsupportedTypeException("huge array")
+
+    for k, v in obj.items():
+        pack(k, fp, **options)
+        pack(v, fp, **options)
+
+########################################
+
+
+# Pack for Python 2, with 'unicode' type, 'str' type, and 'long' type
+def _pack2(obj, fp, **options):
+    """
+    Serialize a Python object into MessagePack bytes.
+
+    Args:
+        obj: a Python object
+        fp: a .write()-supporting file-like object
+
+    Kwargs:
+        ext_handlers (dict): dictionary of Ext handlers, mapping a custom type
+                             to a callable that packs an instance of the type
+                             into an Ext object
+        force_float_precision (str): "single" to force packing floats as
+                                     IEEE-754 single-precision floats,
+                                     "double" to force packing floats as
+                                     IEEE-754 double-precision floats.
+
+    Returns:
+        None.
+
+    Raises:
+        UnsupportedType(PackException):
+            Object type not supported for packing.
+
+    Example:
+    >>> f = open('test.bin', 'wb')
+    >>> umsgpack.pack({u"compact": True, u"schema": 0}, f)
+    >>>
+    """
+    global compatibility
+
+    ext_handlers = options.get("ext_handlers")
+
+    if obj is None:
+        _pack_nil(obj, fp, options)
+    elif ext_handlers and obj.__class__ in ext_handlers:
+        _pack_ext(ext_handlers[obj.__class__](obj), fp, options)
+    elif isinstance(obj, bool):
+        _pack_boolean(obj, fp, options)
+    elif isinstance(obj, int) or isinstance(obj, long):
+        _pack_integer(obj, fp, options)
+    elif isinstance(obj, float):
+        _pack_float(obj, fp, options)
+    elif compatibility and isinstance(obj, unicode):
+        _pack_oldspec_raw(bytes(obj), fp, options)
+    elif compatibility and isinstance(obj, bytes):
+        _pack_oldspec_raw(obj, fp, options)
+    elif isinstance(obj, unicode):
+        _pack_string(obj, fp, options)
+    elif isinstance(obj, str):
+        _pack_binary(obj, fp, options)
+    elif isinstance(obj, list) or isinstance(obj, tuple):
+        _pack_array(obj, fp, options)
+    elif isinstance(obj, dict):
+        _pack_map(obj, fp, options)
+    elif isinstance(obj, Ext):
+        _pack_ext(obj, fp, options)
+    elif ext_handlers:
+        # Linear search for superclass
+        t = next((t for t in ext_handlers.keys() if isinstance(obj, t)), None)
+        if t:
+            _pack_ext(ext_handlers[t](obj), fp, options)
+        else:
+            raise UnsupportedTypeException(
+                "unsupported type: %s" % str(type(obj)))
+    else:
+        raise UnsupportedTypeException("unsupported type: %s" % str(type(obj)))
+
+
+# Pack for Python 3, with unicode 'str' type, 'bytes' type, and no 'long' type
+def _pack3(obj, fp, **options):
+    """
+    Serialize a Python object into MessagePack bytes.
+
+    Args:
+        obj: a Python object
+        fp: a .write()-supporting file-like object
+
+    Kwargs:
+        ext_handlers (dict): dictionary of Ext handlers, mapping a custom type
+                             to a callable that packs an instance of the type
+                             into an Ext object
+        force_float_precision (str): "single" to force packing floats as
+                                     IEEE-754 single-precision floats,
+                                     "double" to force packing floats as
+                                     IEEE-754 double-precision floats.
+
+    Returns:
+        None.
+
+    Raises:
+        UnsupportedType(PackException):
+            Object type not supported for packing.
+
+    Example:
+    >>> f = open('test.bin', 'wb')
+    >>> umsgpack.pack({u"compact": True, u"schema": 0}, f)
+    >>>
+    """
+    global compatibility
+
+    ext_handlers = options.get("ext_handlers")
+
+    if obj is None:
+        _pack_nil(obj, fp, options)
+    elif ext_handlers and obj.__class__ in ext_handlers:
+        _pack_ext(ext_handlers[obj.__class__](obj), fp, options)
+    elif isinstance(obj, bool):
+        _pack_boolean(obj, fp, options)
+    elif isinstance(obj, int):
+        _pack_integer(obj, fp, options)
+    elif isinstance(obj, float):
+        _pack_float(obj, fp, options)
+    elif compatibility and isinstance(obj, str):
+        _pack_oldspec_raw(obj.encode('utf-8'), fp, options)
+    elif compatibility and isinstance(obj, bytes):
+        _pack_oldspec_raw(obj, fp, options)
+    elif isinstance(obj, str):
+        _pack_string(obj, fp, options)
+    elif isinstance(obj, bytes):
+        _pack_binary(obj, fp, options)
+    elif isinstance(obj, list) or isinstance(obj, tuple):
+        _pack_array(obj, fp, options)
+    elif isinstance(obj, dict):
+        _pack_map(obj, fp, options)
+    elif isinstance(obj, Ext):
+        _pack_ext(obj, fp, options)
+    elif ext_handlers:
+        # Linear search for superclass
+        t = next((t for t in ext_handlers.keys() if isinstance(obj, t)), None)
+        if t:
+            _pack_ext(ext_handlers[t](obj), fp, options)
+        else:
+            raise UnsupportedTypeException(
+                "unsupported type: %s" % str(type(obj)))
+    else:
+        raise UnsupportedTypeException(
+            "unsupported type: %s" % str(type(obj)))
+
+
+def _packb2(obj, **options):
+    """
+    Serialize a Python object into MessagePack bytes.
+
+    Args:
+        obj: a Python object
+
+    Kwargs:
+        ext_handlers (dict): dictionary of Ext handlers, mapping a custom type
+                             to a callable that packs an instance of the type
+                             into an Ext object
+        force_float_precision (str): "single" to force packing floats as
+                                     IEEE-754 single-precision floats,
+                                     "double" to force packing floats as
+                                     IEEE-754 double-precision floats.
+
+    Returns:
+        A 'str' containing serialized MessagePack bytes.
+
+    Raises:
+        UnsupportedType(PackException):
+            Object type not supported for packing.
+
+    Example:
+    >>> umsgpack.packb({u"compact": True, u"schema": 0})
+    '\x82\xa7compact\xc3\xa6schema\x00'
+    >>>
+    """
+    fp = io.BytesIO()
+    _pack2(obj, fp, **options)
+    return fp.getvalue()
+
+
+def _packb3(obj, **options):
+    """
+    Serialize a Python object into MessagePack bytes.
+
+    Args:
+        obj: a Python object
+
+    Kwargs:
+        ext_handlers (dict): dictionary of Ext handlers, mapping a custom type
+                             to a callable that packs an instance of the type
+                             into an Ext object
+        force_float_precision (str): "single" to force packing floats as
+                                     IEEE-754 single-precision floats,
+                                     "double" to force packing floats as
+                                     IEEE-754 double-precision floats.
+
+    Returns:
+        A 'bytes' containing serialized MessagePack bytes.
+
+    Raises:
+        UnsupportedType(PackException):
+            Object type not supported for packing.
+
+    Example:
+    >>> umsgpack.packb({u"compact": True, u"schema": 0})
+    b'\x82\xa7compact\xc3\xa6schema\x00'
+    >>>
+    """
+    fp = io.BytesIO()
+    _pack3(obj, fp, **options)
+    return fp.getvalue()
+
+#############################################################################
+# Unpacking
+#############################################################################
+
+
+def _read_except(fp, n):
+    data = fp.read(n)
+    if len(data) < n:
+        raise InsufficientDataException()
+    return data
+
+
+def _unpack_integer(code, fp, options):
+    if (ord(code) & 0xe0) == 0xe0:
+        return struct.unpack("b", code)[0]
+    elif code == b'\xd0':
+        return struct.unpack("b", _read_except(fp, 1))[0]
+    elif code == b'\xd1':
+        return struct.unpack(">h", _read_except(fp, 2))[0]
+    elif code == b'\xd2':
+        return struct.unpack(">i", _read_except(fp, 4))[0]
+    elif code == b'\xd3':
+        return struct.unpack(">q", _read_except(fp, 8))[0]
+    elif (ord(code) & 0x80) == 0x00:
+        return struct.unpack("B", code)[0]
+    elif code == b'\xcc':
+        return struct.unpack("B", _read_except(fp, 1))[0]
+    elif code == b'\xcd':
+        return struct.unpack(">H", _read_except(fp, 2))[0]
+    elif code == b'\xce':
+        return struct.unpack(">I", _read_except(fp, 4))[0]
+    elif code == b'\xcf':
+        return struct.unpack(">Q", _read_except(fp, 8))[0]
+    raise Exception("logic error, not int: 0x%02x" % ord(code))
+
+
+def _unpack_reserved(code, fp, options):
+    if code == b'\xc1':
+        raise ReservedCodeException(
+            "encountered reserved code: 0x%02x" % ord(code))
+    raise Exception(
+        "logic error, not reserved code: 0x%02x" % ord(code))
+
+
+def _unpack_nil(code, fp, options):
+    if code == b'\xc0':
+        return None
+    raise Exception("logic error, not nil: 0x%02x" % ord(code))
+
+
+def _unpack_boolean(code, fp, options):
+    if code == b'\xc2':
+        return False
+    elif code == b'\xc3':
+        return True
+    raise Exception("logic error, not boolean: 0x%02x" % ord(code))
+
+
+def _unpack_float(code, fp, options):
+    if code == b'\xca':
+        return struct.unpack(">f", _read_except(fp, 4))[0]
+    elif code == b'\xcb':
+        return struct.unpack(">d", _read_except(fp, 8))[0]
+    raise Exception("logic error, not float: 0x%02x" % ord(code))
+
+
+def _unpack_string(code, fp, options):
+    if (ord(code) & 0xe0) == 0xa0:
+        length = ord(code) & ~0xe0
+    elif code == b'\xd9':
+        length = struct.unpack("B", _read_except(fp, 1))[0]
+    elif code == b'\xda':
+        length = struct.unpack(">H", _read_except(fp, 2))[0]
+    elif code == b'\xdb':
+        length = struct.unpack(">I", _read_except(fp, 4))[0]
+    else:
+        raise Exception("logic error, not string: 0x%02x" % ord(code))
+
+    # Always return raw bytes in compatibility mode
+    global compatibility
+    if compatibility:
+        return _read_except(fp, length)
+
+    data = _read_except(fp, length)
+    try:
+        return bytes.decode(data, 'utf-8')
+    except UnicodeDecodeError:
+        if options.get("allow_invalid_utf8"):
+            return InvalidString(data)
+        raise InvalidStringException("unpacked string is invalid utf-8")
+
+
+def _unpack_binary(code, fp, options):
+    if code == b'\xc4':
+        length = struct.unpack("B", _read_except(fp, 1))[0]
+    elif code == b'\xc5':
+        length = struct.unpack(">H", _read_except(fp, 2))[0]
+    elif code == b'\xc6':
+        length = struct.unpack(">I", _read_except(fp, 4))[0]
+    else:
+        raise Exception("logic error, not binary: 0x%02x" % ord(code))
+
+    return _read_except(fp, length)
+
+
+def _unpack_ext(code, fp, options):
+    if code == b'\xd4':
+        length = 1
+    elif code == b'\xd5':
+        length = 2
+    elif code == b'\xd6':
+        length = 4
+    elif code == b'\xd7':
+        length = 8
+    elif code == b'\xd8':
+        length = 16
+    elif code == b'\xc7':
+        length = struct.unpack("B", _read_except(fp, 1))[0]
+    elif code == b'\xc8':
+        length = struct.unpack(">H", _read_except(fp, 2))[0]
+    elif code == b'\xc9':
+        length = struct.unpack(">I", _read_except(fp, 4))[0]
+    else:
+        raise Exception("logic error, not ext: 0x%02x" % ord(code))
+
+    ext = Ext(ord(_read_except(fp, 1)), _read_except(fp, length))
+
+    # Unpack with ext handler, if we have one
+    ext_handlers = options.get("ext_handlers")
+    if ext_handlers and ext.type in ext_handlers:
+        ext = ext_handlers[ext.type](ext)
+
+    return ext
+
+
+def _unpack_array(code, fp, options):
+    if (ord(code) & 0xf0) == 0x90:
+        length = (ord(code) & ~0xf0)
+    elif code == b'\xdc':
+        length = struct.unpack(">H", _read_except(fp, 2))[0]
+    elif code == b'\xdd':
+        length = struct.unpack(">I", _read_except(fp, 4))[0]
+    else:
+        raise Exception("logic error, not array: 0x%02x" % ord(code))
+
+    return [_unpack(fp, options) for i in xrange(length)]
+
+
+def _deep_list_to_tuple(obj):
+    if isinstance(obj, list):
+        return tuple([_deep_list_to_tuple(e) for e in obj])
+    return obj
+
+
+def _unpack_map(code, fp, options):
+    if (ord(code) & 0xf0) == 0x80:
+        length = (ord(code) & ~0xf0)
+    elif code == b'\xde':
+        length = struct.unpack(">H", _read_except(fp, 2))[0]
+    elif code == b'\xdf':
+        length = struct.unpack(">I", _read_except(fp, 4))[0]
+    else:
+        raise Exception("logic error, not map: 0x%02x" % ord(code))
+
+    d = {} if not options.get('use_ordered_dict') \
+        else collections.OrderedDict()
+    for _ in xrange(length):
+        # Unpack key
+        k = _unpack(fp, options)
+
+        if isinstance(k, list):
+            # Attempt to convert list into a hashable tuple
+            k = _deep_list_to_tuple(k)
+        elif not isinstance(k, collections.Hashable):
+            raise UnhashableKeyException(
+                "encountered unhashable key: %s, %s" % (str(k), str(type(k))))
+        elif k in d:
+            raise DuplicateKeyException(
+                "encountered duplicate key: %s, %s" % (str(k), str(type(k))))
+
+        # Unpack value
+        v = _unpack(fp, options)
+
+        try:
+            d[k] = v
+        except TypeError:
+            raise UnhashableKeyException(
+                "encountered unhashable key: %s" % str(k))
+    return d
+
+
+def _unpack(fp, options):
+    code = _read_except(fp, 1)
+    return _unpack_dispatch_table[code](code, fp, options)
+
+########################################
+
+
+def _unpack2(fp, **options):
+    """
+    Deserialize MessagePack bytes into a Python object.
+
+    Args:
+        fp: a .read()-supporting file-like object
+
+    Kwargs:
+        ext_handlers (dict): dictionary of Ext handlers, mapping integer Ext
+                             type to a callable that unpacks an instance of
+                             Ext into an object
+        use_ordered_dict (bool): unpack maps into OrderedDict, instead of
+                                 unordered dict (default False)
+        allow_invalid_utf8 (bool): unpack invalid strings into instances of
+                                   InvalidString, for access to the bytes
+                                   (default False)
+
+    Returns:
+        A Python object.
+
+    Raises:
+        InsufficientDataException(UnpackException):
+            Insufficient data to unpack the serialized object.
+        InvalidStringException(UnpackException):
+            Invalid UTF-8 string encountered during unpacking.
+        ReservedCodeException(UnpackException):
+            Reserved code encountered during unpacking.
+        UnhashableKeyException(UnpackException):
+            Unhashable key encountered during map unpacking.
+            The serialized map cannot be deserialized into a Python dictionary.
+        DuplicateKeyException(UnpackException):
+            Duplicate key encountered during map unpacking.
+
+    Example:
+    >>> f = open('test.bin', 'rb')
+    >>> umsgpack.unpackb(f)
+    {u'compact': True, u'schema': 0}
+    >>>
+    """
+    return _unpack(fp, options)
+
+
+def _unpack3(fp, **options):
+    """
+    Deserialize MessagePack bytes into a Python object.
+
+    Args:
+        fp: a .read()-supporting file-like object
+
+    Kwargs:
+        ext_handlers (dict): dictionary of Ext handlers, mapping integer Ext
+                             type to a callable that unpacks an instance of
+                             Ext into an object
+        use_ordered_dict (bool): unpack maps into OrderedDict, instead of
+                                 unordered dict (default False)
+        allow_invalid_utf8 (bool): unpack invalid strings into instances of
+                                   InvalidString, for access to the bytes
+                                   (default False)
+
+    Returns:
+        A Python object.
+
+    Raises:
+        InsufficientDataException(UnpackException):
+            Insufficient data to unpack the serialized object.
+        InvalidStringException(UnpackException):
+            Invalid UTF-8 string encountered during unpacking.
+        ReservedCodeException(UnpackException):
+            Reserved code encountered during unpacking.
+        UnhashableKeyException(UnpackException):
+            Unhashable key encountered during map unpacking.
+            The serialized map cannot be deserialized into a Python dictionary.
+        DuplicateKeyException(UnpackException):
+            Duplicate key encountered during map unpacking.
+
+    Example:
+    >>> f = open('test.bin', 'rb')
+    >>> umsgpack.unpackb(f)
+    {'compact': True, 'schema': 0}
+    >>>
+    """
+    return _unpack(fp, options)
+
+
+# For Python 2, expects a str object
+def _unpackb2(s, **options):
+    """
+    Deserialize MessagePack bytes into a Python object.
+
+    Args:
+        s: a 'str' or 'bytearray' containing serialized MessagePack bytes
+
+    Kwargs:
+        ext_handlers (dict): dictionary of Ext handlers, mapping integer Ext
+                             type to a callable that unpacks an instance of
+                             Ext into an object
+        use_ordered_dict (bool): unpack maps into OrderedDict, instead of
+                                 unordered dict (default False)
+        allow_invalid_utf8 (bool): unpack invalid strings into instances of
+                                   InvalidString, for access to the bytes
+                                   (default False)
+
+    Returns:
+        A Python object.
+
+    Raises:
+        TypeError:
+            Packed data type is neither 'str' nor 'bytearray'.
+        InsufficientDataException(UnpackException):
+            Insufficient data to unpack the serialized object.
+        InvalidStringException(UnpackException):
+            Invalid UTF-8 string encountered during unpacking.
+        ReservedCodeException(UnpackException):
+            Reserved code encountered during unpacking.
+        UnhashableKeyException(UnpackException):
+            Unhashable key encountered during map unpacking.
+            The serialized map cannot be deserialized into a Python dictionary.
+        DuplicateKeyException(UnpackException):
+            Duplicate key encountered during map unpacking.
+
+    Example:
+    >>> umsgpack.unpackb(b'\x82\xa7compact\xc3\xa6schema\x00')
+    {u'compact': True, u'schema': 0}
+    >>>
+    """
+    if not isinstance(s, (str, bytearray)):
+        raise TypeError("packed data must be type 'str' or 'bytearray'")
+    return _unpack(io.BytesIO(s), options)
+
+
+# For Python 3, expects a bytes object
+def _unpackb3(s, **options):
+    """
+    Deserialize MessagePack bytes into a Python object.
+
+    Args:
+        s: a 'bytes' or 'bytearray' containing serialized MessagePack bytes
+
+    Kwargs:
+        ext_handlers (dict): dictionary of Ext handlers, mapping integer Ext
+                             type to a callable that unpacks an instance of
+                             Ext into an object
+        use_ordered_dict (bool): unpack maps into OrderedDict, instead of
+                                 unordered dict (default False)
+        allow_invalid_utf8 (bool): unpack invalid strings into instances of
+                                   InvalidString, for access to the bytes
+                                   (default False)
+
+    Returns:
+        A Python object.
+
+    Raises:
+        TypeError:
+            Packed data type is neither 'bytes' nor 'bytearray'.
+        InsufficientDataException(UnpackException):
+            Insufficient data to unpack the serialized object.
+        InvalidStringException(UnpackException):
+            Invalid UTF-8 string encountered during unpacking.
+        ReservedCodeException(UnpackException):
+            Reserved code encountered during unpacking.
+        UnhashableKeyException(UnpackException):
+            Unhashable key encountered during map unpacking.
+            The serialized map cannot be deserialized into a Python dictionary.
+        DuplicateKeyException(UnpackException):
+            Duplicate key encountered during map unpacking.
+
+    Example:
+    >>> umsgpack.unpackb(b'\x82\xa7compact\xc3\xa6schema\x00')
+    {'compact': True, 'schema': 0}
+    >>>
+    """
+    if not isinstance(s, (bytes, bytearray)):
+        raise TypeError("packed data must be type 'bytes' or 'bytearray'")
+    return _unpack(io.BytesIO(s), options)
+
+#############################################################################
+# Module Initialization
+#############################################################################
+
+
+def __init():
+    global pack
+    global packb
+    global unpack
+    global unpackb
+    global dump
+    global dumps
+    global load
+    global loads
+    global compatibility
+    global _float_precision
+    global _unpack_dispatch_table
+    global xrange
+
+    # Compatibility mode for handling strings/bytes with the old specification
+    compatibility = False
+
+    # Auto-detect system float precision
+    if sys.float_info.mant_dig == 53:
+        _float_precision = "double"
+    else:
+        _float_precision = "single"
+
+    # Map packb and unpackb to the appropriate version
+    if sys.version_info[0] == 3:
+        pack = _pack3
+        packb = _packb3
+        dump = _pack3
+        dumps = _packb3
+        unpack = _unpack3
+        unpackb = _unpackb3
+        load = _unpack3
+        loads = _unpackb3
+        xrange = range
+    else:
+        pack = _pack2
+        packb = _packb2
+        dump = _pack2
+        dumps = _packb2
+        unpack = _unpack2
+        unpackb = _unpackb2
+        load = _unpack2
+        loads = _unpackb2
+
+    # Build a dispatch table for fast lookup of unpacking function
+
+    _unpack_dispatch_table = {}
+    # Fix uint
+    for code in range(0, 0x7f + 1):
+        _unpack_dispatch_table[struct.pack("B", code)] = _unpack_integer
+    # Fix map
+    for code in range(0x80, 0x8f + 1):
+        _unpack_dispatch_table[struct.pack("B", code)] = _unpack_map
+    # Fix array
+    for code in range(0x90, 0x9f + 1):
+        _unpack_dispatch_table[struct.pack("B", code)] = _unpack_array
+    # Fix str
+    for code in range(0xa0, 0xbf + 1):
+        _unpack_dispatch_table[struct.pack("B", code)] = _unpack_string
+    # Nil
+    _unpack_dispatch_table[b'\xc0'] = _unpack_nil
+    # Reserved
+    _unpack_dispatch_table[b'\xc1'] = _unpack_reserved
+    # Boolean
+    _unpack_dispatch_table[b'\xc2'] = _unpack_boolean
+    _unpack_dispatch_table[b'\xc3'] = _unpack_boolean
+    # Bin
+    for code in range(0xc4, 0xc6 + 1):
+        _unpack_dispatch_table[struct.pack("B", code)] = _unpack_binary
+    # Ext
+    for code in range(0xc7, 0xc9 + 1):
+        _unpack_dispatch_table[struct.pack("B", code)] = _unpack_ext
+    # Float
+    _unpack_dispatch_table[b'\xca'] = _unpack_float
+    _unpack_dispatch_table[b'\xcb'] = _unpack_float
+    # Uint
+    for code in range(0xcc, 0xcf + 1):
+        _unpack_dispatch_table[struct.pack("B", code)] = _unpack_integer
+    # Int
+    for code in range(0xd0, 0xd3 + 1):
+        _unpack_dispatch_table[struct.pack("B", code)] = _unpack_integer
+    # Fixext
+    for code in range(0xd4, 0xd8 + 1):
+        _unpack_dispatch_table[struct.pack("B", code)] = _unpack_ext
+    # String
+    for code in range(0xd9, 0xdb + 1):
+        _unpack_dispatch_table[struct.pack("B", code)] = _unpack_string
+    # Array
+    _unpack_dispatch_table[b'\xdc'] = _unpack_array
+    _unpack_dispatch_table[b'\xdd'] = _unpack_array
+    # Map
+    _unpack_dispatch_table[b'\xde'] = _unpack_map
+    _unpack_dispatch_table[b'\xdf'] = _unpack_map
+    # Negative fixint
+    for code in range(0xe0, 0xff + 1):
+        _unpack_dispatch_table[struct.pack("B", code)] = _unpack_integer
+
+
+__init()
diff --git a/webapp/graphite/util.py b/webapp/graphite/util.py
index f7ceb7cc0..4c980cf79 100644
--- a/webapp/graphite/util.py
+++ b/webapp/graphite/util.py
@@ -35,6 +35,13 @@
 except ImportError:
     from StringIO import StringIO
 
+# use https://github.com/msgpack/msgpack-python if available
+try:
+    import msgpack  # NOQA
+# otherwise fall back to bundled https://github.com/vsergeev/u-msgpack-python
+except ImportError:
+    import graphite.umsgpack as msgpack  # NOQA
+
 from django.conf import settings
 from django.utils.timezone import make_aware
 from graphite.logger import log
@@ -174,6 +181,14 @@ def loads(cls, pickle_string):
            pickle_obj.find_global = cls.find_class
            return pickle_obj.load()
 
+        @classmethod
+        def load(cls, file):
+            pickle_obj = pickle.Unpickler(file)
+            pickle_obj.find_global = cls.find_class
+            return pickle_obj.load()
+
+    unpickle = SafeUnpickler
+
 else:
    class SafeUnpickler(pickle.Unpickler):
        PICKLE_SAFE = {
@@ -193,11 +208,14 @@ def find_class(self, module, name):
                raise pickle.UnpicklingError('Attempting to unpickle unsafe class %s' % name)
            return getattr(mod, name)
 
-        @classmethod
-        def loads(cls, pickle_string):
-            return cls(StringIO(pickle_string)).load()
+    class unpickle(object):
+        @staticmethod
+        def loads(pickle_string):
+            return SafeUnpickler(StringIO(pickle_string)).load()
 
-unpickle = SafeUnpickler
+        @staticmethod
+        def load(file):
+            return SafeUnpickler(file).load()
 
 
 class Timer(object):
diff --git a/webapp/tests/test_finders_remote.py b/webapp/tests/test_finders_remote.py
index 12bee9e41..5aacadf77 100644
--- a/webapp/tests/test_finders_remote.py
+++ b/webapp/tests/test_finders_remote.py
@@ -1,9 +1,7 @@
 import logging
-import pickle
 import types
 
 from urllib3.response import HTTPResponse
-from StringIO import StringIO
 
 from django.test import override_settings
 from mock import patch
@@ -11,7 +9,7 @@
 from graphite.finders.remote import RemoteFinder
 from graphite.finders.utils import FindQuery
 from graphite.node import BranchNode, LeafNode
-from graphite.util import json
+from graphite.util import json, pickle, StringIO, msgpack
 
 from .base import TestCase
 
@@ -80,7 +78,7 @@ def test_find_nodes(self, http_request):
                'is_leaf': True,
            },
        ]
-        responseObject = HTTPResponse(body=StringIO(pickle.dumps(data)), status=200)
+        responseObject = HTTPResponse(body=StringIO(pickle.dumps(data)), status=200, preload_content=False)
        http_request.return_value = responseObject
 
        query = FindQuery('a.b.c', startTime, endTime)
@@ -103,6 +101,7 @@ def test_find_nodes(self, http_request):
                ('until', endTime),
            ],
            'headers': None,
+            'preload_content': False,
            'timeout': 10,
        })
 
@@ -114,6 +113,26 @@ def test_find_nodes(self, http_request):
        self.assertIsInstance(nodes[1], LeafNode)
        self.assertEqual(nodes[1].path, 'a.b.c.d')
 
+        finder = RemoteFinder('https://127.0.0.1?format=msgpack')
+
+        data = [
+            {
+                'path': 'a.b.c',
+                'is_leaf': False,
+            },
+            {
+                'path': 'a.b.c.d',
+                'is_leaf': True,
+            },
+        ]
+        responseObject = HTTPResponse(
+            body=StringIO(msgpack.dumps(data)),
+            status=200,
+            preload_content=False,
+            headers={'Content-Type': 'application/x-msgpack'}
+        )
+        http_request.return_value = responseObject
+
        query = FindQuery('a.b.c', None, None)
        result = finder.find_nodes(query)
 
@@ -123,15 +142,16 @@ def test_find_nodes(self, http_request):
 
        self.assertEqual(http_request.call_args[0], (
            'POST',
-            'http://127.0.0.1/metrics/find/',
+            'https://127.0.0.1/metrics/find/',
        ))
        self.assertEqual(http_request.call_args[1], {
            'fields': [
                ('local', '1'),
-                ('format', 'pickle'),
+                ('format', 'msgpack'),
                ('query', 'a.b.c'),
            ],
            'headers': None,
+            'preload_content': False,
            'timeout': 10,
        })
 
@@ -144,14 +164,56 @@ def test_find_nodes(self, http_request):
        self.assertEqual(nodes[1].path, 'a.b.c.d')
 
        # non-pickle response
-        responseObject = HTTPResponse(body='error', status=200)
+        responseObject = HTTPResponse(body=StringIO('error'), status=200, preload_content=False)
        http_request.return_value = responseObject
 
        result = finder.find_nodes(query)
 
-        with self.assertRaisesRegexp(Exception, 'Error decoding find response from http://[^ ]+: .+'):
+        with self.assertRaisesRegexp(Exception, 'Error decoding find response from https://[^ ]+: .+'):
            list(result)
 
+    @patch('graphite.finders.remote.cache.get')
+    @patch('urllib3.PoolManager.request')
+    def test_find_nodes_cached(self, http_request, cache_get):
+        finder = RemoteFinder('127.0.0.1')
+
+        startTime = 1496262000
+        endTime = 1496262060
+
+        data = [
+            {
+                'path': 'a.b.c',
+                'is_leaf': False,
+            },
+            {
+                'path': 'a.b.c.d',
+                'is_leaf': True,
+            },
+        ]
+        cache_get.return_value = data
+
+        query = FindQuery('a.b.c', startTime, endTime)
+        result = finder.find_nodes(query)
+
+        self.assertIsInstance(result, types.GeneratorType)
+
+        nodes = list(result)
+
+        self.assertEqual(http_request.call_count, 0)
+
+        self.assertEqual(cache_get.call_count, 1)
+        self.assertEqual(cache_get.call_args[0], (
+            'find:127.0.0.1:553f764f7b436175c0387e22b4a19213:1496262000:1496262000',
+        ))
+
+        self.assertEqual(len(nodes), 2)
+
+        self.assertIsInstance(nodes[0], BranchNode)
+        self.assertEqual(nodes[0].path, 'a.b.c')
+
+        self.assertIsInstance(nodes[1], LeafNode)
+        self.assertEqual(nodes[1].path, 'a.b.c.d')
+
    #
    # Test RemoteFinder.fetch()
    #
@@ -174,7 +236,7 @@ def test_RemoteFinder_fetch(self, http_request):
                'name': 'a.b.c.d',
            },
        ]
-        responseObject = HTTPResponse(body=StringIO(pickle.dumps(data)), status=200)
+        responseObject = HTTPResponse(body=StringIO(pickle.dumps(data)), status=200, preload_content=False)
        http_request.return_value = responseObject
 
        result = finder.fetch(['a.b.c.d'], startTime, endTime)
@@ -201,6 +263,7 @@ def test_RemoteFinder_fetch(self, http_request):
                ('target', 'a.b.c.d'),
            ],
            'headers': None,
+            'preload_content': False,
            'timeout': 10,
        })
 
@@ -215,7 +278,7 @@ def test_get_index(self, http_request):
            'a.b.c',
            'a.b.c.d',
        ]
-        responseObject = HTTPResponse(body=StringIO(json.dumps(data)), status=200)
+        responseObject = HTTPResponse(body=StringIO(json.dumps(data)), status=200, preload_content=False)
        http_request.return_value = responseObject
 
        result = finder.get_index({})
@@ -231,6 +294,7 @@ def test_get_index(self, http_request):
                ('local', '1'),
            ],
            'headers': None,
+            'preload_content': False,
            'timeout': 10,
        })
 
@@ -240,7 +304,7 @@ def test_get_index(self, http_request):
        self.assertEqual(result[1], 'a.b.c.d')
 
        # non-json response
-        responseObject = HTTPResponse(body='error', status=200)
+        responseObject = HTTPResponse(body=StringIO('error'), status=200, preload_content=False)
        http_request.return_value = responseObject
 
        with self.assertRaisesRegexp(Exception, 'Error decoding index response from http://[^ ]+: .+'):
diff --git a/webapp/tests/test_metrics.py b/webapp/tests/test_metrics.py
index ab219bf5d..73ef2319b 100644
--- a/webapp/tests/test_metrics.py
+++ b/webapp/tests/test_metrics.py
@@ -15,7 +15,7 @@
 
 import whisper
 
-from graphite.util import unpickle
+from graphite.util import unpickle, msgpack
 
 
 class MetricsTester(TestCase):
@@ -194,6 +194,37 @@ def test_find_view_basics(data):
        #self.assertEqual(int(data[1]['intervals'][0].start), ts_minus_sixty_seconds)
        self.assertEqual(int(data[1]['intervals'][0].end), ts)
 
+        #
+        # format=msgpack
+        #
+        request=copy.deepcopy(request_default)
+        request['format']='msgpack'
+        request['query']='*'
+        content = test_find_view_basics(request)
+        data = msgpack.loads(content)
+        self.assertEqual(len(data), 1)
+        self.assertEqual(data[0]['path'], 'hosts')
+        self.assertEqual(data[0]['is_leaf'], False)
+
+        request['query']='hosts.*.cpu'
+        content = test_find_view_basics(request)
+        data = msgpack.loads(content)
+        self.assertEqual(len(data), 2)
+
+        data = sorted(data, key=lambda item: item['path'])
+
+        self.assertEqual(data[0]['path'], 'hosts.worker1.cpu')
+        self.assertEqual(data[0]['is_leaf'], True)
+        self.assertEqual(len(data[0]['intervals']), 1)
+        #self.assertEqual(int(data[0]['intervals'][0].start), ts_minus_sixty_seconds)
+        self.assertEqual(int(data[0]['intervals'][0][1]), ts)
+
+        self.assertEqual(data[1]['path'], 'hosts.worker2.cpu')
+        self.assertEqual(data[1]['is_leaf'], True)
+        self.assertEqual(len(data[1]['intervals']), 1)
+        #self.assertEqual(int(data[1]['intervals'][0].start), ts_minus_sixty_seconds)
+        self.assertEqual(int(data[1]['intervals'][0][1]), ts)
+
        #
        # format=completer
        #
diff --git a/webapp/tests/test_readers_remote.py b/webapp/tests/test_readers_remote.py
index a978d0563..be368112f 100644
--- a/webapp/tests/test_readers_remote.py
+++ b/webapp/tests/test_readers_remote.py
@@ -1,13 +1,12 @@
 from .base import TestCase
 
 import mock
-import pickle
 
 from urllib3.response import HTTPResponse
-from StringIO import StringIO
 
 from graphite.finders.remote import RemoteFinder
 from graphite.readers.remote import RemoteReader
+from graphite.util import pickle, StringIO, msgpack
 
 from graphite.wsgi import application  # NOQA makes sure we have a working WSGI app
 
@@ -36,7 +35,7 @@ def test_RemoteReader_init_repr_get_intervals(self):
    # Test RemoteReader.fetch_multi()
    #
    @mock.patch('urllib3.PoolManager.request')
-    @mock.patch('django.conf.settings.CLUSTER_SERVERS', ['127.0.0.1', '8.8.8.8'])
+    @mock.patch('django.conf.settings.CLUSTER_SERVERS', ['127.0.0.1', 'http://8.8.8.8/graphite?format=msgpack&local=0'])
    @mock.patch('django.conf.settings.INTRACLUSTER_HTTPS', False)
    @mock.patch('django.conf.settings.REMOTE_STORE_USE_POST', False)
    @mock.patch('django.conf.settings.REMOTE_FETCH_TIMEOUT', 10)
@@ -65,8 +64,9 @@ def test_RemoteReader_fetch_multi(self, http_request):
                'name': 'a.b.c.d'
            }
        ]
-        responseObject = HTTPResponse(body=StringIO(pickle.dumps(data)), status=200)
+        responseObject = HTTPResponse(body=StringIO(pickle.dumps(data)), status=200, preload_content=False)
        http_request.return_value = responseObject
+
        result = reader.fetch_multi(startTime, endTime)
        expected_response = [
            {
@@ -91,12 +91,30 @@ def test_RemoteReader_fetch_multi(self, http_request):
                ('target', 'a.b.c.d'),
            ],
            'headers': None,
+            'preload_content': False,
            'timeout': 10,
        })
 
        # bulk_query & now
+        finder = test_finders[1]
        reader = RemoteReader(finder, {'intervals': [], 'path': 'a.b.c.d'}, bulk_query=['a.b.c.d'])
 
+        data = [
+            {'start': startTime,
+             'step': 60,
+             'end': endTime,
+             'values': [1.0, 0.0, 1.0, 0.0, 1.0],
+             'name': 'a.b.c.d'
+             }
+        ]
+        responseObject = HTTPResponse(
+            body=StringIO(msgpack.dumps(data)),
+            status=200,
+            preload_content=False,
+            headers={'Content-Type': 'application/x-msgpack'}
+        )
+        http_request.return_value = responseObject
+
        result = reader.fetch_multi(startTime, endTime, now=endTime, requestContext={'forwardHeaders': {'Authorization': 'Basic xxxx'}})
        expected_response = [
            {
@@ -109,12 +127,12 @@ def test_RemoteReader_fetch_multi(self, http_request):
        self.assertEqual(result, expected_response)
        self.assertEqual(http_request.call_args[0], (
            'GET',
-            'http://127.0.0.1/render/',
+            'http://8.8.8.8/graphite/render/',
        ))
        self.assertEqual(http_request.call_args[1], {
            'fields': [
-                ('format', 'pickle'),
-                ('local', '1'),
+                ('format', 'msgpack'),
+                ('local', '0'),
                ('noCache', '1'),
                ('from', startTime),
                ('until', endTime),
@@ -122,18 +140,19 @@ def test_RemoteReader_fetch_multi(self, http_request):
                ('now', endTime),
            ],
            'headers': {'Authorization': 'Basic xxxx'},
+            'preload_content': False,
            'timeout': 10,
        })
 
        # non-pickle response
-        responseObject = HTTPResponse(body='error', status=200)
+        responseObject = HTTPResponse(body=StringIO('error'), status=200, preload_content=False)
        http_request.return_value = responseObject
 
        with self.assertRaisesRegexp(Exception, 'Error decoding render response from http://[^ ]+: .+'):
            reader.fetch(startTime, endTime)
 
        # non-200 response
-        responseObject = HTTPResponse(body='error', status=500)
+        responseObject = HTTPResponse(body=StringIO('error'), status=500, preload_content=False)
        http_request.return_value = responseObject
 
        with self.assertRaisesRegexp(Exception, 'Error response 500 from http://[^ ]+'):
@@ -184,7 +203,7 @@ def test_RemoteReader_fetch(self, http_request):
                'name': 'a.b.c.d'
            }
        ]
-        responseObject = HTTPResponse(body=StringIO(pickle.dumps(data)), status=200)
+        responseObject = HTTPResponse(body=StringIO(pickle.dumps(data)), status=200, preload_content=False)
        http_request.return_value = responseObject
        result = reader.fetch(startTime, endTime)
        expected_response = ((1496262000, 1496262060, 60), [1.0, 0.0, 1.0, 0.0, 1.0])
@@ -203,5 +222,6 @@ def test_RemoteReader_fetch(self, http_request):
                ('target', 'a.b.c.*'),
            ],
            'headers': None,
+            'preload_content': False,
            'timeout': 10,
        })
diff --git a/webapp/tests/test_render.py b/webapp/tests/test_render.py
index a5a595ec1..13e88dd32 100644
--- a/webapp/tests/test_render.py
+++ b/webapp/tests/test_render.py
@@ -1,6 +1,5 @@
 from datetime import datetime
 import copy
-import json
 import os
 import time
 import math
@@ -9,17 +8,13 @@
 
 from mock import patch
 
-try:
-    import cPickle as pickle
-except ImportError:
-    import pickle
-
 from graphite.render.datalib import TimeSeries
 from graphite.render.hashing import ConsistentHashRing, hashRequest, hashData
 from graphite.render.evaluator import evaluateTarget, extractPathExpressions, evaluateScalarTokens
 from graphite.render.functions import NormalizeEmptyResultError
 from graphite.render.grammar import grammar
 from graphite.render.views import renderViewJson
+from graphite.util import pickle, msgpack, json
 import whisper
 
 from django.conf import settings
@@ -367,6 +362,15 @@ def test_render_view(self):
        unpickled[0]['values'][-1] = 'NaN'
        self.assertEqual(unpickled, expected)
 
+        # test msgpack format
+        response = self.client.get(url, {'target': 'test', 'format': 'msgpack', 'from': ts-50, 'now': ts})
+        self.assertEqual(response['content-type'], 'application/x-msgpack')
+        unpickled = msgpack.loads(response.content)
+        # special handling for NaN value, otherwise assertEqual fails
+        self.assertTrue(math.isnan(unpickled[0]['values'][-1]))
+        unpickled[0]['values'][-1] = 'NaN'
+        self.assertEqual(unpickled, expected)
+
        # test json format
        response = self.client.get(url, {'target': 'test', 'format': 'json', 'from': ts-50, 'now': ts})
        self.assertEqual(response['content-type'], 'application/json')
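
For readers trying out the cluster settings this change documents, here is a minimal, hypothetical local_settings.py sketch combining them; the hostnames, the choice of msgpack, and the pool sizing are illustrative assumptions, not part of this diff::

    # Hypothetical local_settings.py fragment for a two-node cluster.
    # Entries may be bare host:port (the scheme then follows INTRACLUSTER_HTTPS)
    # or full URLs carrying per-server query parameters.
    CLUSTER_SERVERS = [
        '10.0.2.2:80',                              # pickle over http, local=1 (the defaults)
        'https://10.0.2.3?format=msgpack&local=0',  # msgpack; also fan out to remote finders
    ]
    INTRACLUSTER_HTTPS = False  # only applies to entries without an explicit protocol
    USE_WORKER_POOL = True      # parallelize requests to the finders above
    POOL_MAX_WORKERS = 10       # upper bound on worker threads (the default)

Per RemoteFinder.parse_host() above, the query string is split with parse_qs and only the last value of each parameter is kept, so repeating a parameter does not accumulate values.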
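
To make the new wire format itself concrete, the sketch below round-trips the node-info structure that msgpack_nodes() emits and RemoteFinder.find_nodes() decodes. It is a standalone approximation: `umsgpack` stands in for graphite.util's msgpack import (which prefers msgpack-python and falls back to the bundled module), and the paths and interval timestamps are invented::

    import io

    import umsgpack as msgpack  # assumption: the u-msgpack-python package is
                                # installed; outside a graphite checkout this
                                # stands in for `from graphite.util import msgpack`

    # Same shape msgpack_nodes() in webapp/graphite/metrics/views.py produces:
    # leaf nodes carry their intervals as (start, end) tuples.
    nodes_info = [
        {'path': 'hosts', 'is_leaf': False},
        {'path': 'hosts.worker1.cpu', 'is_leaf': True,
         'intervals': [(1496262000, 1496262060)]},
    ]

    # What find_view() would return for format=msgpack,
    # served with content type application/x-msgpack.
    payload = msgpack.dumps(nodes_info)

    # The finder decodes straight off the file-like urllib3 response
    # (hence preload_content=False); BytesIO plays that role here.
    decoded = msgpack.load(io.BytesIO(payload))

    assert decoded[0]['path'] == 'hosts'
    # Tuples come back as lists, which is why the updated test_metrics.py
    # indexes intervals with [0][1] instead of the Interval .end attribute.
    assert decoded[1]['intervals'][0][1] == 1496262060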