diff --git a/README.md b/README.md index 8d91947..f973b62 100644 --- a/README.md +++ b/README.md @@ -70,7 +70,7 @@ The following command line options are available: ## Configuration -All your configuration will need to be created under the key `goingrunning`. There are three concepts you need to know to configure the plugin: targets, trainings and flavours. They are explained in detail below. +All your configuration will need to be created under the key `goingrunning`. There are three concepts you need to know to configure the plugin: `targets`, `trainings` and `flavours`. They are explained in detail below. ### Targets @@ -88,13 +88,19 @@ goingrunning: clean_target: yes delete_from_device: - LIBRARY.DAT + generate_playlist: yes + copy_files: yes ``` -The key `device_root` indicates where your operating system mounts the device. The key `device_path` indicates the folder inside the device to which your audio files will be copied. In the above example the final destination is `/media/MPD1/MUSIC/AUTO/`. It is assumed that the folder indicated in the `device_path` key exists. If it doesn't the plugin will exit with a warning. +The key `device_root` indicates where your operating system mounts the device. The key `device_path` indicates the folder inside the device to which your audio files will be copied. In the above example the final destination is `/media/MPD1/MUSIC/AUTO/`. It is assumed that the folder indicated in the `device_path` key exists. If it doesn't the plugin will exit with a warning. The device path can also be an empty string if you want to store the files in the root folder of the device. The key `clean_target`, when set to yes, instructs the plugin to clean the `device_path` folder before copying the new songs to the device. This will remove all audio songs and playlists found in that folder. -Some devices might have library files or other data files which need to be deleted in order for the device to reindex the new songs. These files can be added to the `delete_from_device` key. The files listed here are relative to the `device_root` directive. +Some devices might have library files or other data files which need to be deleted in order for the device to re-discover the new songs. These files can be added to the `delete_from_device` key. The files listed here are relative to the `device_root` directive. + +You can also generate a playlist by setting the `generate_playlist` option to `yes`. It will create an .m3u playlist file and store it to your `device_path` location. + +There might be some special conditions in which you do not want to copy files to the device. In fact, the destination folder (`device_root`/`device_path`) might refer to an ordinary folder on your computer and you might want to create only a playlist there. In this case, you want to disable the copying of the music files by setting `copy_files: no`. By default, `copy_files` is always enabled so in the above `MPD1` target it could also be omitted and files would be copied all the same. ### Trainings @@ -123,7 +129,7 @@ goingrunning: The keys under the `query` section are exactly the same ones that you use when you are using beets for any other operation. Whatever is described in the [beets query documentation](https://beets.readthedocs.io/en/stable/reference/query.html) applies here with two restriction: you must query specific fields in the form of `field: value` and (for now) regular expressions are not supported. #### ordering -Your songs are ordered based on a scoring system. What you indicate under the `ordering` section is the fields by which the songs will be ordered and the weight each one of them will have on the final score. The weight can go from -100 to 100. Negative numbers indicate a reverse ordering. (...probably need more explanation?...) +At the time being there is only one ordering algorithm (`ScoreBasedLinearPermutation`) which orders your songs based on a scoring system. What you indicate under the `ordering` section is the fields by which the songs will be ordered. Each field will have a weight from -100 to 100 indicating how important that field is with respect to the others. Negative numbers indicate a reverse ordering. (@todo: this probably needs an example.) #### use_flavours You will find that many of the query specification that you come up with will be repeated across different trainings. To reduce repetition and at the same time to be able to combine many different recipes you can use flavours. Similarly to targets, instead of defining the queries directly on your training you can define queries in a separate section called `flavours` (see below) and then use the `use_flavours` key to indicate which flavours to use. The order in which flavours are indicated is important: the first one has the highest priority meaning that it will overwrite any keys that might be found in subsequent flavours. @@ -134,7 +140,6 @@ The duration is expressed in minutes and serves the purpose of defining the tota #### target This key indicates to which target (defined in the `targets` section) your songs will be copied to. - #### the `fallback` training You might also define a special `fallback` training: @@ -147,6 +152,9 @@ goingrunning: Any key not defined in a specific training will be looked up from the `fallback` training. So, if in the `10K` example you were to remove the `target` key, it would be looked up from the `fallback` training and your songs would be copied to the `my_other_device` target. +#### Play count and favouring unplayed songs +In the default configuration of the plugin, on the `fallback` training there are two disabled options that you might want to consider enabling: `increment_play_count` and `favour_unplayed`. They are meant to be used together. The `increment_play_count` option, on copying your songs to your device, will increment the `play_count` attribute by one and store it in your library and on your media file. The `favour_unplayed` option will instruct the algorithm that picks the songs from your selection to favour the songs that have lower `play_count`. This feature will make you discover songs in your library that you might have never heard. At the same time it ensures that the proposed songs are always changed even if you keep your selection query and your ordering unchanged. + ### Flavours The flavours section serves the purpose of defining named queries. If you have 5 different high intensity trainings different in length but sharing queries about bpm, mood and loudness, you can create a single definition here, called flavour, and reuse that flavour in your different trainings with the `use_flavours` key. @@ -164,18 +172,74 @@ goingrunning: genre: Rock metallic: genre: Metal - sunshine: + sunshine: genre: Reggae 60s: year: 1960..1969 chillout: bpm: 1..120 - mood_happy: 0.5..0.99 + mood_happy: 0.5..1 ``` This way, from the above flavours you might add `use_flavours: [overthetop, rock, 60s]` to one training and `use_flavours: [overthetop, metallic]` to another so they will share the same `overthetop` intensity definition whilst having different genre preferences. Similarly, your recovery session might use `use_flavours: [chillout, sunshine]`. +### Advanced queries +When it comes to handling queries, this plugin introduces some major differences with respect to the beets core you need to be aware of. + +#### Recurring fields extend the selections +You might define different flavours in which some of the same fields are defined, like the `genre` field in the `rocker` and the `metallic` flavours above. You can define a training that makes use of those flavours and optionally adding the same field through a direct query section, like this: + +```yaml +goingrunning: + trainings: + HM: + query: + genre: Folk + use_flavours: [rocker, metallic] +``` + +The resulting query will include songs corresponding to any of the three indicated genres: `genre='Folk' OR genre='Rock' OR genre='Metal'`. This, of course, is applicable to all fields. + + +#### Fields can be used as lists +Sometimes it is cumbersome to define a separate flavour for each additional value of a specific field. For example, it would be nice to have the above `chillout` flavour to include a list of genres instead of having to combine it with multiple flavours. Well, you can just do that by using the list notation like this: +```yaml +goingrunning: + flavours: + chillout: + bpm: 1..120 + mood_happy: 0.5..1 + genre: [Soul, Oldies, Ballad] +``` + +or like this: + +```yaml +goingrunning: + flavours: + chillout: + bpm: 1..120 + mood_happy: 0.5..1 + genre: + - Soul + - Oldies + - Ballad +``` +The resulting query will have the same effect including all indicated genres: `genre='Soul' OR genre='Oldies' OR genre='Ballad'`. This technique can be applied to all fields. + + +#### Negated fields can also be used as lists +What is described above also applies to negated fields. That is to say, you can also negate a field and use it as a list to query your library by excluding all those values: +```yaml +goingrunning: + flavours: + not_good_for_running: + ^genre: [Jazz, Psychedelic Rock, Gospel] +``` +When the above flavour is compiled it will result in a query excluding all indicated genres: `genre!='Jazz' AND genre!='Psychedelic Rock' AND genre!='Gospel'`. This technique can be applied to all fields. + + ### Using a separate configuration file In my experience the configuration section can grow quite long depending on your needs, so I find it useful to keep my `goingrunning` specific configuration in a separate file and from the main configuration file include it like this: @@ -205,10 +269,11 @@ Do the same as above but today you feel Ska: ## Issues -If something is not working as expected please use the Issue tracker. -If the documentation is not clear please use the Issue tracker. -If you have a feature request please use the Issue tracker. -In any other situation please use the Issue tracker. + +- If something is not working as expected please use the Issue tracker. +- If the documentation is not clear please use the Issue tracker. +- If you have a feature request please use the Issue tracker. +- In any other situation please use the Issue tracker. ## Roadmap diff --git a/beetsplug/goingrunning/about.py b/beetsplug/goingrunning/about.py index fdb8d51..f8ccfc3 100644 --- a/beetsplug/goingrunning/about.py +++ b/beetsplug/goingrunning/about.py @@ -1,14 +1,12 @@ # Copyright: Copyright (c) 2020., Adam Jakab -# # Author: Adam Jakab -# Created: 3/27/20, 3:52 PM # License: See LICENSE.txt __author__ = u'Adam Jakab' __email__ = u'adam@jakab.pro' __copyright__ = u'Copyright (c) 2020, {} <{}>'.format(__author__, __email__) __license__ = u'License :: OSI Approved :: MIT License' -__version__ = u'1.2.0' +__version__ = u'1.2.1' __status__ = u'Stable' __PACKAGE_TITLE__ = u'GoingRunning' diff --git a/beetsplug/goingrunning/command.py b/beetsplug/goingrunning/command.py index b2285ff..a2fa52b 100644 --- a/beetsplug/goingrunning/command.py +++ b/beetsplug/goingrunning/command.py @@ -2,21 +2,17 @@ # Author: Adam Jakab # License: See LICENSE.txt -import os -import random -import string -from glob import glob from optparse import OptionParser -from pathlib import Path -from shutil import copyfile from beets import library +from beets.dbcore import query from beets.dbcore.db import Results -from beets.dbcore.queryparse import parse_query_part -from beets.library import Library, Item, parse_query_parts +from beets.dbcore.queryparse import parse_query_part, construct_query_part +from beets.library import Library, Item from beets.ui import Subcommand, decargs -from beets.util.confit import Subview, NotFoundError +from beets.util.confit import Subview from beetsplug.goingrunning import common +from beetsplug.goingrunning import itemexport from beetsplug.goingrunning import itemorder from beetsplug.goingrunning import itempick @@ -24,7 +20,7 @@ class GoingRunningCommand(Subcommand): config: Subview = None lib: Library = None - query = None + query = [] parser: OptionParser = None cfg_quiet = False @@ -147,7 +143,7 @@ def handle_training(self): return # Verify target device path path - if not self._get_destination_path_for_training(training): + if not common.get_destination_path_for_training(training): self._say( "Invalid target!", log_only=False) return @@ -161,8 +157,6 @@ def handle_training(self): log_only=False) return - - # Check count if len(lib_items) < 1: self._say( @@ -200,174 +194,24 @@ def handle_training(self): flds += ["play_count", "artist", "title"] self.display_library_items(sel_items, flds, prefix="Selected: ") - # 5) Clea, Copy, Run - self._clean_target_path(training) - self._copy_items_to_target(training, sel_items) + # 5) Clean, Copy, Playlist, Run + itemexport.generate_output(training, sel_items, self.cfg_dry_run) self._say("Run!", log_only=False) - def _clean_target_path(self, training: Subview): - target_name = common.get_training_attribute(training, "target") - - if self._get_target_attribute_for_training(training, "clean_target"): - dst_path = self._get_destination_path_for_training(training) - - self._say("Cleaning target[{0}]: {1}". - format(target_name, dst_path), log_only=False) - song_extensions = ["mp3", "mp4", "flac", "wav", "ogg", "wma", "m3u"] - target_file_list = [] - for ext in song_extensions: - target_file_list += glob( - os.path.join(dst_path, "*.{}".format(ext))) - - for f in target_file_list: - self._say("Deleting: {}".format(f)) - if not self.cfg_dry_run: - os.remove(f) - - additional_files = self._get_target_attribute_for_training(training, - "delete_from_device") - if additional_files and len(additional_files) > 0: - root = self._get_target_attribute_for_training(training, - "device_root") - root = Path(root).expanduser() - - self._say("Deleting additional files: {0}". - format(additional_files), log_only=False) - - for path in additional_files: - path = Path(str.strip(path, "/")) - dst_path = os.path.realpath(root.joinpath(path)) - - if not os.path.isfile(dst_path): - self._say( - "The file to delete does not exist: {0}".format(path), - log_only=True) - continue - - self._say("Deleting: {}".format(dst_path)) - if not self.cfg_dry_run: - os.remove(dst_path) - - def _copy_items_to_target(self, training: Subview, rnd_items): - target_name = common.get_training_attribute(training, "target") - increment_play_count = common.get_training_attribute( - training, "increment_play_count") - dst_path = self._get_destination_path_for_training(training) - self._say("Copying to target[{0}]: {1}". - format(target_name, dst_path), log_only=False) - - def random_string(length=6): - letters = string.ascii_letters + string.digits - return ''.join(random.choice(letters) for i in range(length)) - - cnt = 0 - for item in rnd_items: - src = os.path.realpath(item.get("path").decode("utf-8")) - if not os.path.isfile(src): - # todo: this is bad enough to interrupt! create option for this - self._say("File does not exist: {}".format(src)) - continue - - fn, ext = os.path.splitext(src) - gen_filename = "{0}_{1}{2}".format(str(cnt).zfill(6), - random_string(), ext) - dst = "{0}/{1}".format(dst_path, gen_filename) - self._say("Copying[{1}]: {0}".format(src, gen_filename), - log_only=True) - - if not self.cfg_dry_run: - copyfile(src, dst) - if increment_play_count: - common.increment_play_count_on_item(item) - - cnt += 1 - - def _get_target_for_training(self, training: Subview): - target_name = common.get_training_attribute(training, "target") - self._say("Finding target: {0}".format(target_name)) - - if not self.config["targets"][target_name].exists(): - self._say( - "The target name[{0}] is not defined!".format(target_name)) - return - - return self.config["targets"][target_name] - - def _get_target_attribute_for_training(self, training: Subview, - attrib: str = "name"): - target_name = common.get_training_attribute(training, "target") - self._say("Getting attribute[{0}] for target: {1}".format(attrib, - target_name), - log_only=True) - target = self._get_target_for_training(training) - if not target: - return - - if attrib == "name": - attrib_val = target_name - elif attrib in ("device_root", "device_path", "delete_from_device"): - # these should NOT propagate up - try: - attrib_val = target[attrib].get() - except NotFoundError: - attrib_val = None - else: - attrib_val = common.get_target_attribute(target, attrib) - - self._say( - "Found target[{0}] attribute[{1}] path: {2}".format(target_name, - attrib, - attrib_val), - log_only=True) - - return attrib_val - - def _get_destination_path_for_training(self, training: Subview): - target_name = common.get_training_attribute(training, "target") - - if not target_name: - self._say( - "Training does not declare a `target`!". - format(target_name), log_only=False) - return - - root = self._get_target_attribute_for_training(training, "device_root") - path = self._get_target_attribute_for_training(training, "device_path") - path = path or "" - - if not root: - self._say( - "The target[{0}] does not declare a device root path.". - format(target_name), log_only=False) - return - - root = Path(root).expanduser() - path = Path(str.strip(path, "/")) - dst_path = os.path.realpath(root.joinpath(path)) - - if not os.path.isdir(dst_path): - self._say( - "The target[{0}] path does not exist: {1}". - format(target_name, dst_path), log_only=False) - return - - self._say( - "Found target[{0}] path: {0}".format(target_name, dst_path), - log_only=True) - - return dst_path - def _get_training_query_element_keys(self, training): + # todo: move to common answer = [] query_elements = self._gather_query_elements(training) for el in query_elements: - answer.append(el.split(":")[0]) + key = parse_query_part(el)[0] + if key not in answer: + answer.append(key) return answer def _gather_query_elements(self, training: Subview): - """Sum all query elements and order them (strongest to weakest): - command -> training -> flavours + """Sum all query elements into one big list ordered from strongest to + weakest: command -> training -> flavours """ command_query = self.query training_query = [] @@ -377,8 +221,11 @@ def _gather_query_elements(self, training: Subview): tconf = common.get_training_attribute(training, "query") if tconf: for key in tconf.keys(): - training_query.append( - common.get_query_element_string(key, tconf.get(key))) + nqe = common.get_normalized_query_element(key, tconf.get(key)) + if type(nqe) == list: + training_query.extend(nqe) + else: + training_query.append(nqe) # Append the query elements from the flavours defined on the training flavours = common.get_training_attribute(training, "use_flavours") @@ -395,26 +242,70 @@ def _gather_query_elements(self, training: Subview): self._say("Flavour query elements: {}".format(flavour_query), log_only=True) - # Remove duplicate keys (first one wins) raw_combined_query = command_query + training_query + flavour_query - combined_query = [] - used_keys = [] - for query_part in raw_combined_query: - key = parse_query_part(query_part)[0] - if key not in used_keys: - used_keys.append(key) - combined_query.append(query_part) - - self._say("Combined query elements: {}".format(combined_query), - log_only=True) - return combined_query + self._say("Combined query elements: {}". + format(raw_combined_query), log_only=True) + + return raw_combined_query + + def parse_query_elements(self, query_elements, model_cls): + registry = {} + + # Iterate through elements and group them in a registry by field name + for query_element in query_elements: + key, term, query_class, negate = parse_query_part(query_element) + if not key: + continue + # treat negated keys separately + _reg_key = "^{}".format(key) if negate else key + if _reg_key not in registry.keys(): + registry[_reg_key] = [] + registry[_reg_key].append({ + "key": key, + "term": term, + "query_class": query_class, + "negate": negate, + "q_string": query_element + }) + + def parse_and_merge_items(k, lst, cls): + parsed_items = [] + is_negated = lst[0]["negate"] + + for item in lst: + prefixes = {} + qp = construct_query_part(cls, prefixes, item["q_string"]) + parsed_items.append(qp) + + if len(parsed_items) == 1: + answer = parsed_items.pop() + else: + if is_negated: + answer = query.AndQuery(parsed_items) + else: + answer = query.OrQuery(parsed_items) + + return answer + + query_parts = [] + for key in registry.keys(): + reg_item_list = registry[key] + parsed_and_merged = parse_and_merge_items( + key, reg_item_list, model_cls) + self._say("{}: {}".format(key, parsed_and_merged)) + query_parts.append(parsed_and_merged) + + if len(query_parts) == 0: + query_parts.append(query.TrueQuery()) + + return query.AndQuery(query_parts) def _retrieve_library_items(self, training: Subview): """Returns the results of the library query for a specific training - The storing/overriding/restoring of the library.Item._types is made - necessary - by this issue: https://github.com/beetbox/beets/issues/3520 + The storing/overriding/restoring of the library.Item._types + is made necessary by this issue: + https://github.com/beetbox/beets/issues/3520 Until the issue is solved this 'hack' is necessary. """ full_query = self._gather_query_elements(training) @@ -425,7 +316,7 @@ def _retrieve_library_items(self, training: Subview): library.Item._types.update(override_types) # Execute the query parsing (using our own type overrides) - parsed_query, parsed_ordering = parse_query_parts(full_query, Item) + parsed_query = self.parse_query_elements(full_query, Item) # Restore the original types library.Item._types = original_types.copy() diff --git a/beetsplug/goingrunning/common.py b/beetsplug/goingrunning/common.py index 3805cfa..6711c6c 100644 --- a/beetsplug/goingrunning/common.py +++ b/beetsplug/goingrunning/common.py @@ -2,8 +2,11 @@ # Author: Adam Jakab # License: See LICENSE.txt import importlib +# todo: use beets logger?! +# from beets import logging import logging import os +from pathlib import Path from beets.dbcore import types from beets.library import Item @@ -48,10 +51,14 @@ '__PLUGIN_NAME__'])) -def say(msg, log_only=True, is_error=False): +def say(msg: str, log_only=True, is_error=False): + """ + https://beets.readthedocs.io/en/stable/dev/plugins.html#logging + """ _level = logging.DEBUG _level = _level if log_only else logging.INFO _level = _level if not is_error else logging.ERROR + msg = msg.replace('\'', '"') __logger__.log(level=_level, msg=msg) @@ -64,13 +71,27 @@ def get_item_attribute_type_overrides(): def get_human_readable_time(seconds): + """Formats seconds as a short human-readable HH:MM:SS string. + """ + seconds = int(seconds) m, s = divmod(seconds, 60) h, m = divmod(m, 60) return "%d:%02d:%02d" % (h, m, s) -def get_query_element_string(key, val): - return "{k}:{v}".format(k=key, v=val) +def get_normalized_query_element(key, val): + answer = "" + + tpl = "{k}:{v}" + if type(val) in [str, int, float, bool]: + answer = tpl.format(k=key, v=val) + elif type(val) == list: + answer = [] + for v in val: + answer.append(tpl.format(k=key, v=v)) + + return answer + def get_flavour_elements(flavour: Subview): elements = [] @@ -79,26 +100,108 @@ def get_flavour_elements(flavour: Subview): return elements for key in flavour.keys(): - # todo: in future flavours can have "use_flavours" key to make this recursive - elements.append(get_query_element_string(key, flavour[key].get())) + # todo: in future flavours can have "use_flavours" key to make this + # recursive + nqe = get_normalized_query_element(key, flavour[key].get()) + if type(nqe) == list: + elements.extend(nqe) + else: + elements.append(nqe) return elements + def get_training_attribute(training: Subview, attrib: str): - """Returns the attribute value from "goingrunning.trainings" for the specified training or uses the + """Returns the attribute value from "goingrunning.trainings" for the + specified training or uses the spacial fallback training configuration. """ value = None if training[attrib].exists(): value = training[attrib].get() - elif training.name != "goingrunning.trainings.fallback" and training.parent["fallback"].exists(): + elif training.name != "goingrunning.trainings.fallback" and training.parent[ + "fallback"].exists(): fallback = training.parent["fallback"] value = get_training_attribute(fallback, attrib) return value + +def get_target_for_training(training: Subview): + answer = None + + target_name = get_training_attribute(training, "target") + say("Finding target: {0}".format(target_name)) + + cfg_targets: Subview = training.parent.parent["targets"] + if not cfg_targets.exists(): + say("Cannot find 'targets' node!") + elif not cfg_targets[target_name].exists(): + say("Target name '{0}' is not defined!".format(target_name)) + else: + answer = cfg_targets[target_name] + + return answer + + +def get_target_attribute_for_training(training: Subview, + attrib: str = "name"): + answer = None + + target_name = get_training_attribute(training, "target") + say("Getting attribute[{0}] for target: {1}".format(attrib, target_name), + log_only=True) + + target = get_target_for_training(training) + if target: + if attrib == "name": + answer = target_name + else: + answer = get_target_attribute(target, attrib) + + say("Found target[{0}] attribute[{1}] path: {2}". + format(target_name, attrib, answer), log_only=True) + + return answer + + +def get_destination_path_for_training(training: Subview): + answer = None + + target_name = get_training_attribute(training, "target") + + if not target_name: + say("Training does not declare a `target`!". + format(target_name), log_only=False) + return answer + + root = get_target_attribute_for_training(training, "device_root") + path = get_target_attribute_for_training(training, "device_path") + path = path or "" + + if not root: + say("The target[{0}] does not declare a device root path.". + format(target_name), log_only=False) + return answer + + root = Path(root).expanduser() + path = Path(str.strip(path, "/")) + dst_path = os.path.realpath(root.joinpath(path)) + + if not os.path.isdir(dst_path): + say("The target[{0}] path does not exist: {1}". + format(target_name, dst_path), log_only=False) + return answer + + say("Found target[{0}] path: {0}". + format(target_name, dst_path), log_only=True) + + return dst_path + + def get_target_attribute(target: Subview, attrib: str): - """Returns the attribute value from "goingrunning.targets" for the specified target. + """Returns the attribute value from "goingrunning.targets" for the + specified target. """ value = None if target[attrib].exists(): @@ -106,6 +209,7 @@ def get_target_attribute(target: Subview, attrib: str): return value + def get_duration_of_items(items): """ Calculate the total duration of the media items using the "length" attribute @@ -115,7 +219,10 @@ def get_duration_of_items(items): if isinstance(items, list): for item in items: try: - total_time += item.get("length") + length = item.get("length") + if not length or length <= 0: + raise ValueError("Invalid length value!") + total_time += length except TypeError: pass except ValueError: @@ -162,14 +269,15 @@ def get_min_max_sum_avg_for_items(items, field_name): return _min, _max, _sum, _avg -def increment_play_count_on_item(item: Item): +def increment_play_count_on_item(item: Item, store=True, write=True): # clear_dirty is necessary to make sure that `ordering_score` and # `ordering_info` will not get stored to the library item.clear_dirty() item["play_count"] = item.get("play_count", 0) + 1 - item.store() - item.write() - + if store: + item.store() + if write: + item.write() def get_class_instance(module_name, class_name): try: diff --git a/beetsplug/goingrunning/itemexport.py b/beetsplug/goingrunning/itemexport.py new file mode 100644 index 0000000..1e6f550 --- /dev/null +++ b/beetsplug/goingrunning/itemexport.py @@ -0,0 +1,176 @@ +# Copyright: Copyright (c) 2020., Adam Jakab +# Author: Adam Jakab +# License: See LICENSE.txt + +import os +import random +import string +import tempfile +from datetime import datetime +from glob import glob +from pathlib import Path + +from alive_progress import alive_bar +from beets import util +from beets.util.confit import Subview +from beetsplug.goingrunning import common + + +def generate_output(training: Subview, items, dry_run=False): + exporter = ItemExport(training, items, dry_run) + exporter.export() + + +class ItemExport: + cfg_dry_run = False + training: Subview = None + items = [] + + def __init__(self, training, items, dry_run=False): + self.training = training + self.items = items + self.cfg_dry_run = dry_run + + def export(self): + self._clean_target() + self._copy_items() + self._generate_playist() + + def _generate_playist(self): + target_name = common.get_training_attribute(self.training, "target") + + if not common.get_target_attribute_for_training(self.training, + "generate_playlist"): + common.say("Playlist generation to target[{0}] was skipped " + "(generate_playlist=no).". + format(target_name), log_only=False) + return + + dst_path = common.get_destination_path_for_training(self.training) + training_name = str(self.training.name).split(".").pop() + playlist_filename = "{}.m3u".format(training_name) + dst = "{0}/{1}".format(dst_path, playlist_filename) + + lines = [ + "# Playlist generated for training '{}' on {}". \ + format(training_name, datetime.now()) + ] + + for item in self.items: + path = util.displayable_path(item.get("exportpath", + item.get("path"))) + if path: + path = util.syspath(path) + line = "{path}".format(path=path) + lines.append(line) + + with tempfile.NamedTemporaryFile(mode='w+b', delete=False) as ntf: + tmp_playlist = ntf.name + for line in lines: + ntf.write("{}\n".format(line).encode("utf-8")) + + common.say("Created playlist: {0}".format(dst), log_only=True) + util.copy(tmp_playlist, dst) + util.remove(tmp_playlist) + + def _copy_items(self): + target_name = common.get_training_attribute(self.training, "target") + + # The copy_files is only False when it is explicitly declared so + copy_files = common.get_target_attribute_for_training( + self.training, "copy_files") + copy_files = False if copy_files == False else True + + if not copy_files: + common.say("Copying to target[{0}] was skipped (copy_files=no).". + format(target_name)) + return + + increment_play_count = common.get_training_attribute( + self.training, "increment_play_count") + dst_path = common.get_destination_path_for_training(self.training) + common.say("Copying to target[{0}]: {1}". + format(target_name, dst_path)) + + # todo: move to common + def random_string(length=6): + letters = string.ascii_letters + string.digits + return ''.join(random.choice(letters) for i in range(length)) + + cnt = 0 + # todo: disable alive bar when running in verbose mode + # from beets import logging as beetslogging + # beets_log = beetslogging.getLogger("beets") + # print(beets_log.getEffectiveLevel()) + + with alive_bar(len(self.items)) as bar: + for item in self.items: + src = util.displayable_path(item.get("path")) + if not os.path.isfile(src): + # todo: this is bad enough to interrupt! create option + # for this + common.say("File does not exist: {}".format(src)) + continue + + fn, ext = os.path.splitext(src) + gen_filename = "{0}_{1}{2}".format(str(cnt).zfill(6), + random_string(), ext) + dst = "{0}/{1}".format(dst_path, gen_filename) + common.say("Copying[{1}]: {0}".format(src, gen_filename)) + + if not self.cfg_dry_run: + util.copy(src, dst) + + # store the file_name for the playlist + item["exportpath"] = util.bytestring_path(gen_filename) + + if increment_play_count: + common.increment_play_count_on_item(item) + + cnt += 1 + bar() + + def _clean_target(self): + target_name = common.get_training_attribute(self.training, "target") + + if common.get_target_attribute_for_training(self.training, + "clean_target"): + dst_path = common.get_destination_path_for_training(self.training) + + common.say("Cleaning target[{0}]: {1}". + format(target_name, dst_path)) + song_extensions = ["mp3", "mp4", "flac", "wav", "ogg", "wma", "m3u"] + target_file_list = [] + for ext in song_extensions: + target_file_list += glob( + os.path.join(dst_path, "*.{}".format(ext))) + + for f in target_file_list: + common.say("Deleting: {}".format(f)) + if not self.cfg_dry_run: + os.remove(f) + + additional_files = common.get_target_attribute_for_training( + self.training, + "delete_from_device") + if additional_files and len(additional_files) > 0: + root = common.get_target_attribute_for_training(self.training, + "device_root") + root = Path(root).expanduser() + + common.say("Deleting additional files: {0}". + format(additional_files)) + + for path in additional_files: + path = Path(str.strip(path, "/")) + dst_path = os.path.realpath(root.joinpath(path)) + + if not os.path.isfile(dst_path): + common.say( + "The file to delete does not exist: {0}".format(path), + log_only=True) + continue + + common.say("Deleting: {}".format(dst_path)) + if not self.cfg_dry_run: + os.remove(dst_path) diff --git a/beetsplug/goingrunning/itemorder.py b/beetsplug/goingrunning/itemorder.py index 76e4744..a0574e7 100644 --- a/beetsplug/goingrunning/itemorder.py +++ b/beetsplug/goingrunning/itemorder.py @@ -1,7 +1,6 @@ # Copyright: Copyright (c) 2020., Adam Jakab # Author: Adam Jakab # License: See LICENSE.txt - import operator from abc import ABC from abc import abstractmethod diff --git a/beetsplug/goingrunning/itempick.py b/beetsplug/goingrunning/itempick.py index af48658..e5364ef 100644 --- a/beetsplug/goingrunning/itempick.py +++ b/beetsplug/goingrunning/itempick.py @@ -56,6 +56,7 @@ def setup(self, training: Subview, items, duration): self.training = training self.items = items self.duration = duration + self.selection = [] @abstractmethod def _make_selection(self): @@ -108,11 +109,14 @@ def _improve_selection(self): max_overtime = 10 sel_time = sum(l["length"] for l in self.selection) curr_sel = 0 - max_sel = len(self.selection) - 1 curr_run = 0 max_run = len(self.bin_boundaries) * 3 exclusions = {} + if not self.selection: + common.say("IMPROVEMENTS: SKIPPED (No initial selection)") + return + # iterate through initial selection items and try to find a better # alternative for them while sel_time < self.duration or sel_time > self.duration + \ @@ -164,7 +168,7 @@ def _improve_selection(self): format(round(time_diff - new_diff))) self.show_selection_status() - if curr_sel < max_sel: + if curr_sel < len(self.selection) - 1: curr_sel += 1 else: curr_sel = 0 @@ -185,7 +189,10 @@ def _make_initial_selection(self): if curr_run > max_run: common.say("MAX HIT!") break - low, high = self.bin_boundaries[curr_bin] + low, high = self._get_bin_boundaries(curr_bin) + if low is None or high is None: + curr_bin = curr_bin + 1 if curr_bin < max_bin else 0 + continue index = self._get_random_item_between_boundaries(low, high) if index is not None: item: Item = self.items[index] @@ -202,42 +209,20 @@ def _make_initial_selection(self): } self.selection.append(sel_data) sel_time += item_len - if curr_bin < max_bin: - curr_bin += 1 + curr_bin = curr_bin + 1 if curr_bin < max_bin else 0 common.say("{} INITIAL SELECTION: FINISHED".format("=" * 60)) self.show_selection_status() - def _setup_bin_boundaries(self): - _min, _max, _sum, _avg = common.get_min_max_sum_avg_for_items( - self.items, "length") - - if not _avg: - raise ValueError("Average song length is zero!") - - num_bins = round(self.duration / _avg) - bin_size = round(len(self.items) / num_bins) - - common.say("Number of bins: {}".format(num_bins)) - common.say("Bin size: {}".format(bin_size)) - - self.bin_boundaries = [] - for bi in range(0, num_bins): - is_last_bin = bi == (num_bins - 1) - low = bi * bin_size - high = low + bin_size - 1 - if is_last_bin: - high = len(self.items) - 1 - self.bin_boundaries.append([low, high]) - - common.say("Bin boundaries: {}".format(self.bin_boundaries)) - def _get_item_within_length(self, bin_number, min_len, max_len, exclude_indices=None): + index = None if exclude_indices is None: exclude_indices = [] - index = None - low, high = self.bin_boundaries[bin_number] + + low, high = self._get_bin_boundaries(bin_number) + if low is None or high is None: + return index candidates = [] for i in range(low, high): @@ -266,6 +251,49 @@ def _get_item_within_length(self, bin_number, return index + def _get_bin_boundaries(self, bin_number): + low = None + high = None + try: + low, high = self.bin_boundaries[bin_number] + except IndexError: + pass + + return low, high + + def _setup_bin_boundaries(self): + self.bin_boundaries = [] + + if len(self.items) <= 1: + raise ValueError("There is only one song in the selection!") + + _min, _max, _sum, _avg = common.get_min_max_sum_avg_for_items( + self.items, "length") + + if not _avg: + raise ValueError("Average song length is zero!") + + num_bins = round(self.duration / _avg) + bin_size = round(len(self.items) / num_bins) + + common.say("Number of bins: {}".format(num_bins)) + common.say("Bin size: {}".format(bin_size)) + + if not bin_size or bin_size * num_bins > len(self.items): + low = 0 + high = len(self.items) - 1 + self.bin_boundaries.append([low, high]) + else: + for bi in range(0, num_bins): + is_last_bin = bi == (num_bins - 1) + low = bi * bin_size + high = low + bin_size - 1 + if is_last_bin: + high = len(self.items) - 1 + self.bin_boundaries.append([low, high]) + + common.say("Bin boundaries: {}".format(self.bin_boundaries)) + def _get_random_item_between_boundaries(self, low, high): if not favour_unplayed: return randint(low, high) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 5c59135..cd676f1 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -1,13 +1,29 @@ # CHANGELOG -## v1.1.3 (in development) +## v1.2.1 (in development) ### New features: +- creating playlists on target +- possibility to disable song copy (playlist only) +- negated lists can also be used +- fields in queries can now be used as lists +- fields in different flavours now expand the selection (instead of substitution) +- maximize unheard song proposal by incrementing play_count on export ### Fixes +- multiple logging issues +## v1.2.0 + +### New features: +- introduced `play_count` handling and `favour_unplayed` based song selection + +### Fixes +- multiple lines in logging +- trainings without target + ## v1.1.2 diff --git a/docs/ROADMAP.md b/docs/ROADMAP.md index f764fd3..5a20ebb 100644 --- a/docs/ROADMAP.md +++ b/docs/ROADMAP.md @@ -17,7 +17,6 @@ goingrunning: targets: SONY-1: create_training_folder: yes - create_playlist: yes ``` @@ -42,10 +41,7 @@ goingrunning: use_flavours: [chillout, sunshine] duration: 150 ``` - -- maximize unheard song proposal(optional) by: - - incrementing listen count on export - - adding it to the query and proposing songs with lower counts + - enable song merging and exporting all songs merged into one single file (optional) - enable audio TTS files to give instructions during training: "Run for 10K at 4:45. RUN!" exporting it as mp3 files and adding it into the song list. diff --git a/setup.py b/setup.py index 8d6a207..e1ccf95 100644 --- a/setup.py +++ b/setup.py @@ -42,6 +42,7 @@ install_requires=[ 'beets>=1.4.9', + 'alive-progress', ], tests_require=[ diff --git a/test/config/default.yml b/test/config/default.yml index 85b890e..3652c92 100644 --- a/test/config/default.yml +++ b/test/config/default.yml @@ -40,19 +40,39 @@ goingrunning: use_flavours: [sunshine] duration: 10 target: MPD_1 - one-hour-run: - alias: "Born to run" + q-test-1: + alias: "Without nothing" + q-test-2: + alias: "Query only (different fields)" query: - bpm: 150..180 - length: 120..240 - duration: 60 - target: MPD_1 - quick-run: + bpm: 100..150 + length: 120..300 + genre: hard rock + q-test-3: + alias: "Query with one additional flavour (different fields)" query: - bpm: 150..180 - length: 120..240 - duration: 10 - target: MPD_1 + bpm: 100..150 + length: 120..300 + use_flavours: [sunshine] + q-test-4: + alias: "Query with multiple additional flavours (repeated fields)" + query: + bpm: 100..150 + year: 2015.. + use_flavours: [sunshine, 60s] + q-test-5: + alias: "Query supports fields a lists" + query: + genre: + - rock + - blues + - ska + use_flavours: [funkymonkey] + q-test-6: + alias: "Query supports negated lists" + query: + genre: [rock, blues] + ^genre: [jazz, death metal] bad-target-1: alias: "This training does not define a target" bad-target-2: @@ -66,5 +86,10 @@ goingrunning: 60s: year: 1960..1969 sunshine: - genre: Reggae + genre: reggae + funkymonkey: + genre: [funk, Rockabilly, Disco] + + + diff --git a/test/functional/000_basic_test.py b/test/functional/000_basic_test.py index 497d856..dd2055c 100644 --- a/test/functional/000_basic_test.py +++ b/test/functional/000_basic_test.py @@ -6,49 +6,61 @@ # from test.helper import ( - FunctionalTestHelper, Assertions, + FunctionalTestHelper, PLUGIN_NAME, PLUGIN_ALIAS, PLUGIN_SHORT_DESCRIPTION, get_single_line_from_output ) -class BasicTest(FunctionalTestHelper, Assertions): +class BasicTest(FunctionalTestHelper): """Test presence and invocation of the plugin. Only ensures that command does not fail. """ def test_application(self): + self.setup_beets({"config_file": b"empty.yml"}) stdout = self.run_with_output() self.assertIn(PLUGIN_NAME, stdout) self.assertIn(PLUGIN_SHORT_DESCRIPTION, stdout) def test_application_version(self): + self.setup_beets({"config_file": b"empty.yml"}) stdout = self.run_with_output("version") self.assertIn("plugins: {0}".format(PLUGIN_NAME), stdout) def test_plugin_no_arguments(self): - self.reset_beets(config_file=b"empty.yml") + self.setup_beets({"config_file": b"empty.yml"}) + stdout = self.run_with_output(PLUGIN_NAME) - self.assertIn("Usage: beet goingrunning [training] [options] [QUERY...]", stdout) + self.assertIn( + "Usage: beet goingrunning [training] [options] [QUERY...]", stdout) def test_plugin_shortname_no_arguments(self): - self.reset_beets(config_file=b"empty.yml") + self.setup_beets({"config_file": b"empty.yml"}) stdout = self.run_with_output(PLUGIN_ALIAS) self.assertIn("Usage: beet goingrunning [training] [options] [QUERY...]", stdout) def test_with_core_plugin_acousticbrainz(self): """Flexible field type declaration conflict - Introduced after release 1.1.1 when discovered core bug failing to compare flexible field types - Ref.: https://beets.readthedocs.io/en/stable/dev/plugins.html#flexible-field-types - This bug is present in beets version 1.4.9 so until the `item_types` declaration in the `GoingRunningPlugin` + Introduced after release 1.1.1 when discovered core bug failing to + compare flexible field types + Ref.: https://beets.readthedocs.io/en/stable/dev/plugins.html + #flexible-field-types + This bug is present in beets version 1.4.9 so until the `item_types` + declaration in the `GoingRunningPlugin` class is commented out this test will pass. Issue: https://github.com/adamjakab/BeetsPluginGoingRunning/issues/15 Issue(Beets): https://github.com/beetbox/beets/issues/3520 """ extra_plugin = "acousticbrainz" - self.reset_beets(config_file=b"empty.yml", extra_plugins=[extra_plugin]) + self.setup_beets({ + "config_file": b"empty.yml", + "extra_plugins": [extra_plugin] + }) + stdout = self.run_with_output("version") prefix = "plugins:" line = get_single_line_from_output(stdout, prefix) - expected = "{0} {1}".format(prefix, ", ".join([extra_plugin, PLUGIN_NAME])) + expected = "{0} {1}".format(prefix, + ", ".join([extra_plugin, PLUGIN_NAME])) self.assertEqual(expected, line) diff --git a/test/functional/001_configuration_test.py b/test/functional/001_configuration_test.py index 5b1967e..83d5041 100644 --- a/test/functional/001_configuration_test.py +++ b/test/functional/001_configuration_test.py @@ -7,15 +7,15 @@ from beets.util.confit import Subview -from test.helper import FunctionalTestHelper, Assertions, PLUGIN_NAME +from test.helper import FunctionalTestHelper, PLUGIN_NAME -class ConfigurationTest(FunctionalTestHelper, Assertions): +class ConfigurationTest(FunctionalTestHelper): """Configuration related tests """ def test_plugin_no_config(self): - self.reset_beets(config_file=b"empty.yml") + self.setup_beets({"config_file": b"empty.yml"}) self.assertTrue(self.config.exists()) self.assertTrue(self.config[PLUGIN_NAME].exists()) self.assertIsInstance(self.config[PLUGIN_NAME], Subview) @@ -24,13 +24,13 @@ def test_plugin_no_config(self): self.assertTrue(self.config[PLUGIN_NAME]["flavours"].exists()) def test_obsolete_config(self): - self.reset_beets(config_file=b"obsolete.yml") - + self.setup_beets({"config_file": b"obsolete.yml"}) logged = self.run_with_log_capture(PLUGIN_NAME) self.assertIn("INCOMPATIBLE PLUGIN CONFIGURATION", logged) self.assertIn("Offending key in training(training-1): song_bpm", logged) def test_default_config_sanity(self): + self.setup_beets({"config_file": b"default.yml"}) self.assertTrue(self.config[PLUGIN_NAME].exists()) cfg = self.config[PLUGIN_NAME] @@ -42,6 +42,7 @@ def test_default_config_sanity(self): self.assertEqual(chk_keys, cfg_keys) def test_default_config_targets(self): + self.setup_beets({"config_file": b"default.yml"}) """ Check Targets""" cfg: Subview = self.config[PLUGIN_NAME] targets = cfg["targets"] @@ -75,12 +76,14 @@ def test_default_config_targets(self): self.assertEqual("Music/", target["device_path"].get()) def test_default_config_trainings(self): + self.setup_beets({"config_file": b"default.yml"}) """ Check Targets""" cfg: Subview = self.config[PLUGIN_NAME] trainings = cfg["trainings"] self.assertTrue(trainings.exists()) def test_default_config_flavours(self): + self.setup_beets({"config_file": b"default.yml"}) """ Check Targets""" cfg: Subview = self.config[PLUGIN_NAME] flavours = cfg["flavours"] diff --git a/test/functional/002_command_test.py b/test/functional/002_command_test.py index ca49c0f..7a6cf76 100644 --- a/test/functional/002_command_test.py +++ b/test/functional/002_command_test.py @@ -5,16 +5,17 @@ # License: See LICENSE.txt # -from test.helper import FunctionalTestHelper, Assertions, PLUGIN_NAME, \ +from test.helper import FunctionalTestHelper, PLUGIN_NAME, \ PACKAGE_TITLE, PACKAGE_NAME, PLUGIN_VERSION, \ get_value_separated_from_output, convert_time_to_seconds -class CommandTest(FunctionalTestHelper, Assertions): +class CommandTest(FunctionalTestHelper): """Command related tests """ def test_plugin_version(self): + self.setup_beets({"config_file": b"default.yml"}) versioninfo = "{pt}({pn}) plugin for Beets: v{ver}".format( pt=PACKAGE_TITLE, pn=PACKAGE_NAME, @@ -28,7 +29,7 @@ def test_plugin_version(self): self.assertIn(versioninfo, logged) def test_training_listing_empty(self): - self.reset_beets(config_file=b"empty.yml") + self.setup_beets({"config_file": b"empty.yml"}) logged = self.run_with_log_capture(PLUGIN_NAME, "--list") self.assertIn("You have not created any trainings yet.", logged) @@ -36,12 +37,14 @@ def test_training_listing_empty(self): self.assertIn("You have not created any trainings yet.", logged) def test_training_listing_default(self): + self.setup_beets({"config_file": b"default.yml"}) logged = self.run_with_log_capture(PLUGIN_NAME, "--list") self.assertIn("[ training-1 ]", logged) self.assertIn("[ training-2 ]", logged) self.assertIn("[ training-3 ]", logged) def test_training_handling_inexistent(self): + self.setup_beets({"config_file": b"default.yml"}) training_name = "sitting_on_the_sofa" logged = self.run_with_log_capture(PLUGIN_NAME, training_name) self.assertIn( @@ -49,7 +52,9 @@ def test_training_handling_inexistent(self): training_name), logged) def test_training_song_count(self): + self.setup_beets({"config_file": b"default.yml"}) training_name = "training-1" + self.ensure_training_target_path(training_name) logged = self.run_with_log_capture(PLUGIN_NAME, training_name, "--count") self.assertIn("Number of songs available: {}".format(0), logged) @@ -58,7 +63,9 @@ def test_training_song_count(self): self.assertIn("Number of songs available: {}".format(0), logged) def test_training_no_songs(self): + self.setup_beets({"config_file": b"default.yml"}) training_name = "training-1" + self.ensure_training_target_path(training_name) logged = self.run_with_log_capture(PLUGIN_NAME, training_name) self.assertIn("Handling training: {0}".format(training_name), logged) self.assertIn( @@ -66,6 +73,7 @@ def test_training_no_songs(self): logged) def test_training_target_not_set(self): + self.setup_beets({"config_file": b"default.yml"}) self.add_single_item_to_library() training_name = "bad-target-1" logged = self.run_with_log_capture(PLUGIN_NAME, training_name) @@ -73,14 +81,16 @@ def test_training_target_not_set(self): "Training does not declare a `target`!", logged) def test_training_undefined_target(self): + self.setup_beets({"config_file": b"default.yml"}) self.add_single_item_to_library() training_name = "bad-target-2" logged = self.run_with_log_capture(PLUGIN_NAME, training_name) target_name = "inexistent_target" self.assertIn( - "The target name[{0}] is not defined!".format(target_name), logged) + "Target name \"{0}\" is not defined!".format(target_name), logged) def test_training_bad_target(self): + self.setup_beets({"config_file": b"default.yml"}) self.add_single_item_to_library() training_name = "bad-target-3" logged = self.run_with_log_capture(PLUGIN_NAME, training_name) @@ -94,9 +104,14 @@ def test_training_bad_target(self): def test_handling_training_1(self): """Simple query based song selection where everything matches """ + self.setup_beets({"config_file": b"default.yml"}) training_name = "training-1" - self.add_multiple_items_to_library(count=10, bpm=[120, 180], length=[120, 240]) + self.add_multiple_items_to_library( + count=10, + bpm=[120, 180], + length=[120, 240] + ) self.ensure_training_target_path(training_name) @@ -121,7 +136,6 @@ def test_handling_training_1(self): /private/var/folders/yv/9ntm56m10ql9wf_1zkbw74hr0000gp/T/tmpa2soyuro /Music Run! - """ prefix = "Handling training:" @@ -157,6 +171,7 @@ def test_handling_training_1(self): def test_handling_training_2(self): """Simple flavour based song selection with float value matching """ + self.setup_beets({"config_file": b"default.yml"}) training_name = "training-2" # Add matching items @@ -235,6 +250,7 @@ def test_handling_training_2(self): def test_handling_training_3(self): """Simple query + flavour based song selection """ + self.setup_beets({"config_file": b"default.yml"}) training_name = "training-3" # Add matching items for query + flavour diff --git a/test/helper.py b/test/helper.py index f3416aa..7e1a2a3 100644 --- a/test/helper.py +++ b/test/helper.py @@ -1,14 +1,12 @@ # Copyright: Copyright (c) 2020., Adam Jakab -# # Author: Adam Jakab -# Created: 2/18/20, 12:31 AM # License: See LICENSE.txt -# # References: https://docs.python.org/3/library/unittest.html # import os import shutil +import subprocess import sys import tempfile from contextlib import contextmanager @@ -24,7 +22,6 @@ from beets import util from beets.dbcore import types from beets.library import Item -from beets.mediafile import MediaFile from beets.util import ( syspath, bytestring_path, @@ -35,15 +32,16 @@ from beetsplug.goingrunning import common from six import StringIO -logging.getLogger('beets').propagate = True - -# Values +# Values from about.py +PACKAGE_TITLE = common.plg_ns['__PACKAGE_TITLE__'] +PACKAGE_NAME = common.plg_ns['__PACKAGE_NAME__'] PLUGIN_NAME = common.plg_ns['__PLUGIN_NAME__'] PLUGIN_ALIAS = common.plg_ns['__PLUGIN_ALIAS__'] PLUGIN_SHORT_DESCRIPTION = common.plg_ns['__PLUGIN_SHORT_DESCRIPTION__'] PLUGIN_VERSION = common.plg_ns['__version__'] -PACKAGE_NAME = common.plg_ns['__PACKAGE_NAME__'] -PACKAGE_TITLE = common.plg_ns['__PACKAGE_TITLE__'] + +_default_logger_name_ = 'beets.{plg}'.format(plg=PLUGIN_NAME) +logging.getLogger(_default_logger_name_).propagate = False class LogCapture(logging.Handler): @@ -56,11 +54,12 @@ def emit(self, record): @contextmanager -def capture_log(logger='beets'): +def capture_log(logger=_default_logger_name_): """Capture Logger output - with capture_log() as logs: - log.info(msg) - full_log = '\n'.join(logs) + >>> with capture_log() as logs: + ... log.info("Message") + + >>> full_log = ""\n"".join(logs) """ capture = LogCapture() log = logging.getLogger(logger) @@ -75,16 +74,18 @@ def capture_log(logger='beets'): def capture_stdout(): """Save stdout in a StringIO. >>> with capture_stdout() as output: - ... print('spam') + ... print('cseresznye') ... >>> output.getvalue() """ - orig = sys.stdout + org = sys.stdout sys.stdout = capture = StringIO() + if six.PY2: # StringIO encoding attr isn't writable in python >= 3 + sys.stdout.encoding = 'utf-8' try: yield sys.stdout finally: - sys.stdout = orig + sys.stdout = org print(capture.getvalue()) @@ -97,6 +98,8 @@ def control_stdin(userinput=None): """ org = sys.stdin sys.stdin = StringIO(userinput) + if six.PY2: # StringIO encoding attr isn't writable in python >= 3 + sys.stdin.encoding = 'utf-8' try: yield sys.stdin finally: @@ -143,24 +146,86 @@ def get_value_separated_from_output(fulltext: str, prefix: str): def _convert_args(args): - """Convert args to strings + """Convert args to bytestrings for Python 2 and convert them to strings + on Python 3. """ for i, elem in enumerate(args): - if isinstance(elem, bytes): - args[i] = elem.decode(util.arg_encoding()) + if six.PY2: + if isinstance(elem, six.text_type): + args[i] = elem.encode(util.arg_encoding()) + else: + if isinstance(elem, bytes): + args[i] = elem.decode(util.arg_encoding()) return args +def has_program(cmd, args=['--version']): + """Returns `True` if `cmd` can be executed. + """ + full_cmd = _convert_args([cmd] + args) + try: + with open(os.devnull, 'wb') as devnull: + subprocess.check_call(full_cmd, stderr=devnull, + stdout=devnull, stdin=devnull) + except OSError: + return False + except subprocess.CalledProcessError: + return False + else: + return True + + class Assertions(object): def assertIsFile(self: TestCase, path): self.assertTrue(os.path.isfile(syspath(path)), - msg=u'Path is not a file: {0}'.format(displayable_path(path))) + msg=u'Path is not a file: {0}'.format( + displayable_path(path))) + + +class BaseTestHelper(TestCase, Assertions): + _test_config_dir_ = os.path.join(bytestring_path( + os.path.dirname(__file__)), b'config') + _test_fixture_dir = os.path.join(bytestring_path( + os.path.dirname(__file__)), b'fixtures') -class UnitTestHelper(TestCase, Assertions): + _tempdirs = [] + _tmpdir = None + beetsdir = None __item_count = 0 + default_item_values = { + 'title': u't\u00eftle {0}', + 'artist': u'the \u00e4rtist', + 'album': u'the \u00e4lbum', + 'track': 0, + 'format': 'MP3', + } + + @classmethod + def setUpClass(cls): + pass + + @classmethod + def tearDownClass(cls): + pass + + def setUp(self): + self._tmpdir = self.create_temp_dir() + self.beetsdir = bytestring_path(self._tmpdir) + os.environ['BEETSDIR'] = self.beetsdir.decode() + pass + + def tearDown(self): + # Clean temporary folders + for tempdir in self._tempdirs: + if os.path.exists(tempdir): + shutil.rmtree(syspath(tempdir), ignore_errors=True) + self._tempdirs = [] + self._tmpdir = None + self.beetsdir = None + def create_item(self, **values): """Return an `Item` instance with sensible default values. @@ -171,29 +236,81 @@ def create_item(self, **values): prevent duplicates. """ item_count = self._get_item_count() - values_ = { - 'title': u't\u00eftle {0}', - 'artist': u'the \u00e4rtist', - 'album': u'the \u00e4lbum', - 'track': item_count, - 'format': 'MP3', - } - values_.update(values) - - values_['title'] = values_['title'].format(item_count) - item = Item(**values_) + _values = self.default_item_values + _values['title'] = _values['title'].format(item_count) + _values['track'] = item_count + _values.update(values) + item = Item(**_values) return item + def _get_item_count(self): + self.__item_count += 1 + return self.__item_count + + def create_temp_dir(self): + temp_dir = tempfile.mkdtemp() + self._tempdirs.append(temp_dir) + return temp_dir + + def _copy_files_to_beetsdir(self, file_list: list): + if file_list: + for file in file_list: + if isinstance(file, dict) and \ + 'file_name' in file and 'file_path' in file: + src = file['file_path'] + file_name = file['file_name'] + else: + src = file + file_name = os.path.basename(src) + + if isinstance(src, bytes): + src = src.decode() + + if isinstance(file_name, bytes): + file_name = file_name.decode() + + dst = os.path.join(self.beetsdir.decode(), file_name) + shutil.copyfile(src, dst) + + +class UnitTestHelper(BaseTestHelper): + config = None + + @classmethod + def setUpClass(cls): + super().setUpClass() + beets.ui._configure({"verbose": True}) + + def setUp(self): + super().setUp() + self.__item_count = 0 + + # copy configuration file to beets dir + config_file = os.path.join(self._test_config_dir_.decode(), + u'default.yml') + file_list = [{'file_name': 'config.yaml', 'file_path': config_file}] + self._copy_files_to_beetsdir(file_list) + + self.config = LazyConfig('beets', 'unit_tests') + + def tearDown(self): + """Tear down after each test + """ + self.config.clear() + super().tearDown() + def create_multiple_items(self, count=10, **values): items = [] for i in range(count): new_values = values.copy() for key in values: if type(values[key]) == list and len(values[key]) == 2: - if type(values[key][0]) == float or type(values[key][1]) == float: + if type(values[key][0]) == float or type( + values[key][1]) == float: random_val = uniform(values[key][0], values[key][1]) - elif type(values[key][0]) == int and type(values[key][1]) == int: + elif type(values[key][0]) == int and type( + values[key][1]) == int: random_val = randint(values[key][0], values[key][1]) else: raise ValueError("Elements for key({}) are neither float nor int!") @@ -203,46 +320,45 @@ def create_multiple_items(self, count=10, **values): return items - def _get_item_count(self): - self.__item_count += 1 - return self.__item_count - -class FunctionalTestHelper(TestCase, Assertions): - _test_config_dir_ = os.path.join(bytestring_path(os.path.dirname(__file__)), b'config') - _test_fixture_dir = os.path.join(bytestring_path(os.path.dirname(__file__)), b'fixtures') - _test_target_dir = bytestring_path("/tmp/beets-goingrunning-test-drive") +class FunctionalTestHelper(BaseTestHelper): __item_count = 0 + @classmethod + def setUpClass(cls): + super().setUpClass() + cls._CFG = cls._get_default_CFG() + + @classmethod + def tearDownClass(cls): + super().tearDownClass() + def setUp(self): - """Setup before running any tests with an empty configuration file. + """Setup before each test """ - self.reset_beets(config_file=b"default.yml") + super().setUp() def tearDown(self): - self.teardown_beets() - - def reset_beets(self, config_file: bytes, extra_plugins=None, beet_files: list = None): - self.teardown_beets() - plugins._classes = {goingrunning.GoingRunningPlugin} - if extra_plugins: - plugins.load_plugins(extra_plugins) + """Tear down after each test + """ + self._teardown_beets() + self._CFG = self._get_default_CFG() + super().tearDown() - self._setup_beets(config_file, beet_files) + def setup_beets(self, cfg=None): + if cfg is not None and type(cfg) is dict: + self._CFG.update(cfg) - def _setup_beets(self, config_file: bytes, beet_files: list = None): - self.addCleanup(self.teardown_beets) - self.beetsdir = bytestring_path(self.create_temp_dir()) - os.environ['BEETSDIR'] = self.beetsdir.decode() + plugins._classes = {self._CFG["plugin"]} + if self._CFG["extra_plugins"]: + plugins.load_plugins(self._CFG["extra_plugins"]) # copy configuration file to beets dir - config_file = os.path.join(self._test_config_dir_, config_file).decode() + config_file = os.path.join(self._test_config_dir_, + self._CFG["config_file"]).decode() file_list = [{'file_name': 'config.yaml', 'file_path': config_file}] self._copy_files_to_beetsdir(file_list) - # copy additional files to beets dir ( - self._copy_files_to_beetsdir(beet_files) - self.config = beets.config self.config.clear() self.config.read() @@ -253,54 +369,18 @@ def _setup_beets(self, config_file: bytes, beet_files: list = None): self.config['threaded'] = False self.config['import']['copy'] = False - os.makedirs(self._test_target_dir, exist_ok=True) self.config['directory'] = self.beetsdir.decode() self.lib = beets.library.Library(':memory:', self.beetsdir.decode()) - # Music target dir (MPD-1) - os.makedirs(syspath("/tmp/Music"), exist_ok=True) - # This will initialize the plugins plugins.find_plugins() - def _copy_files_to_beetsdir(self, file_list: list): - if file_list: - for file in file_list: - if isinstance(file, dict) and 'file_name' in file and 'file_path' in file: - src = file['file_path'] - file_name = file['file_name'] - else: - src = file - file_name = os.path.basename(src) - - if isinstance(src, bytes): - src = src.decode() - - if isinstance(file_name, bytes): - file_name = file_name.decode() - - dst = os.path.join(self.beetsdir.decode(), file_name) - # print("Copy({}) to beetsdir: {}".format(src, file_name)) - - shutil.copyfile(src, dst) - - def teardown_beets(self): + def _teardown_beets(self): self.unload_plugins() - # Music target dir (MPD-1) - shutil.rmtree(syspath("/tmp/Music"), ignore_errors=True) - # reset types updated here: beets/ui/__init__.py:1148 library.Item._types = {'data_source': types.STRING} - shutil.rmtree(self._test_target_dir, ignore_errors=True) - - if hasattr(self, '_tempdirs'): - for tempdir in self._tempdirs: - if os.path.exists(tempdir): - shutil.rmtree(syspath(tempdir), ignore_errors=True) - self._tempdirs = [] - if hasattr(self, 'lib'): if hasattr(self.lib, '_connections'): del self.lib._connections @@ -308,21 +388,15 @@ def teardown_beets(self): if 'BEETSDIR' in os.environ: del os.environ['BEETSDIR'] - if hasattr(self, 'config'): - self.config.clear() - - MediaFile.fields() - - # beets.config.read(user=False, defaults=True) - - def create_temp_dir(self): - temp_dir = tempfile.mkdtemp() - self._tempdirs.append(temp_dir) - return temp_dir + self.config.clear() def ensure_training_target_path(self, training_name): - # Set existing path for target - target_name = self.config[PLUGIN_NAME]["trainings"][training_name]["target"].get() + """Make sure that the path set withing the target for the training + exists by creating it under the temporary folder and changing the + device_root key in the configuration + """ + target_name = self.config[PLUGIN_NAME]["trainings"][training_name][ + "target"].get() target = self.config[PLUGIN_NAME]["targets"][target_name] device_root = self.create_temp_dir() device_path = target["device_path"].get() @@ -330,23 +404,6 @@ def ensure_training_target_path(self, training_name): full_path = os.path.join(device_root, device_path) os.makedirs(full_path) - @staticmethod - def unload_plugins(): - for plugin in plugins._classes: - plugin.listeners = None - plugins._classes = set() - plugins._instances = {} - - # def runcli(self, *args): - # # TODO mock stdin - # with capture_stdout() as out: - # try: - # ui._raw_main(_convert_args(list(args)), self.lib) - # except ui.UserError as u: - # # TODO remove this and handle exceptions in tests - # print(u.args[0]) - # return out.getvalue() - def run_command(self, *args, **kwargs): """Run a beets command with an arbitrary amount of arguments. The Library` defaults to `self.lib`, but can be overridden with @@ -369,21 +426,6 @@ def run_with_log_capture(self, *args): self.run_command(*args) return util.text_string("\n".join(out)) - def lib_path(self, path): - return os.path.join(self.beetsdir, - path.replace(b'/', bytestring_path(os.sep))) - - @staticmethod - def _dump_config(cfg: Subview): - # print(json.dumps(cfg.get(), indent=4, sort_keys=False)) - flat = cfg.flatten() - print(yaml.dump(flat, Dumper=Dumper, default_flow_style=None, indent=2, - width=1000)) - - def get_fixture_item_path(self, ext): - assert (ext in 'mp3 m4a ogg'.split()) - return os.path.join(self._test_fixture_dir, bytestring_path('song.' + ext.lower())) - def _get_item_count(self): """Internal counter for create_item """ @@ -391,33 +433,12 @@ def _get_item_count(self): return self.__item_count def create_item(self, **values): - """Return an `Item` instance with sensible default values. - - The item receives its attributes from `**values` paratmeter. The - `title`, `artist`, `album`, `track`, `format` and `path` - attributes have defaults if they are not given as parameters. - The `title` attribute is formated with a running item count to - prevent duplicates. The default for the `path` attribute - respects the `format` value. - - The item is attached to the database from `self.lib`. + """... The item is attached to the database from `self.lib`. """ - item_count = self._get_item_count() - values_ = { - 'title': u't\u00eftle {0}', - 'artist': u'the \u00e4rtist', - 'album': u'the \u00e4lbum', - 'track': item_count, - 'format': 'MP3', - } - values_.update(values) - values_['title'] = values_['title'].format(item_count) - + values['db'] = self.lib + item = super().create_item(**values) # print("Creating Item: {}".format(values_)) - values_['db'] = self.lib - item = Item(**values_) - if 'path' not in values: item['path'] = 'test/fixtures/song.' + item['format'].lower() @@ -428,10 +449,10 @@ def create_item(self, **values): def add_single_item_to_library(self, **values): item = self.create_item(**values) - # item = Item.from_path(self.get_fixture_item_path(values.pop('format'))) item.add(self.lib) + item.store() + # item.move(MoveOperation.COPY) - # item.write() return item def add_multiple_items_to_library(self, count=10, **values): @@ -439,12 +460,36 @@ def add_multiple_items_to_library(self, count=10, **values): new_values = values.copy() for key in values: if type(values[key]) == list and len(values[key]) == 2: - if type(values[key][0]) == float or type(values[key][1]) == float: + if type(values[key][0]) == float or type( + values[key][1]) == float: random_val = uniform(values[key][0], values[key][1]) - elif type(values[key][0]) == int and type(values[key][1]) == int: + elif type(values[key][0]) == int and type( + values[key][1]) == int: random_val = randint(values[key][0], values[key][1]) else: - raise ValueError("Elements for key({}) are neither float nor int!") + raise ValueError( + "Elements for key({}) are neither float nor int!") new_values[key] = random_val self.add_single_item_to_library(**new_values) + + @staticmethod + def _dump_config(cfg: Subview): + flat = cfg.flatten() + print(yaml.dump(flat, Dumper=Dumper, default_flow_style=None, + indent=2, width=1000)) + + @staticmethod + def unload_plugins(): + for plugin in plugins._classes: + plugin.listeners = None + plugins._classes = set() + plugins._instances = {} + + @staticmethod + def _get_default_CFG(): + return { + 'plugin': goingrunning.GoingRunningPlugin, + 'config_file': b'default.yml', + 'extra_plugins': [], + } diff --git a/test/tmp/helper.py b/test/tmp/helper.py new file mode 100644 index 0000000..782f02f --- /dev/null +++ b/test/tmp/helper.py @@ -0,0 +1,619 @@ +# -*- coding: utf-8 -*- +# This file is part of beets. +# Copyright 2016, Thomas Scholtes. +# +# Permission is hereby granted, free of charge, to any person obtaining +# a copy of this software and associated documentation files (the +# "Software"), to deal in the Software without restriction, including +# without limitation the rights to use, copy, modify, merge, publish, +# distribute, sublicense, and/or sell copies of the Software, and to +# permit persons to whom the Software is furnished to do so, subject to +# the following conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. + +"""This module includes various helpers that provide fixtures, capture +information or mock the environment. + +- The `control_stdin` and `capture_stdout` context managers allow one to + interact with the user interface. + +- `has_program` checks the presence of a command on the system. + +- The `generate_album_info` and `generate_track_info` functions return + fixtures to be used when mocking the autotagger. + +- The `ImportSessionFixture` allows one to run importer code while + controlling the interactions through code. + +- The `TestHelper` class encapsulates various fixtures that can be set up. +""" + +from __future__ import division, absolute_import, print_function + +import os +import os.path +import shutil +import subprocess +import sys +from contextlib import contextmanager +from enum import Enum +from tempfile import mkdtemp, mkstemp + +import beets +import beets.plugins +import six +from beets import config +from beets import importer +from beets import logging +from beets import util +from beets.autotag.hooks import AlbumInfo, TrackInfo +from beets.library import Library, Item, Album +from beets.util import MoveOperation +from mediafile import MediaFile, Image +from six import StringIO + +# TODO Move AutotagMock here +from test import _common + + +class LogCapture(logging.Handler): + + def __init__(self): + logging.Handler.__init__(self) + self.messages = [] + + def emit(self, record): + self.messages.append(six.text_type(record.msg)) + + +@contextmanager +def capture_log(logger='beets'): + capture = LogCapture() + log = logging.getLogger(logger) + log.addHandler(capture) + try: + yield capture.messages + finally: + log.removeHandler(capture) + + +@contextmanager +def control_stdin(input=None): + """Sends ``input`` to stdin. + + >>> with control_stdin('yes'): + ... input() + 'yes' + """ + org = sys.stdin + sys.stdin = StringIO(input) + if six.PY2: # StringIO encoding attr isn't writable in python >= 3 + sys.stdin.encoding = 'utf-8' + try: + yield sys.stdin + finally: + sys.stdin = org + + +@contextmanager +def capture_stdout(): + """Save stdout in a StringIO. + + >>> with capture_stdout() as output: + ... print('spam') + ... + >>> output.getvalue() + 'spam' + """ + org = sys.stdout + sys.stdout = capture = StringIO() + if six.PY2: # StringIO encoding attr isn't writable in python >= 3 + sys.stdout.encoding = 'utf-8' + try: + yield sys.stdout + finally: + sys.stdout = org + print(capture.getvalue()) + + +def _convert_args(args): + """Convert args to bytestrings for Python 2 and convert them to strings + on Python 3. + """ + for i, elem in enumerate(args): + if six.PY2: + if isinstance(elem, six.text_type): + args[i] = elem.encode(util.arg_encoding()) + else: + if isinstance(elem, bytes): + args[i] = elem.decode(util.arg_encoding()) + + return args + + +def has_program(cmd, args=['--version']): + """Returns `True` if `cmd` can be executed. + """ + full_cmd = _convert_args([cmd] + args) + try: + with open(os.devnull, 'wb') as devnull: + subprocess.check_call(full_cmd, stderr=devnull, + stdout=devnull, stdin=devnull) + except OSError: + return False + except subprocess.CalledProcessError: + return False + else: + return True + + +class TestHelper(object): + """Helper mixin for high-level cli and plugin tests. + + This mixin provides methods to isolate beets' global state provide + fixtures. + """ + + # TODO automate teardown through hook registration + + def setup_beets(self, disk=False): + """Setup pristine global configuration and library for testing. + + Sets ``beets.config`` so we can safely use any functionality + that uses the global configuration. All paths used are + contained in a temporary directory + + Sets the following properties on itself. + + - ``temp_dir`` Path to a temporary directory containing all + files specific to beets + + - ``libdir`` Path to a subfolder of ``temp_dir``, containing the + library's media files. Same as ``config['directory']``. + + - ``config`` The global configuration used by beets. + + - ``lib`` Library instance created with the settings from + ``config``. + + Make sure you call ``teardown_beets()`` afterwards. + """ + self.create_temp_dir() + os.environ['BEETSDIR'] = util.py3_path(self.temp_dir) + + self.config = beets.config + self.config.clear() + self.config.read() + + self.config['plugins'] = [] + self.config['verbose'] = 1 + self.config['ui']['color'] = False + self.config['threaded'] = False + + self.libdir = os.path.join(self.temp_dir, b'libdir') + os.mkdir(self.libdir) + self.config['directory'] = util.py3_path(self.libdir) + + if disk: + dbpath = util.bytestring_path( + self.config['library'].as_filename() + ) + else: + dbpath = ':memory:' + self.lib = Library(dbpath, self.libdir) + + def teardown_beets(self): + self.lib._close() + if 'BEETSDIR' in os.environ: + del os.environ['BEETSDIR'] + self.remove_temp_dir() + self.config.clear() + beets.config.read(user=False, defaults=True) + + def load_plugins(self, *plugins): + """Load and initialize plugins by names. + + Similar setting a list of plugins in the configuration. Make + sure you call ``unload_plugins()`` afterwards. + """ + # FIXME this should eventually be handled by a plugin manager + beets.config['plugins'] = plugins + beets.plugins.load_plugins(plugins) + beets.plugins.find_plugins() + + # Take a backup of the original _types and _queries to restore + # when unloading. + Item._original_types = dict(Item._types) + Album._original_types = dict(Album._types) + Item._types.update(beets.plugins.types(Item)) + Album._types.update(beets.plugins.types(Album)) + + Item._original_queries = dict(Item._queries) + Album._original_queries = dict(Album._queries) + Item._queries.update(beets.plugins.named_queries(Item)) + Album._queries.update(beets.plugins.named_queries(Album)) + + def unload_plugins(self): + """Unload all plugins and remove the from the configuration. + """ + # FIXME this should eventually be handled by a plugin manager + beets.config['plugins'] = [] + beets.plugins._classes = set() + beets.plugins._instances = {} + Item._types = Item._original_types + Album._types = Album._original_types + Item._queries = Item._original_queries + Album._queries = Album._original_queries + + def create_importer(self, item_count=1, album_count=1): + """Create files to import and return corresponding session. + + Copies the specified number of files to a subdirectory of + `self.temp_dir` and creates a `ImportSessionFixture` for this path. + """ + import_dir = os.path.join(self.temp_dir, b'import') + if not os.path.isdir(import_dir): + os.mkdir(import_dir) + + album_no = 0 + while album_count: + album = util.bytestring_path(u'album {0}'.format(album_no)) + album_dir = os.path.join(import_dir, album) + if os.path.exists(album_dir): + album_no += 1 + continue + os.mkdir(album_dir) + album_count -= 1 + + track_no = 0 + album_item_count = item_count + while album_item_count: + title = u'track {0}'.format(track_no) + src = os.path.join(_common.RSRC, b'full.mp3') + title_file = util.bytestring_path('{0}.mp3'.format(title)) + dest = os.path.join(album_dir, title_file) + if os.path.exists(dest): + track_no += 1 + continue + album_item_count -= 1 + shutil.copy(src, dest) + mediafile = MediaFile(dest) + mediafile.update({ + 'artist': 'artist', + 'albumartist': 'album artist', + 'title': title, + 'album': album, + 'mb_albumid': None, + 'mb_trackid': None, + }) + mediafile.save() + + config['import']['quiet'] = True + config['import']['autotag'] = False + config['import']['resume'] = False + + return ImportSessionFixture(self.lib, loghandler=None, query=None, + paths=[import_dir]) + + # Library fixtures methods + + def create_item(self, **values): + """Return an `Item` instance with sensible default values. + + The item receives its attributes from `**values` paratmeter. The + `title`, `artist`, `album`, `track`, `format` and `path` + attributes have defaults if they are not given as parameters. + The `title` attribute is formated with a running item count to + prevent duplicates. The default for the `path` attribute + respects the `format` value. + + The item is attached to the database from `self.lib`. + """ + item_count = self._get_item_count() + values_ = { + 'title': u't\u00eftle {0}', + 'artist': u'the \u00e4rtist', + 'album': u'the \u00e4lbum', + 'track': item_count, + 'format': 'MP3', + } + values_.update(values) + values_['title'] = values_['title'].format(item_count) + values_['db'] = self.lib + item = Item(**values_) + if 'path' not in values: + item['path'] = 'audio.' + item['format'].lower() + # mtime needs to be set last since other assignments reset it. + item.mtime = 12345 + return item + + def add_item(self, **values): + """Add an item to the library and return it. + + Creates the item by passing the parameters to `create_item()`. + + If `path` is not set in `values` it is set to `item.destination()`. + """ + # When specifying a path, store it normalized (as beets does + # ordinarily). + if 'path' in values: + values['path'] = util.normpath(values['path']) + + item = self.create_item(**values) + item.add(self.lib) + + # Ensure every item has a path. + if 'path' not in values: + item['path'] = item.destination() + item.store() + + return item + + def add_item_fixture(self, **values): + """Add an item with an actual audio file to the library. + """ + item = self.create_item(**values) + extension = item['format'].lower() + item['path'] = os.path.join(_common.RSRC, + util.bytestring_path('min.' + extension)) + item.add(self.lib) + item.move(operation=MoveOperation.COPY) + item.store() + return item + + def add_album(self, **values): + item = self.add_item(**values) + return self.lib.add_album([item]) + + def add_item_fixtures(self, ext='mp3', count=1): + """Add a number of items with files to the database. + """ + # TODO base this on `add_item()` + items = [] + path = os.path.join(_common.RSRC, util.bytestring_path('full.' + ext)) + for i in range(count): + item = Item.from_path(path) + item.album = u'\u00e4lbum {0}'.format(i) # Check unicode paths + item.title = u't\u00eftle {0}'.format(i) + # mtime needs to be set last since other assignments reset it. + item.mtime = 12345 + item.add(self.lib) + item.move(operation=MoveOperation.COPY) + item.store() + items.append(item) + return items + + def add_album_fixture(self, track_count=1, ext='mp3'): + """Add an album with files to the database. + """ + items = [] + path = os.path.join(_common.RSRC, util.bytestring_path('full.' + ext)) + for i in range(track_count): + item = Item.from_path(path) + item.album = u'\u00e4lbum' # Check unicode paths + item.title = u't\u00eftle {0}'.format(i) + # mtime needs to be set last since other assignments reset it. + item.mtime = 12345 + item.add(self.lib) + item.move(operation=MoveOperation.COPY) + item.store() + items.append(item) + return self.lib.add_album(items) + + def create_mediafile_fixture(self, ext='mp3', images=[]): + """Copies a fixture mediafile with the extension to a temporary + location and returns the path. + + It keeps track of the created locations and will delete the with + `remove_mediafile_fixtures()` + + `images` is a subset of 'png', 'jpg', and 'tiff'. For each + specified extension a cover art image is added to the media + file. + """ + src = os.path.join(_common.RSRC, util.bytestring_path('full.' + ext)) + handle, path = mkstemp() + os.close(handle) + shutil.copyfile(src, path) + + if images: + mediafile = MediaFile(path) + imgs = [] + for img_ext in images: + file = util.bytestring_path('image-2x3.{0}'.format(img_ext)) + img_path = os.path.join(_common.RSRC, file) + with open(img_path, 'rb') as f: + imgs.append(Image(f.read())) + mediafile.images = imgs + mediafile.save() + + if not hasattr(self, '_mediafile_fixtures'): + self._mediafile_fixtures = [] + self._mediafile_fixtures.append(path) + + return path + + def remove_mediafile_fixtures(self): + if hasattr(self, '_mediafile_fixtures'): + for path in self._mediafile_fixtures: + os.remove(path) + + def _get_item_count(self): + if not hasattr(self, '__item_count'): + count = 0 + self.__item_count = count + 1 + return count + + # Running beets commands + + def run_command(self, *args, **kwargs): + """Run a beets command with an arbitrary amount of arguments. The + Library` defaults to `self.lib`, but can be overridden with + the keyword argument `lib`. + """ + sys.argv = ['beet'] # avoid leakage from test suite args + lib = None + if hasattr(self, 'lib'): + lib = self.lib + lib = kwargs.get('lib', lib) + beets.ui._raw_main(_convert_args(list(args)), lib) + + def run_with_output(self, *args): + with capture_stdout() as out: + self.run_command(*args) + return util.text_string(out.getvalue()) + + # Safe file operations + + def create_temp_dir(self): + """Create a temporary directory and assign it into + `self.temp_dir`. Call `remove_temp_dir` later to delete it. + """ + temp_dir = mkdtemp() + self.temp_dir = util.bytestring_path(temp_dir) + + def remove_temp_dir(self): + """Delete the temporary directory created by `create_temp_dir`. + """ + shutil.rmtree(self.temp_dir) + + def touch(self, path, dir=None, content=''): + """Create a file at `path` with given content. + + If `dir` is given, it is prepended to `path`. After that, if the + path is relative, it is resolved with respect to + `self.temp_dir`. + """ + if dir: + path = os.path.join(dir, path) + + if not os.path.isabs(path): + path = os.path.join(self.temp_dir, path) + + parent = os.path.dirname(path) + if not os.path.isdir(parent): + os.makedirs(util.syspath(parent)) + + with open(util.syspath(path), 'a+') as f: + f.write(content) + return path + + +class ImportSessionFixture(importer.ImportSession): + """ImportSession that can be controlled programaticaly. + + >>> lib = Library(':memory:') + >>> importer = ImportSessionFixture(lib, paths=['/path/to/import']) + >>> importer.add_choice(importer.action.SKIP) + >>> importer.add_choice(importer.action.ASIS) + >>> importer.default_choice = importer.action.APPLY + >>> importer.run() + + This imports ``/path/to/import`` into `lib`. It skips the first + album and imports thesecond one with metadata from the tags. For the + remaining albums, the metadata from the autotagger will be applied. + """ + + def __init__(self, *args, **kwargs): + super(ImportSessionFixture, self).__init__(*args, **kwargs) + self._choices = [] + self._resolutions = [] + + default_choice = importer.action.APPLY + + def add_choice(self, choice): + self._choices.append(choice) + + def clear_choices(self): + self._choices = [] + + def choose_match(self, task): + try: + choice = self._choices.pop(0) + except IndexError: + choice = self.default_choice + + if choice == importer.action.APPLY: + return task.candidates[0] + elif isinstance(choice, int): + return task.candidates[choice - 1] + else: + return choice + + choose_item = choose_match + + Resolution = Enum('Resolution', 'REMOVE SKIP KEEPBOTH MERGE') + + default_resolution = 'REMOVE' + + def add_resolution(self, resolution): + assert isinstance(resolution, self.Resolution) + self._resolutions.append(resolution) + + def resolve_duplicate(self, task, found_duplicates): + try: + res = self._resolutions.pop(0) + except IndexError: + res = self.default_resolution + + if res == self.Resolution.SKIP: + task.set_choice(importer.action.SKIP) + elif res == self.Resolution.REMOVE: + task.should_remove_duplicates = True + elif res == self.Resolution.MERGE: + task.should_merge_duplicates = True + + +def generate_album_info(album_id, track_values): + """Return `AlbumInfo` populated with mock data. + + Sets the album info's `album_id` field is set to the corresponding + argument. For each pair (`id`, `values`) in `track_values` the `TrackInfo` + from `generate_track_info` is added to the album info's `tracks` field. + Most other fields of the album and track info are set to "album + info" and "track info", respectively. + """ + tracks = [generate_track_info(id, values) for id, values in track_values] + album = AlbumInfo( + album_id=u'album info', + album=u'album info', + artist=u'album info', + artist_id=u'album info', + tracks=tracks, + ) + for field in ALBUM_INFO_FIELDS: + setattr(album, field, u'album info') + + return album + + +ALBUM_INFO_FIELDS = ['album', 'album_id', 'artist', 'artist_id', + 'asin', 'albumtype', 'va', 'label', + 'artist_sort', 'releasegroup_id', 'catalognum', + 'language', 'country', 'albumstatus', 'media', + 'albumdisambig', 'releasegroupdisambig', 'artist_credit', + 'data_source', 'data_url'] + + +def generate_track_info(track_id='track info', values={}): + """Return `TrackInfo` populated with mock data. + + The `track_id` field is set to the corresponding argument. All other + string fields are set to "track info". + """ + track = TrackInfo( + title=u'track info', + track_id=track_id, + ) + for field in TRACK_INFO_FIELDS: + setattr(track, field, u'track info') + for field, value in values.items(): + setattr(track, field, value) + return track + + +TRACK_INFO_FIELDS = ['artist', 'artist_id', 'artist_sort', + 'disctitle', 'artist_credit', 'data_source', + 'data_url'] diff --git a/test/unit/000_common_test.py b/test/unit/000_common_test.py deleted file mode 100644 index bd0c386..0000000 --- a/test/unit/000_common_test.py +++ /dev/null @@ -1,186 +0,0 @@ -# Copyright: Copyright (c) 2020., Adam Jakab -# -# Author: Adam Jakab -# Created: 3/17/20, 3:28 PM -# License: See LICENSE.txt -# - -from logging import Logger - -from beetsplug.goingrunning import common - -from test.helper import UnitTestHelper, Assertions, get_plugin_configuration, \ - capture_log - - -class CommonTest(UnitTestHelper, Assertions): - """Test methods in the beetsplug.goingrunning.common module - """ - - def test_module_values(self): - self.assertTrue(hasattr(common, "MUST_HAVE_TRAINING_KEYS")) - self.assertTrue(hasattr(common, "MUST_HAVE_TARGET_KEYS")) - self.assertTrue(hasattr(common, "KNOWN_NUMERIC_FLEX_ATTRIBUTES")) - self.assertTrue(hasattr(common, "KNOWN_TEXTUAL_FLEX_ATTRIBUTES")) - self.assertTrue(hasattr(common, "__logger__")) - self.assertIsInstance(common.__logger__, Logger) - - def test_say(self): - test_message = "one two three" - - with capture_log() as logs: - common.say(test_message) - self.assertIn(test_message, '\n'.join(logs)) - - def test_get_beets_global_config(self): - self.assertEqual("0:00:00", common.get_human_readable_time(0), "Bad time format!") - self.assertEqual("0:00:33", common.get_human_readable_time(33), "Bad time format!") - self.assertEqual("0:33:33", common.get_human_readable_time(2013), "Bad time format!") - self.assertEqual("3:33:33", common.get_human_readable_time(12813), "Bad time format!") - - def test_get_flavour_elements(self): - cfg = { - "flavours": { - "speedy": { - "bpm": "180..", - "genre": "Hard Rock", - } - } - } - config = get_plugin_configuration(cfg) - self.assertListEqual(["bpm:180..", "genre:Hard Rock"], - common.get_flavour_elements( - config["flavours"]["speedy"])) - self.assertListEqual([], common.get_flavour_elements( - config["flavours"]["not_there"])) - - def test_get_training_attribute(self): - cfg = { - "trainings": { - "fallback": { - "query": { - "bpm": "120..", - }, - "target": "MPD1", - }, - "10K": { - "query": { - "bpm": "180..", - "length": "60..240", - }, - "use_flavours": ["f1", "f2"], - } - } - } - config = get_plugin_configuration(cfg) - training = config["trainings"]["10K"] - - # Direct - self.assertEqual(cfg["trainings"]["10K"]["query"], common.get_training_attribute(training, "query")) - self.assertEqual(cfg["trainings"]["10K"]["use_flavours"], - common.get_training_attribute(training, "use_flavours")) - - # Fallback - self.assertEqual(cfg["trainings"]["fallback"]["target"], common.get_training_attribute(training, "target")) - - # Inexistent - self.assertEqual(None, common.get_training_attribute(training, "hoppa")) - - def test_get_target_attribute(self): - cfg = { - "targets": { - "MPD1": { - "device_root": "/media/mpd1/", - "device_path": "auto/", - } - } - } - config = get_plugin_configuration(cfg) - target = config["targets"]["MPD1"] - - self.assertEqual("/media/mpd1/", common.get_target_attribute(target, "device_root")) - self.assertEqual("auto/", common.get_target_attribute(target, "device_path")) - self.assertEqual(None, common.get_target_attribute(target, "not_there")) - - def test_get_duration_of_items(self): - items = [self.create_item(length=120), self.create_item(length=79)] - self.assertEqual(199, common.get_duration_of_items(items)) - - # ValueError - baditem = self.create_item(length="") - self.assertEqual(0, common.get_duration_of_items([baditem])) - - # TypeError - baditem = self.create_item(length={}) - self.assertEqual(0, common.get_duration_of_items([baditem])) - - def test_get_min_max_sum_avg_for_items(self): - item1 = self.create_item(mood_happy=100) - item2 = self.create_item(mood_happy=150) - item3 = self.create_item(mood_happy=200) - _min, _max, _sum, _avg = common.get_min_max_sum_avg_for_items([item1, item2, item3], "mood_happy") - self.assertEqual(100, _min) - self.assertEqual(200, _max) - self.assertEqual(450, _sum) - self.assertEqual(150, _avg) - - item1 = self.create_item(mood_happy=99.7512345) - item2 = self.create_item(mood_happy=150.482234) - item3 = self.create_item(mood_happy=200.254733) - _min, _max, _sum, _avg = common.get_min_max_sum_avg_for_items([item1, item2, item3], "mood_happy") - self.assertEqual(99.751, _min) - self.assertEqual(200.255, _max) - self.assertEqual(450.488, _sum) - self.assertEqual(150.163, _avg) - - # ValueError - item1 = self.create_item(mood_happy=100) - item2 = self.create_item(mood_happy="") - _min, _max, _sum, _avg = common.get_min_max_sum_avg_for_items([item1, item2], "mood_happy") - self.assertEqual(100, _min) - self.assertEqual(100, _max) - self.assertEqual(100, _sum) - self.assertEqual(100, _avg) - - # TypeError - item1 = self.create_item(mood_happy=100) - item2 = self.create_item(mood_happy={}) - _min, _max, _sum, _avg = common.get_min_max_sum_avg_for_items( - [item1, item2], "mood_happy") - self.assertEqual(100, _min) - self.assertEqual(100, _max) - self.assertEqual(100, _sum) - self.assertEqual(100, _avg) - - # def test_score_library_items(self): - # cfg = { - # "trainings": { - # "10K": { - # "ordering": { - # "bpm": 100, - # } - # } - # } - # } - # config = get_plugin_configuration(cfg) - # training = config["trainings"]["10K"] - # - # items = [ - # self.create_item(id=1, bpm=140), - # self.create_item(id=2, bpm=120), - # self.create_item(id=3, bpm=100), - # ] - # common.score_library_items(training, items) - # - # _id = 0 - # for item in items: - # # print(item.evaluate_template("$id - $bpm - $ordering_score")) - # _id += 1 - # self.assertEqual(_id, item.get("id")) - # self.assertTrue(hasattr(item, "ordering_score")) - # self.assertIsInstance(item.get("ordering_score"), float) - # self.assertTrue(hasattr(item, "ordering_info")) - # self.assertIsInstance(item.get("ordering_info"), dict) - # - # scores = [item.get("ordering_score") for item in items] - # self.assertEqual([100, 50, 0], scores) diff --git a/test/unit/000_init_test.py b/test/unit/000_init_test.py new file mode 100644 index 0000000..be1fa31 --- /dev/null +++ b/test/unit/000_init_test.py @@ -0,0 +1,28 @@ +# Copyright: Copyright (c) 2020., Adam Jakab +# Author: Adam Jakab +# License: See LICENSE.txt + +from beets.dbcore import types +from beetsplug.goingrunning import GoingRunningPlugin +from beetsplug.goingrunning.command import GoingRunningCommand + +from test.helper import UnitTestHelper, PLUGIN_NAME + + +class PluginTest(UnitTestHelper): + """Test methods in the beetsplug.goingrunning module + """ + + def test_plugin(self): + plg = GoingRunningPlugin() + self.assertEqual(PLUGIN_NAME, plg.name) + + def test_plugin_commands(self): + plg = GoingRunningPlugin() + GRC = plg.commands()[0] + self.assertIsInstance(GRC, GoingRunningCommand) + + def test_plugin_types_definitions(self): + plg = GoingRunningPlugin() + definitions = {'play_count': types.INTEGER} + self.assertDictEqual(definitions, plg.item_types) diff --git a/test/unit/001_about_test.py b/test/unit/001_about_test.py new file mode 100644 index 0000000..1c52f20 --- /dev/null +++ b/test/unit/001_about_test.py @@ -0,0 +1,84 @@ +# Copyright: Copyright (c) 2020., Adam Jakab +# Author: Adam Jakab +# License: See LICENSE.txt + +from beetsplug.goingrunning import about + +from test.helper import UnitTestHelper, PACKAGE_TITLE, PACKAGE_NAME, \ + PLUGIN_NAME, PLUGIN_ALIAS, PLUGIN_SHORT_DESCRIPTION, PLUGIN_VERSION + + +class AboutTest(UnitTestHelper): + """Test values defined in the beetsplug.goingrunning.about module + """ + + def test_author(self): + attr = "__author__" + self.assertTrue(hasattr(about, attr)) + self.assertIsNotNone(getattr(about, attr)) + + def test_email(self): + attr = "__email__" + self.assertTrue(hasattr(about, attr)) + self.assertIsNotNone(getattr(about, attr)) + + def test_copyright(self): + attr = "__copyright__" + self.assertTrue(hasattr(about, attr)) + self.assertIsNotNone(getattr(about, attr)) + + def test_license(self): + attr = "__license__" + self.assertTrue(hasattr(about, attr)) + self.assertIsNotNone(getattr(about, attr)) + + def test_version(self): + attr = "__version__" + self.assertTrue(hasattr(about, attr)) + self.assertIsNotNone(getattr(about, attr)) + self.assertEqual(PLUGIN_VERSION, getattr(about, attr)) + + def test_status(self): + attr = "__status__" + self.assertTrue(hasattr(about, attr)) + self.assertIsNotNone(getattr(about, attr)) + + def test_package_title(self): + attr = "__PACKAGE_TITLE__" + self.assertTrue(hasattr(about, attr)) + self.assertIsNotNone(getattr(about, attr)) + self.assertEqual(PACKAGE_TITLE, getattr(about, attr)) + + def test_package_name(self): + attr = "__PACKAGE_NAME__" + self.assertTrue(hasattr(about, attr)) + self.assertIsNotNone(getattr(about, attr)) + self.assertEqual(PACKAGE_NAME, getattr(about, attr)) + + def test_package_description(self): + attr = "__PACKAGE_DESCRIPTION__" + self.assertTrue(hasattr(about, attr)) + self.assertIsNotNone(getattr(about, attr)) + + def test_package_url(self): + attr = "__PACKAGE_URL__" + self.assertTrue(hasattr(about, attr)) + self.assertIsNotNone(getattr(about, attr)) + + def test_plugin_name(self): + attr = "__PLUGIN_NAME__" + self.assertTrue(hasattr(about, attr)) + self.assertIsNotNone(getattr(about, attr)) + self.assertEqual(PLUGIN_NAME, getattr(about, attr)) + + def test_plugin_alias(self): + attr = "__PLUGIN_ALIAS__" + self.assertTrue(hasattr(about, attr)) + self.assertIsNotNone(getattr(about, attr)) + self.assertEqual(PLUGIN_ALIAS, getattr(about, attr)) + + def test_plugin_short_description(self): + attr = "__PLUGIN_SHORT_DESCRIPTION__" + self.assertTrue(hasattr(about, attr)) + self.assertIsNotNone(getattr(about, attr)) + self.assertEqual(PLUGIN_SHORT_DESCRIPTION, getattr(about, attr)) diff --git a/test/unit/001_command_test.py b/test/unit/001_command_test.py deleted file mode 100644 index e2635d8..0000000 --- a/test/unit/001_command_test.py +++ /dev/null @@ -1,34 +0,0 @@ -# Copyright: Copyright (c) 2020., Adam Jakab -# -# Author: Adam Jakab -# Created: 3/17/20, 7:09 PM -# License: See LICENSE.txt -# -# -# Author: Adam Jakab -# Created: 3/17/20, 3:28 PM -# License: See LICENSE.txt -# - -from beetsplug.goingrunning import command - -from test.helper import UnitTestHelper, Assertions, get_plugin_configuration, \ - PLUGIN_NAME, PLUGIN_ALIAS, PLUGIN_SHORT_DESCRIPTION - - -class CommandTest(UnitTestHelper, Assertions): - """Test methods in the beetsplug.goingrunning.command module - """ - - def test_module_values(self): - self.assertEqual(u'goingrunning', PLUGIN_NAME) - self.assertEqual(u'run', PLUGIN_ALIAS) - self.assertEqual( - u'run with the music that matches your training sessions', - PLUGIN_SHORT_DESCRIPTION) - - def test_class_init_config(self): - cfg = {"something": "good"} - config = get_plugin_configuration(cfg) - inst = command.GoingRunningCommand(config) - self.assertEqual(config, inst.config) diff --git a/test/unit/002_common_test.py b/test/unit/002_common_test.py new file mode 100644 index 0000000..c76ad03 --- /dev/null +++ b/test/unit/002_common_test.py @@ -0,0 +1,438 @@ +# Copyright: Copyright (c) 2020., Adam Jakab +# +# Author: Adam Jakab +# Created: 3/17/20, 3:28 PM +# License: See LICENSE.txt +# +import os +from logging import Logger + +from beets import util +from beets.dbcore import types +from beetsplug.goingrunning import common, GoingRunningPlugin + +from test.helper import UnitTestHelper, get_plugin_configuration, \ + capture_log + + +class CommonTest(UnitTestHelper): + """Test methods in the beetsplug.goingrunning.common module + """ + + def test_module_values(self): + self.assertTrue(hasattr(common, "MUST_HAVE_TRAINING_KEYS")) + self.assertTrue(hasattr(common, "MUST_HAVE_TARGET_KEYS")) + self.assertTrue(hasattr(common, "KNOWN_NUMERIC_FLEX_ATTRIBUTES")) + self.assertTrue(hasattr(common, "KNOWN_TEXTUAL_FLEX_ATTRIBUTES")) + self.assertTrue(hasattr(common, "__logger__")) + self.assertIsInstance(common.__logger__, Logger) + + def test_say(self): + test_message = "one two three" + + with capture_log() as logs: + common.say(test_message) + self.assertIn(test_message, '\n'.join(logs)) + + def test_get_item_attribute_type_overrides(self): + res = common.get_item_attribute_type_overrides() + self.assertListEqual(common.KNOWN_NUMERIC_FLEX_ATTRIBUTES, + list(res.keys())) + + exp_types = [types.Float for n in + range(0, len(common.KNOWN_NUMERIC_FLEX_ATTRIBUTES))] + res_types = [type(v) for v in res.values()] + self.assertListEqual(exp_types, res_types) + + def test_get_human_readable_time(self): + self.assertEqual("0:00:00", common.get_human_readable_time(0), + "Bad time format!") + self.assertEqual("0:00:33", common.get_human_readable_time(33), + "Bad time format!") + self.assertEqual("0:33:33", common.get_human_readable_time(2013), + "Bad time format!") + self.assertEqual("3:33:33", common.get_human_readable_time(12813), + "Bad time format!") + + def test_get_normalized_query_element(self): + # Test simple value pair(string) + key = "genre" + val = "Rock" + expected = "genre:Rock" + qe = common.get_normalized_query_element(key, val) + self.assertEqual(expected, qe) + + # Test simple value pair(int) + key = "length" + val = 360 + expected = "length:360" + qe = common.get_normalized_query_element(key, val) + self.assertEqual(expected, qe) + + # Test list of values: ['bpm:100..120', 'bpm:160..180'] + key = "bpm" + val = ["100..120", "160..180"] + expected = ["bpm:100..120", "bpm:160..180"] + qe = common.get_normalized_query_element(key, val) + self.assertListEqual(expected, qe) + + def test_get_flavour_elements(self): + cfg = { + "flavours": { + "speedy": { + "bpm": "180..", + "genre": "Hard Rock", + }, + "complex": { + "bpm": "180..", + "genre": ["Rock", "Jazz", "Pop"], + } + } + } + config = get_plugin_configuration(cfg) + + # non-existent flavour + el = common.get_flavour_elements(config["flavours"]["not_there"]) + self.assertListEqual([], el) + + # simple single values + expected = ["bpm:180..", "genre:Hard Rock"] + el = common.get_flavour_elements(config["flavours"]["speedy"]) + self.assertListEqual(expected, el) + + # list in field + expected = ["bpm:180..", "genre:Rock", "genre:Jazz", "genre:Pop"] + el = common.get_flavour_elements(config["flavours"]["complex"]) + self.assertListEqual(expected, el) + + def test_get_training_attribute(self): + cfg = { + "trainings": { + "fallback": { + "query": { + "bpm": "120..", + }, + "target": "MPD1", + }, + "10K": { + "query": { + "bpm": "180..", + "length": "60..240", + }, + "use_flavours": ["f1", "f2"], + } + } + } + config = get_plugin_configuration(cfg) + training = config["trainings"]["10K"] + + # Direct + self.assertEqual(cfg["trainings"]["10K"]["query"], + common.get_training_attribute(training, "query")) + self.assertEqual(cfg["trainings"]["10K"]["use_flavours"], + common.get_training_attribute(training, + "use_flavours")) + + # Fallback + self.assertEqual(cfg["trainings"]["fallback"]["target"], + common.get_training_attribute(training, "target")) + + # Inexistent + self.assertIsNone(common.get_training_attribute(training, "hoppa")) + + def test_get_target_for_training(self): + cfg = { + "targets": { + "MPD1": { + "device_root": "/mnt/mpd1" + } + }, + "trainings": { + "T1": { + "target": "missing", + }, + "T2": { + "target": "MPD1", + } + } + } + config = get_plugin_configuration(cfg) + + # No "targets" node + no_targets_cfg = cfg.copy() + del no_targets_cfg["targets"] + no_targets_config = get_plugin_configuration(no_targets_cfg) + training = no_targets_config["trainings"]["T1"] + self.assertIsNone(common.get_target_for_training(training)) + + # Undefined target + training = config["trainings"]["T1"] + self.assertIsNone(common.get_target_for_training(training)) + + # Target found + training = config["trainings"]["T2"] + expected = config["targets"]["MPD1"].flatten() + target = common.get_target_for_training(training).flatten() + self.assertDictEqual(expected, target) + + def test_get_target_attribute_for_training(self): + cfg = { + "targets": { + "MPD1": { + "device_root": "/mnt/mpd1" + } + }, + "trainings": { + "T1": { + "target": "missing", + }, + "T2": { + "target": "MPD1", + } + } + } + config = get_plugin_configuration(cfg) + + # Undefined target + training = config["trainings"]["T1"] + self.assertIsNone(common.get_target_attribute_for_training(training)) + + # Get name (default param) + training = config["trainings"]["T2"] + expected = "MPD1" + self.assertEqual(expected, + common.get_target_attribute_for_training(training)) + + # Get name (using param) + training = config["trainings"]["T2"] + expected = "MPD1" + self.assertEqual(expected, + common.get_target_attribute_for_training(training, + "name")) + + # Get name (using param) + training = config["trainings"]["T2"] + expected = "/mnt/mpd1" + self.assertEqual(expected, + common.get_target_attribute_for_training(training, + "device_root")) + + def test_get_destination_path_for_training(self): + tmpdir = self.create_temp_dir() + tmpdir_slashed = "{}/".format(tmpdir) + temp_sub_dir = os.path.join(tmpdir, "music") + os.mkdir(temp_sub_dir) + + cfg = { + "targets": { + "MPD-no-device-root": { + "alias": "I have no device_root", + "device_path": "music" + }, + "MPD-non-existent": { + "device_root": "/this/does/not/exist/i/hope", + "device_path": "music" + }, + "MPD1": { + "device_root": tmpdir, + "device_path": "music" + }, + "MPD2": { + "device_root": tmpdir_slashed, + "device_path": "music" + }, + "MPD3": { + "device_root": tmpdir, + "device_path": "/music" + }, + "MPD4": { + "device_root": tmpdir_slashed, + "device_path": "/music" + }, + "MPD5": { + "device_root": tmpdir_slashed, + "device_path": "/music/" + }, + }, + "trainings": { + "T0-no-target": { + "alias": "I have no target", + }, + "T0-no-device-root": { + "target": "MPD-no-device-root", + }, + "T0-non-existent": { + "target": "MPD-non-existent", + }, + "T1": { + "target": "MPD1", + }, + "T2": { + "target": "MPD2", + }, + "T3": { + "target": "MPD3", + }, + "T4": { + "target": "MPD4", + }, + "T5": { + "target": "MPD5", + } + } + } + config = get_plugin_configuration(cfg) + + # No target + training = config["trainings"]["T0-no-target"] + path = common.get_destination_path_for_training(training) + self.assertIsNone(path) + + # No device_root in target + training = config["trainings"]["T0-no-device-root"] + path = common.get_destination_path_for_training(training) + self.assertIsNone(path) + + # No non existent device_root in target + training = config["trainings"]["T0-non-existent"] + path = common.get_destination_path_for_training(training) + self.assertIsNone(path) + + # No separators between root and path + training = config["trainings"]["T1"] + expected = os.path.realpath(util.normpath( + os.path.join(tmpdir, "music")).decode()) + path = common.get_destination_path_for_training(training) + self.assertEqual(expected, path) + + # final slash on device_root + training = config["trainings"]["T2"] + expected = os.path.realpath(util.normpath( + os.path.join(tmpdir, "music")).decode()) + path = common.get_destination_path_for_training(training) + self.assertEqual(expected, path) + + # leading slash on device path + training = config["trainings"]["T3"] + expected = os.path.realpath(util.normpath( + os.path.join(tmpdir, "music")).decode()) + path = common.get_destination_path_for_training(training) + self.assertEqual(expected, path) + + # final slash on device_root and leading slash on device path + training = config["trainings"]["T4"] + expected = os.path.realpath(util.normpath( + os.path.join(tmpdir, "music")).decode()) + path = common.get_destination_path_for_training(training) + self.assertEqual(expected, path) + + # slashes allover + training = config["trainings"]["T5"] + expected = os.path.realpath(util.normpath( + os.path.join(tmpdir, "music")).decode()) + path = common.get_destination_path_for_training(training) + self.assertEqual(expected, path) + + def test_get_target_attribute(self): + cfg = { + "targets": { + "MPD1": { + "device_root": "/media/mpd1/", + "device_path": "auto/", + } + } + } + config = get_plugin_configuration(cfg) + target = config["targets"]["MPD1"] + + self.assertEqual("/media/mpd1/", + common.get_target_attribute(target, "device_root")) + self.assertEqual("auto/", + common.get_target_attribute(target, "device_path")) + self.assertEqual(None, common.get_target_attribute(target, "not_there")) + + def test_get_duration_of_items(self): + items = [self.create_item(length=120), self.create_item(length=79)] + self.assertEqual(199, common.get_duration_of_items(items)) + + # ValueError + baditem = self.create_item(length=-1) + self.assertEqual(0, common.get_duration_of_items([baditem])) + + # ValueError + baditem = self.create_item(length=None) + self.assertEqual(0, common.get_duration_of_items([baditem])) + + # TypeError + baditem = self.create_item(length=object()) + self.assertEqual(0, common.get_duration_of_items([baditem])) + + def test_get_min_max_sum_avg_for_items(self): + item1 = self.create_item(mood_happy=100) + item2 = self.create_item(mood_happy=150) + item3 = self.create_item(mood_happy=200) + _min, _max, _sum, _avg = common.get_min_max_sum_avg_for_items( + [item1, item2, item3], "mood_happy") + self.assertEqual(100, _min) + self.assertEqual(200, _max) + self.assertEqual(450, _sum) + self.assertEqual(150, _avg) + + item1 = self.create_item(mood_happy=99.7512345) + item2 = self.create_item(mood_happy=150.482234) + item3 = self.create_item(mood_happy=200.254733) + _min, _max, _sum, _avg = common.get_min_max_sum_avg_for_items( + [item1, item2, item3], "mood_happy") + self.assertEqual(99.751, _min) + self.assertEqual(200.255, _max) + self.assertEqual(450.488, _sum) + self.assertEqual(150.163, _avg) + + # ValueError + item1 = self.create_item(mood_happy=100) + item2 = self.create_item(mood_happy="") + _min, _max, _sum, _avg = common.get_min_max_sum_avg_for_items( + [item1, item2], "mood_happy") + self.assertEqual(100, _min) + self.assertEqual(100, _max) + self.assertEqual(100, _sum) + self.assertEqual(100, _avg) + + # TypeError + item1 = self.create_item(mood_happy=100) + item2 = self.create_item(mood_happy={}) + _min, _max, _sum, _avg = common.get_min_max_sum_avg_for_items( + [item1, item2], "mood_happy") + self.assertEqual(100, _min) + self.assertEqual(100, _max) + self.assertEqual(100, _sum) + self.assertEqual(100, _avg) + + # empty list + _min, _max, _sum, _avg = common.get_min_max_sum_avg_for_items( + [], "mood_happy") + self.assertEqual(0, _min) + self.assertEqual(0, _max) + self.assertEqual(0, _sum) + self.assertEqual(0, _avg) + + def test_increment_play_count_on_item(self): + item1 = self.create_item(play_count=3) + common.increment_play_count_on_item(item1, store=False, write=False) + expected = 4 + self.assertEqual(expected, item1.get("play_count")) + + def test_get_class_instance(self): + module_name = 'beetsplug.goingrunning' + class_name = 'GoingRunningPlugin' + instance = common.get_class_instance(module_name, class_name) + self.assertIsInstance(instance, GoingRunningPlugin) + + with self.assertRaises(RuntimeError): + module_name = 'beetsplug.goingtosleep' + common.get_class_instance(module_name, class_name) + + with self.assertRaises(RuntimeError): + module_name = 'beetsplug.goingrunning' + class_name = 'GoingToSleepPlugin' + common.get_class_instance(module_name, class_name) diff --git a/test/unit/003_command_test.py b/test/unit/003_command_test.py new file mode 100644 index 0000000..b761d28 --- /dev/null +++ b/test/unit/003_command_test.py @@ -0,0 +1,173 @@ +# Copyright: Copyright (c) 2020., Adam Jakab +# Author: Adam Jakab +# License: See LICENSE.txt +from beets.library import Item +from beets.util.confit import Subview +from beetsplug.goingrunning import GoingRunningCommand +from beetsplug.goingrunning import command + +from test.helper import UnitTestHelper, get_plugin_configuration, \ + PLUGIN_NAME, PLUGIN_ALIAS, PLUGIN_SHORT_DESCRIPTION + + +class CommandTest(UnitTestHelper): + """Test methods in the beetsplug.goingrunning.command module + """ + + def test_module_values(self): + self.assertEqual(u'goingrunning', PLUGIN_NAME) + self.assertEqual(u'run', PLUGIN_ALIAS) + self.assertEqual( + u'run with the music that matches your training sessions', + PLUGIN_SHORT_DESCRIPTION) + + def test_class_init_config(self): + cfg = {"something": "good"} + config = get_plugin_configuration(cfg) + inst = command.GoingRunningCommand(config) + self.assertEqual(config, inst.config) + + def test_gather_parse_query_elements__test_1(self): + plg_cfg: Subview = self.config["goingrunning"] + training: Subview = plg_cfg["trainings"]["q-test-1"] + cmd = GoingRunningCommand(plg_cfg) + elements = cmd._gather_query_elements(training) + self.assertListEqual([], elements) + + query = cmd.parse_query_elements(elements, Item) + expected = "AndQuery([TrueQuery()])" + self.assertEqual(expected, str(query)) + + def test_gather_parse_query_elements__test_2(self): + plg_cfg: Subview = self.config["goingrunning"] + training: Subview = plg_cfg["trainings"]["q-test-2"] + cmd = GoingRunningCommand(plg_cfg) + elements = cmd._gather_query_elements(training) + expected = ['bpm:100..150', 'length:120..300', 'genre:hard rock'] + self.assertListEqual(expected, elements) + + query = cmd.parse_query_elements(elements, Item) + expected = ( + "AndQuery([" + "NumericQuery('bpm', '100..150', True), " + "DurationQuery('length', '120..300', True), " + "SubstringQuery('genre', 'hard rock', True)" + "])" + ) + self.assertEqual(expected, str(query)) + + def test_gather_parse_query_elements__test_3(self): + plg_cfg: Subview = self.config["goingrunning"] + training: Subview = plg_cfg["trainings"]["q-test-3"] + cmd = GoingRunningCommand(plg_cfg) + elements = cmd._gather_query_elements(training) + expected = ['bpm:100..150', 'length:120..300', 'genre:reggae'] + self.assertListEqual(expected, elements) + + query = cmd.parse_query_elements(elements, Item) + expected = ( + "AndQuery([" + "NumericQuery('bpm', '100..150', True), " + "DurationQuery('length', '120..300', True), " + "SubstringQuery('genre', 'reggae', True)" + "])" + ) + self.assertEqual(expected, str(query)) + + def test_gather_parse_query_elements__test_4(self): + plg_cfg: Subview = self.config["goingrunning"] + training: Subview = plg_cfg["trainings"]["q-test-4"] + cmd = GoingRunningCommand(plg_cfg) + elements = cmd._gather_query_elements(training) + expected = ['bpm:100..150', 'year:2015..', + 'genre:reggae', 'year:1960..1969'] + self.assertListEqual(expected, elements) + + query = cmd.parse_query_elements(elements, Item) + expected = ( + "AndQuery([" + "NumericQuery('bpm', '100..150', True), " + "OrQuery([" + "NumericQuery('year', '2015..', True), " + "NumericQuery('year', '1960..1969', True)" + "]), " + "SubstringQuery('genre', 'reggae', True)" + "])" + ) + self.assertEqual(expected, str(query)) + + def test_gather_parse_query_elements__test_4_bis(self): + """Command line query should always be the first in the list + """ + plg_cfg: Subview = self.config["goingrunning"] + training: Subview = plg_cfg["trainings"]["q-test-4"] + cmd = GoingRunningCommand(plg_cfg) + cmd.query = ['albumartist:various artists'] + elements = cmd._gather_query_elements(training) + expected = cmd.query + \ + ['bpm:100..150', 'year:2015..', + 'genre:reggae', 'year:1960..1969'] + self.assertListEqual(expected, elements) + + query = cmd.parse_query_elements(elements, Item) + expected = ( + "AndQuery([" + "SubstringQuery('albumartist', 'various artists', True), " + "NumericQuery('bpm', '100..150', True), " + "OrQuery([" + "NumericQuery('year', '2015..', True), " + "NumericQuery('year', '1960..1969', True)" + "]), " + "SubstringQuery('genre', 'reggae', True)" + "])" + ) + self.assertEqual(expected, str(query)) + + def test_gather_parse_query_elements__test_5(self): + plg_cfg: Subview = self.config["goingrunning"] + training: Subview = plg_cfg["trainings"]["q-test-5"] + cmd = GoingRunningCommand(plg_cfg) + elements = cmd._gather_query_elements(training) + expected = ['genre:rock', 'genre:blues', 'genre:ska', + 'genre:funk', 'genre:Rockabilly', 'genre:Disco'] + self.assertListEqual(expected, elements) + + query = cmd.parse_query_elements(elements, Item) + expected = ( + "AndQuery([" + "OrQuery([" + "SubstringQuery('genre', 'rock', True), " + "SubstringQuery('genre', 'blues', True), " + "SubstringQuery('genre', 'ska', True), " + "SubstringQuery('genre', 'funk', True), " + "SubstringQuery('genre', 'Rockabilly', True), " + "SubstringQuery('genre', 'Disco', True)" + "])" + "])" + ) + self.assertEqual(expected, str(query)) + + def test_gather_parse_query_elements__test_6(self): + plg_cfg: Subview = self.config["goingrunning"] + training: Subview = plg_cfg["trainings"]["q-test-6"] + cmd = GoingRunningCommand(plg_cfg) + elements = cmd._gather_query_elements(training) + expected = ['genre:rock', 'genre:blues', + '^genre:jazz', '^genre:death metal'] + self.assertListEqual(expected, elements) + + query = cmd.parse_query_elements(elements, Item) + print(query) + expected = ( + "AndQuery([" + "OrQuery([" + "SubstringQuery('genre', 'rock', True), " + "SubstringQuery('genre', 'blues', True)" + "]), " + "AndQuery([" + "NotQuery(SubstringQuery('genre', 'jazz', True)), " + "NotQuery(SubstringQuery('genre', 'death metal', True))" + "])" + "])" + ) + self.assertEqual(expected, str(query))