diff --git a/webapp/graphite/finders/remote.py b/webapp/graphite/finders/remote.py index 71e894f94..85295c54c 100644 --- a/webapp/graphite/finders/remote.py +++ b/webapp/graphite/finders/remote.py @@ -12,7 +12,7 @@ from graphite.logger import log from graphite.node import LeafNode, BranchNode from graphite.render.hashing import compactHash -from graphite.util import unpickle, logtime, is_local_interface, json, msgpack +from graphite.util import unpickle, logtime, is_local_interface, json, msgpack, BufferedHTTPReader from graphite.finders.utils import BaseFinder from graphite.readers.remote import RemoteReader @@ -108,9 +108,11 @@ def find_nodes(self, query, timer=None): try: if result.getheader('content-type') == 'application/x-msgpack': - results = msgpack.load(result) + results = msgpack.load(BufferedHTTPReader( + result, buffer_size=settings.REMOTE_BUFFER_SIZE)) else: - results = unpickle.load(result) + results = unpickle.load(BufferedHTTPReader( + result, buffer_size=settings.REMOTE_BUFFER_SIZE)) except Exception as err: self.fail() log.exception( diff --git a/webapp/graphite/local_settings.py.example b/webapp/graphite/local_settings.py.example index 3d0351bf6..325f13e8d 100644 --- a/webapp/graphite/local_settings.py.example +++ b/webapp/graphite/local_settings.py.example @@ -294,6 +294,9 @@ DEFAULT_XFILES_FACTOR = 0 #REMOTE_STORE_USE_POST = False # Use POST instead of GET for remote requests +# Size of the buffer used for streaming remote cluster responses +#REMOTE_BUFFER_SIZE = 1024 * 1024 + # During a rebalance of a consistent hash cluster, after a partition event on a replication > 1 cluster, # or in other cases we might receive multiple TimeSeries data for a metric key. Merge them together rather # that choosing the "most complete" one (pre-0.9.14 behaviour). diff --git a/webapp/graphite/readers/remote.py b/webapp/graphite/readers/remote.py index 5ec2e85e9..56919b8c8 100644 --- a/webapp/graphite/readers/remote.py +++ b/webapp/graphite/readers/remote.py @@ -4,7 +4,7 @@ from graphite.logger import log from graphite.readers.utils import BaseReader -from graphite.util import unpickle, msgpack +from graphite.util import unpickle, msgpack, BufferedHTTPReader class RemoteReader(BaseReader): @@ -76,9 +76,11 @@ def fetch_multi(self, startTime, endTime, now=None, requestContext=None): try: if result.getheader('content-type') == 'application/x-msgpack': - data = msgpack.load(result) + data = msgpack.load(BufferedHTTPReader( + result, buffer_size=settings.REMOTE_BUFFER_SIZE)) else: - data = unpickle.load(result) + data = unpickle.load(BufferedHTTPReader( + result, buffer_size=settings.REMOTE_BUFFER_SIZE)) except Exception as err: self.finder.fail() log.exception( diff --git a/webapp/graphite/settings.py b/webapp/graphite/settings.py index fef6366ae..cb373856c 100644 --- a/webapp/graphite/settings.py +++ b/webapp/graphite/settings.py @@ -69,6 +69,7 @@ REMOTE_STORE_MERGE_RESULTS = True REMOTE_STORE_FORWARD_HEADERS = [] REMOTE_STORE_USE_POST = False +REMOTE_BUFFER_SIZE = 1024 * 1024 CARBON_METRIC_PREFIX='carbon' CARBONLINK_HOSTS = ["127.0.0.1:7002"] CARBONLINK_TIMEOUT = 1.0 diff --git a/webapp/graphite/util.py b/webapp/graphite/util.py index 4c980cf79..0da9ce76f 100644 --- a/webapp/graphite/util.py +++ b/webapp/graphite/util.py @@ -13,6 +13,7 @@ limitations under the License.""" import imp +import io import socket import time import sys @@ -257,3 +258,25 @@ def wrapped_f(*args, **kwargs): timer.stop() return wrapped_f + + +class BufferedHTTPReader(io.IOBase): + def __init__(self, response, buffer_size=1048576): + self.response = response + self.buffer_size = buffer_size + self.buffer = '' + self.pos = 0 + + def read(self, amt=None): + if amt is None: + return self.response.read() + if len(self.buffer) - self.pos < amt: + self.buffer = self.buffer[self.pos:] + self.pos = 0 + self.buffer += self.response.read(self.buffer_size) + data = self.buffer[self.pos:self.pos + amt] + self.pos += amt + if self.pos >= len(self.buffer): + self.pos = 0 + self.buffer = '' + return data