From bc54a2b3cb6440d6cba32f4a6c9f45f613883231 Mon Sep 17 00:00:00 2001 From: Luke Heberling Date: Tue, 23 Apr 2019 23:42:53 +0000 Subject: [PATCH 1/5] Use --copy-dest, enabling the rsync algorithm when copying to staging --- carbonate/sync.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/carbonate/sync.py b/carbonate/sync.py index 34bcb04..15dd329 100644 --- a/carbonate/sync.py +++ b/carbonate/sync.py @@ -12,15 +12,15 @@ from .fill import fill_archives -def sync_from_remote(sync_file, remote, staging, rsync_options): +def sync_from_remote(storage_dir, sync_file, remote, staging, rsync_options): try: try: os.makedirs(os.path.dirname(staging)) except OSError: pass - cmd = " ".join(['rsync', rsync_options, '--files-from', - sync_file.name, remote, staging + cmd = " ".join(['rsync', rsync_options, '--copy-dest', storage_dir, + '--files-from', sync_file.name, remote, staging ]) print " - Rsyncing metrics" @@ -115,7 +115,7 @@ def heal_metric(source, dest, start_time=0, end_time=None, overwrite=False, logging.warn("Failed to copy %s! 
%s" % (dest, e)) -def run_batch(metrics_to_sync, remote, local_storage, rsync_options, +def run_batch(metrics_to_sync, remote, storage_dir, rsync_options, remote_ip, dirty, lock_writes=False, overwrite=False): staging_dir = mkdtemp(prefix=remote_ip) sync_file = NamedTemporaryFile(delete=False) @@ -126,7 +126,7 @@ def run_batch(metrics_to_sync, remote, local_storage, rsync_options, for metric in metrics_to_sync: staging_file = "%s/%s" % (staging_dir, metric) - local_file = "%s/%s" % (local_storage, metric) + local_file = "%s/%s" % (storage_dir, metric) metrics_to_heal.append((staging_file, local_file)) sync_file.write("\n".join(metrics_to_sync)) @@ -134,7 +134,7 @@ def run_batch(metrics_to_sync, remote, local_storage, rsync_options, rsync_start = time() - sync_from_remote(sync_file, remote, staging, rsync_options) + sync_from_remote(storage_dir, sync_file, remote, staging, rsync_options) rsync_elapsed = (time() - rsync_start) From 867bfedba023ce169ce5336ebc2fb611acd68211 Mon Sep 17 00:00:00 2001 From: Luke Heberling Date: Wed, 24 Apr 2019 15:51:42 -0700 Subject: [PATCH 2/5] make --copy-dest configurable --- README.md | 8 ++++++-- carbonate/cli.py | 14 ++++++++++++-- carbonate/sync.py | 12 ++++++------ 3 files changed, 24 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index cd1580e..64fc557 100644 --- a/README.md +++ b/README.md @@ -129,7 +129,8 @@ optional arguments: usage: carbon-sync [-h] [-c CONFIG_FILE] [-C CLUSTER] [-f METRICS_FILE] -s SOURCE_NODE [-d STORAGE_DIR] [-b BATCH_SIZE] [--source-storage-dir SOURCE_STORAGE_DIR] - [--rsync-options RSYNC_OPTIONS] [--dirty] [-l] [-o] + [--rsync-options RSYNC_OPTIONS] [--rsync-disable-copy-dest] + [--dirty] [-l] [-o] Sync local metrics using remote nodes in the cluster @@ -155,10 +156,13 @@ optional arguments: --rsync-options RSYNC_OPTIONS Pass option(s) to rsync. 
Make sure to use "--rsync- options=" if option starts with '-' (default: -azpS) + --rsync-disable-copy-dest + Avoid --copy-dest, transfer all whisper data between + nodes. (default: False) --dirty If set, don't clean temporary rsync directory (default: False) -l, --lock Lock whisper files during filling (default: False) - -o, --overwrite Write all non nullpoints from src to dst (default: + -o, --overwrite Write all non nullpoints from src to dst (default: False) ``` diff --git a/carbonate/cli.py b/carbonate/cli.py index 7ddaea2..1086010 100644 --- a/carbonate/cli.py +++ b/carbonate/cli.py @@ -172,6 +172,12 @@ def carbon_sync(): help='Pass option(s) to rsync. Make sure to use ' + '"--rsync-options=" if option starts with \'-\'') + parser.add_argument( + '--rsync-disable-copy-dest', + default=False, + action='store_true', + help='Avoid --copy-dest, transfer all whisper data between nodes.') + parser.add_argument( '--dirty', action='store_true', @@ -210,6 +216,10 @@ def carbon_sync(): total_metrics = 0 batch_size = int(args.batch_size) + rsync_options = args.rsync_options + if not args.rsync_disable_copy_dest: + rsync_options += ' --copy-dest="%s"' % args.storage_dir + for metric in metrics: total_metrics += 1 metric = metric.strip() @@ -221,7 +231,7 @@ def carbon_sync(): print "* Running batch %s-%s" \ % (total_metrics-batch_size+1, total_metrics) run_batch(metrics_to_sync, remote, - args.storage_dir, args.rsync_options, + args.storage_dir, rsync_options, remote_ip, args.dirty, lock_writes=whisper_lock_writes, overwrite=args.overwrite) metrics_to_sync = [] @@ -230,7 +240,7 @@ def carbon_sync(): print "* Running batch %s-%s" \ % (total_metrics-len(metrics_to_sync)+1, total_metrics) run_batch(metrics_to_sync, remote, - args.storage_dir, args.rsync_options, + args.storage_dir, rsync_options, remote_ip, args.dirty, lock_writes=whisper_lock_writes) elapsed = (time() - start) diff --git a/carbonate/sync.py b/carbonate/sync.py index 15dd329..34bcb04 100644 --- 
a/carbonate/sync.py +++ b/carbonate/sync.py @@ -12,15 +12,15 @@ from .fill import fill_archives -def sync_from_remote(storage_dir, sync_file, remote, staging, rsync_options): +def sync_from_remote(sync_file, remote, staging, rsync_options): try: try: os.makedirs(os.path.dirname(staging)) except OSError: pass - cmd = " ".join(['rsync', rsync_options, '--copy-dest', storage_dir, - '--files-from', sync_file.name, remote, staging + cmd = " ".join(['rsync', rsync_options, '--files-from', + sync_file.name, remote, staging ]) print " - Rsyncing metrics" @@ -115,7 +115,7 @@ def heal_metric(source, dest, start_time=0, end_time=None, overwrite=False, logging.warn("Failed to copy %s! %s" % (dest, e)) -def run_batch(metrics_to_sync, remote, storage_dir, rsync_options, +def run_batch(metrics_to_sync, remote, local_storage, rsync_options, remote_ip, dirty, lock_writes=False, overwrite=False): staging_dir = mkdtemp(prefix=remote_ip) sync_file = NamedTemporaryFile(delete=False) @@ -126,7 +126,7 @@ def run_batch(metrics_to_sync, remote, storage_dir, rsync_options, for metric in metrics_to_sync: staging_file = "%s/%s" % (staging_dir, metric) - local_file = "%s/%s" % (storage_dir, metric) + local_file = "%s/%s" % (local_storage, metric) metrics_to_heal.append((staging_file, local_file)) sync_file.write("\n".join(metrics_to_sync)) @@ -134,7 +134,7 @@ def run_batch(metrics_to_sync, remote, storage_dir, rsync_options, rsync_start = time() - sync_from_remote(storage_dir, sync_file, remote, staging, rsync_options) + sync_from_remote(sync_file, remote, staging, rsync_options) rsync_elapsed = (time() - rsync_start) From f938499245f5241ddf5af2c0839c2a0ad2256358 Mon Sep 17 00:00:00 2001 From: Denis Zhdanov Date: Fri, 26 Apr 2019 14:34:07 +0200 Subject: [PATCH 3/5] Fixing python version in travis --- .travis.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.travis.yml b/.travis.yml index d0cfb24..42821f7 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,3 +1,4 @@ +python: 2.7 language: 
python env: - TOXENV=py27 From 076b8902dbb5906fd8fcf708e3180cb68ccb4bef Mon Sep 17 00:00:00 2001 From: Piotr Date: Tue, 30 Apr 2019 10:07:54 +0200 Subject: [PATCH 4/5] pin dependency flake8<3.7, higher versions require py3-only modules --- tests/requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/requirements.txt b/tests/requirements.txt index d0c735f..a31f476 100644 --- a/tests/requirements.txt +++ b/tests/requirements.txt @@ -2,7 +2,7 @@ nose mock coverage nosexcover -flake8 +flake8<3.7 mccabe==0.6.1 tox pep8 From 093829d6c2d1d101aa11412f6d8de584a9e42312 Mon Sep 17 00:00:00 2001 From: Piotr Date: Tue, 30 Apr 2019 10:46:26 +0200 Subject: [PATCH 5/5] Python 3 support --- .gitignore | 1 + .travis.yml | 1 + carbonate/aggregation.py | 2 +- carbonate/cli.py | 32 ++++++++++++++++---------------- carbonate/config.py | 5 ++++- carbonate/fill.py | 15 +++++++++++---- carbonate/sieve.py | 2 +- carbonate/sync.py | 14 +++++++------- setup.py | 38 +++++++++++++++++++++++--------------- tests/test_fill.py | 2 +- tests/test_sync.py | 2 +- tox.ini | 2 +- 12 files changed, 68 insertions(+), 48 deletions(-) diff --git a/.gitignore b/.gitignore index 6a6c4d2..6115c94 100644 --- a/.gitignore +++ b/.gitignore @@ -21,6 +21,7 @@ var/ *.egg-info/ .installed.cfg *.egg +*.eggs # Installer logs pip-log.txt diff --git a/.travis.yml b/.travis.yml index 42821f7..1293467 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,6 +3,7 @@ language: python env: - TOXENV=py27 - TOXENV=py27-pre0_9_10 + - TOXENV=py3 - TOXENV=lint install: - pip install tox diff --git a/carbonate/aggregation.py b/carbonate/aggregation.py index 2058e62..9173b65 100644 --- a/carbonate/aggregation.py +++ b/carbonate/aggregation.py @@ -28,5 +28,5 @@ def setAggregation(path, mode): try: whisper.setAggregationMethod(path, mode) return 1 - except whisper.WhisperException, exc: + except whisper.WhisperException as exc: logging.warning("%s failed (%s)" % (path, str(exc))) diff --git
a/carbonate/cli.py b/carbonate/cli.py index 1086010..20ea41a 100644 --- a/carbonate/cli.py +++ b/carbonate/cli.py @@ -36,7 +36,7 @@ def carbon_hosts(): cluster_hosts = [d[0] for d in cluster.destinations] - print "\n".join(cluster_hosts) + print("\n".join(cluster_hosts)) def carbon_list(): @@ -56,7 +56,7 @@ def carbon_list(): try: for m in listMetrics(args.storage_dir, args.follow_sym_links): - print m + print(m) except IOError as e: if e.errno == errno.EPIPE: pass # we got killed, lol @@ -90,7 +90,7 @@ def carbon_lookup(): for i, _ in enumerate(results): results[i] = results[i].split(':')[0] - print "\n".join(results) + print("\n".join(results)) def carbon_sieve(): @@ -130,7 +130,7 @@ def carbon_sieve(): for metric in metrics: m = metric.strip() for match in filterMetrics([m], match_dests, cluster, invert): - print metric.strip() + print(metric.strip()) except KeyboardInterrupt: sys.exit(1) @@ -228,8 +228,8 @@ def carbon_sync(): metrics_to_sync.append(mpath) if total_metrics % batch_size == 0: - print "* Running batch %s-%s" \ - % (total_metrics-batch_size+1, total_metrics) + print("* Running batch %s-%s" + % (total_metrics-batch_size+1, total_metrics)) run_batch(metrics_to_sync, remote, args.storage_dir, rsync_options, remote_ip, args.dirty, lock_writes=whisper_lock_writes, @@ -237,19 +237,19 @@ def carbon_sync(): metrics_to_sync = [] if len(metrics_to_sync) > 0: - print "* Running batch %s-%s" \ - % (total_metrics-len(metrics_to_sync)+1, total_metrics) + print("* Running batch %s-%s" + % (total_metrics-len(metrics_to_sync)+1, total_metrics)) run_batch(metrics_to_sync, remote, args.storage_dir, rsync_options, remote_ip, args.dirty, lock_writes=whisper_lock_writes) elapsed = (time() - start) - print "" - print "* Sync Report" - print " ========================================" - print " Total metrics synced: %s" % total_metrics - print " Total time: %ss" % elapsed + print("") + print("* Sync Report") + print(" ========================================") + print(" 
Total metrics synced: %s" % total_metrics) + print(" Total time: %ss" % elapsed) def carbon_path(): @@ -293,7 +293,7 @@ def carbon_path(): func = partial(metric_to_fs, prepend=prepend) for metric in metrics: - print func(metric) + print(func(metric)) def carbon_stale(): @@ -347,7 +347,7 @@ def carbon_stale(): passed = (data if use_whisper else stat)(path, args.limit, args.offset) value = path if args.paths else fs_to_metric(path, prepend=prefix) if (not passed) if args.reverse else passed: - print value + print(value) def whisper_aggregate(): @@ -383,7 +383,7 @@ def whisper_aggregate(): if mode is not None: path = metric_to_fs(name, prepend=args.storage_dir) metrics_count = metrics_count + setAggregation(path, mode) - except ValueError, exc: + except ValueError as exc: logging.warning("Unable to parse '%s' (%s)" % (metric, str(exc))) logging.info('Successfully set aggregation mode for ' + diff --git a/carbonate/config.py b/carbonate/config.py index 18e062f..62a652e 100644 --- a/carbonate/config.py +++ b/carbonate/config.py @@ -1,6 +1,9 @@ import os import pwd -from ConfigParser import RawConfigParser, NoOptionError +try: + from ConfigParser import RawConfigParser, NoOptionError +except ImportError: + from configparser import RawConfigParser, NoOptionError class Config(): diff --git a/carbonate/fill.py b/carbonate/fill.py index 92b822a..bcb36bd 100644 --- a/carbonate/fill.py +++ b/carbonate/fill.py @@ -14,6 +14,7 @@ # Work performed by author while working at Booking.com. 
+import time import whisper try: @@ -22,8 +23,14 @@ except ImportError: HAS_OPERATOR = False -import itertools -import time +try: + # Python 2 + from future_builtins import filter + from future_builtins import zip + range = xrange +except ImportError: + # Python 3 + pass def itemgetter(*items): @@ -71,9 +78,9 @@ def fill(src, dst, tstart, tstop): (timeInfo, values) = whisper.fetch(src, fromTime, untilTime) (start, end, archive_step) = timeInfo - pointsToWrite = list(itertools.ifilter( + pointsToWrite = list(filter( lambda points: points[1] is not None, - itertools.izip(xrange(start, end, archive_step), values))) + zip(range(start, end, archive_step), values))) # order points by timestamp, newest first pointsToWrite.sort(key=lambda p: p[0], reverse=True) whisper.update_many(dst, pointsToWrite) diff --git a/carbonate/sieve.py b/carbonate/sieve.py index 25cf3a6..da467e1 100644 --- a/carbonate/sieve.py +++ b/carbonate/sieve.py @@ -5,7 +5,7 @@ def filterMetrics(inputs, node, cluster, invert=False, filter_long=False): - if isinstance(node, basestring): + if isinstance(node, str): match = [node] else: match = node diff --git a/carbonate/sync.py b/carbonate/sync.py index 34bcb04..da165c1 100644 --- a/carbonate/sync.py +++ b/carbonate/sync.py @@ -23,7 +23,7 @@ def sync_from_remote(sync_file, remote, staging, rsync_options): sync_file.name, remote, staging ]) - print " - Rsyncing metrics" + print(" - Rsyncing metrics") proc = subprocess.Popen(cmd, shell=True, @@ -53,7 +53,7 @@ def sync_batch(metrics_to_heal, lock_writes=False, overwrite=False): "Avg: %fs Time Left: %ss (%d%%)" \ % (sync_count, sync_total, sync_avg, sync_remain, sync_percent) - print status_line + print(status_line) # Do not try healing data past the point they were rsync'd # as we would not have new points in staging anyway. 
@@ -143,14 +143,14 @@ def run_batch(metrics_to_sync, remote, local_storage, rsync_options, total_time = rsync_elapsed + merge_elapsed - print " --------------------------------------" - print " Rsync time: %ss" % rsync_elapsed - print " Merge time: %ss" % merge_elapsed - print " Total time: %ss" % total_time + print(" --------------------------------------") + print(" Rsync time: %ss" % rsync_elapsed) + print(" Merge time: %ss" % merge_elapsed) + print(" Total time: %ss" % total_time) # Cleanup if dirty: - print " dirty mode: left temporary directory %s" % staging_dir + print(" dirty mode: left temporary directory %s" % staging_dir) else: rmtree(staging_dir) diff --git a/setup.py b/setup.py index 6e6d103..771bc35 100644 --- a/setup.py +++ b/setup.py @@ -6,35 +6,37 @@ class my_install_scripts(install_scripts): - def write_script(self, script_name, contents, mode="t", *ignored): - contents = re.sub("import sys", - "import sys\nsys.path.append('/opt/graphite/lib')", - contents) - install_scripts.write_script(self, script_name, contents, mode="t", *ignored) + def write_script(self, script_name, contents, mode="t", *ignored): + contents = re.sub("import sys", + "import sys\nsys.path.append('/opt/graphite/lib')", + contents) + install_scripts.write_script(self, script_name, contents, + mode="t", *ignored) def read(fname): return open(os.path.join(os.path.dirname(__file__), fname)).read() + setup( name="carbonate", version=__version__, author="Scott Sanders", author_email="scott@jssjr.com", description=("Tools for managing federated carbon clusters."), - license = "MIT", - keywords = "graphite carbon", - url = "https://github.com/jssjr/carbonate", - include_package_data = True, + license="MIT", + keywords="graphite carbon", + url="https://github.com/jssjr/carbonate", + include_package_data=True, packages=find_packages(), - long_description = read('README.md'), + long_description=read('README.md'), long_description_content_type='text/markdown', - install_requires = [ + 
install_requires=[ "carbon", "whisper", ], cmdclass={'install_scripts': my_install_scripts}, - entry_points = { + entry_points={ 'console_scripts': [ 'carbon-lookup = carbonate.cli:carbon_lookup', 'carbon-sync = carbonate.cli:carbon_sync', @@ -45,6 +47,12 @@ def read(fname): 'carbon-stale = carbonate.cli:carbon_stale', 'whisper-fill = carbonate.cli:whisper_fill', 'whisper-aggregate = carbonate.cli:whisper_aggregate' - ] - } - ) + ] + }, + classifiers=[ + 'License :: OSI Approved :: MIT License', + 'Programming Language :: Python', + 'Programming Language :: Python :: 2', + 'Programming Language :: Python :: 3', + ] +) diff --git a/tests/test_fill.py b/tests/test_fill.py index 8b9f3a9..89e69d0 100644 --- a/tests/test_fill.py +++ b/tests/test_fill.py @@ -140,7 +140,7 @@ def test_fill_endat(self, unused_mock_time): except (IOError, OSError): pass - complete = range(1, 21) + complete = list(range(1, 21)) seconds_per_point = 1 seconds_per_point_l2 = seconds_per_point * 4 points_number = len(complete) diff --git a/tests/test_sync.py b/tests/test_sync.py index 3823fc5..1473bdd 100644 --- a/tests/test_sync.py +++ b/tests/test_sync.py @@ -52,7 +52,7 @@ def test_heal_mixed_data(self): heal_metric(self.db, testdb, overwrite=True) final_data = whisper.fetch(testdb, 0) - self.assertEqual(final_data[1], range(1,21)) + self.assertEqual(final_data[1], list(range(1,21))) def test_heal_empty(self): diff --git a/tox.ini b/tox.ini index da51d2d..e376108 100644 --- a/tox.ini +++ b/tox.ini @@ -1,5 +1,5 @@ [tox] -envlist = py27,py27-pre0_9_10,lint +envlist = py27,py27-pre0_9_10,lint,py3 [testenv] install_command = pip install --install-option='--install-scripts={envbindir}' --install-option='--install-lib={envsitepackagesdir}' --install-option='--install-data={envdir}/lib/graphite' -r{toxinidir}/requirements.txt -r{toxinidir}/tests/requirements.txt --pre {opts} {packages}