Skip to content

Commit

Permalink
Merge branch 'develop'
Browse files Browse the repository at this point in the history
  • Loading branch information
werwolfby committed Oct 27, 2015
2 parents 6af1818 + eb1e23a commit ddca577
Show file tree
Hide file tree
Showing 92 changed files with 17,006 additions and 1,950 deletions.
1 change: 1 addition & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ branch = false
omit =
*/virtualenv/*
*/tests/*
*/tests_functional/*
*/utils/bittorrent.py
source = monitorrent/

Expand Down
19 changes: 18 additions & 1 deletion monitorrent/db.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from sqlalchemy import create_engine, event, Column, String, Integer, Table, MetaData
from sqlalchemy import create_engine, event, Column, String, Integer, Table, types
import sqlalchemy.orm
from sqlalchemy.orm import sessionmaker, scoped_session
from sqlalchemy.ext.declarative import declarative_base
from alembic.migration import MigrationContext
from alembic.operations import Operations
from datetime import datetime
import pytz


class ContextSession(sqlalchemy.orm.Session):
Expand All @@ -24,6 +26,21 @@ def __exit__(self, exc_type, exc_val, exc_tb):
finally:
self.close()


class UTCDateTime(types.TypeDecorator):

impl = types.DateTime

def process_bind_param(self, value, engine):
if value is not None:
return value.astimezone(pytz.utc)

def process_result_value(self, value, engine):
if value is not None:
return datetime(value.year, value.month, value.day,
value.hour, value.minute, value.second,
value.microsecond, tzinfo=pytz.utc)

Base = declarative_base()
_DBSession = None
engine = None
Expand Down
148 changes: 139 additions & 9 deletions monitorrent/engine.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import pytz
import threading
from datetime import datetime
from sqlalchemy import Column, Integer, DateTime
from monitorrent.db import Base, DBSession
from sqlalchemy import Column, Integer, ForeignKey, Unicode, Enum, func
from monitorrent.db import Base, DBSession, row2dict, UTCDateTime


class Logger(object):
Expand Down Expand Up @@ -71,12 +72,141 @@ def remove_torrent(self, torrent_hash):
return self.clients_manager.remove_torrent(torrent_hash)


class Execute(Base):
class ExecuteSettings(Base):
__tablename__ = "settings_execute"

id = Column(Integer, primary_key=True)
interval = Column(Integer, nullable=False)
last_execute = Column(DateTime, nullable=True)
last_execute = Column(UTCDateTime, nullable=True)


class Execute(Base):
__tablename__ = 'execute'

id = Column(Integer, primary_key=True)
start_time = Column(UTCDateTime, nullable=False)
finish_time = Column(UTCDateTime, nullable=False)
status = Column(Enum('finished', 'failed'), nullable=False)
failed_message = Column(Unicode, nullable=True)


class ExecuteLog(Base):
__tablename__ = 'execute_log'

id = Column(Integer, primary_key=True)
execute_id = Column(ForeignKey('execute.id'))
time = Column(UTCDateTime, nullable=False)
message = Column(Unicode, nullable=False)
level = Column(Enum('info', 'warning', 'failed', 'downloaded'), nullable=False)


class DbLoggerWrapper(Logger):
def __init__(self, logger, log_manager):
"""
:type logger: Logger | None
:type log_manager: ExecuteLogManager
"""
self._log_manager = log_manager
self._logger = logger

def started(self):
self._log_manager.started()
if self._logger:
self._logger.started()

def finished(self, finish_time, exception):
self._log_manager.finished(finish_time, exception)
if self._logger:
self._logger.finished(finish_time, exception)

def info(self, message):
self._log_manager.log_entry(message, 'info')
if self._logger:
self._logger.info(message)

def failed(self, message):
self._log_manager.log_entry(message, 'failed')
if self._logger:
self._logger.failed(message)

def downloaded(self, message, torrent):
self._log_manager.log_entry(message, 'downloaded')
if self._logger:
self._logger.downloaded(message, torrent)


# noinspection PyMethodMayBeStatic
class ExecuteLogManager(object):
_execute_id = None

def started(self):
if self._execute_id is not None:
raise Exception('Execute already in progress')

with DBSession() as db:
# noinspection PyArgumentList
start_time = datetime.now(pytz.utc)
# default values for not finished execute is failed and finish_time equal to start_time
execute = Execute(start_time=start_time, finish_time=start_time, status='failed')
db.add(execute)
db.commit()
self._execute_id = execute.id

def finished(self, finish_time, exception):
if self._execute_id is None:
raise Exception('Execute is not started')

with DBSession() as db:
# noinspection PyArgumentList
execute = db.query(Execute).filter(Execute.id == self._execute_id).first()
execute.status = 'finished' if exception is None else 'failed'
execute.finish_time = finish_time
if exception is not None:
execute.failed_message = unicode(exception)
self._execute_id = None

def log_entry(self, message, level):
if self._execute_id is None:
raise Exception('Execute is not started')

with DBSession() as db:
execute_log = ExecuteLog(execute_id=self._execute_id, time=datetime.now(pytz.utc),
message=message, level=level)
db.add(execute_log)

def get_log_entries(self, skip, take):
with DBSession() as db:
downloaded_sub_query = db.query(ExecuteLog.execute_id, func.count(ExecuteLog.id).label('count'))\
.group_by(ExecuteLog.execute_id, ExecuteLog.level)\
.having(ExecuteLog.level == 'downloaded')\
.subquery()
failed_sub_query = db.query(ExecuteLog.execute_id, func.count(ExecuteLog.id).label('count'))\
.group_by(ExecuteLog.execute_id, ExecuteLog.level)\
.having(ExecuteLog.level == 'failed')\
.subquery()

result_query = db.query(Execute, downloaded_sub_query.c.count, failed_sub_query.c.count)\
.outerjoin(failed_sub_query, Execute.id == failed_sub_query.c.execute_id)\
.outerjoin(downloaded_sub_query, Execute.id == downloaded_sub_query.c.execute_id)\
.order_by(Execute.finish_time.desc())\
.offset(skip)\
.limit(take)

result = []
for execute, downloads, fails in result_query.all():
execute_result = row2dict(execute)
execute_result['downloaded'] = downloads or 0
execute_result['failed'] = fails or 0
result.append(execute_result)

execute_count = db.query(func.count(Execute.id)).scalar()

return result, execute_count

def get_execute_log_details(self, execute_id):
with DBSession() as db:
log_entries = db.query(ExecuteLog).filter(ExecuteLog.execute_id == execute_id).all()
return [row2dict(e) for e in log_entries]


class EngineRunner(threading.Thread):
Expand Down Expand Up @@ -139,7 +269,7 @@ def _execute(self):
caught_exception = e
finally:
self.is_executing = False
self.last_execute = datetime.now()
self.last_execute = datetime.now(pytz.utc)
self.logger.finished(self.last_execute, caught_exception)
return True

Expand Down Expand Up @@ -178,18 +308,18 @@ def last_execute(self, value):

def _update_execute_settings(self):
with DBSession() as db:
settings_execute = db.query(Execute).first()
settings_execute = db.query(ExecuteSettings).first()
if not settings_execute:
settings_execute = Execute()
settings_execute = ExecuteSettings()
db.add(settings_execute)
settings_execute.interval = self._interval
settings_execute.last_execute = self._last_execute

def _get_execute_settings(self):
with DBSession() as db:
settings_execute = db.query(Execute).first()
settings_execute = db.query(ExecuteSettings).first()
if not settings_execute:
settings_execute = Execute(interval=self.DEFAULT_INTERVAL, last_execute=None)
settings_execute = ExecuteSettings(interval=self.DEFAULT_INTERVAL, last_execute=None)
else:
db.expunge(settings_execute)
return settings_execute
64 changes: 46 additions & 18 deletions monitorrent/plugin_managers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import os
from monitorrent.db import DBSession, row2dict
from monitorrent.plugins import Topic
from monitorrent.plugins.trackers import TrackerPluginBase, TrackerPluginWithCredentialsBase
from monitorrent.plugins.trackers import TrackerPluginBase, WithCredentialsMixin
from monitorrent.settings_manager import SettingsManager

plugins = dict()
upgrades = list()
Expand Down Expand Up @@ -45,20 +46,20 @@ def __init__(self, trackers=None):

def get_settings(self, name):
tracker = self.get_tracker(name)
if not isinstance(tracker, TrackerPluginWithCredentialsBase):
if not isinstance(tracker, WithCredentialsMixin):
return None
return tracker.get_credentials()

def set_settings(self, name, settings):
tracker = self.get_tracker(name)
if not isinstance(tracker, TrackerPluginWithCredentialsBase):
if not isinstance(tracker, WithCredentialsMixin):
return False
tracker.update_credentials(settings)
return True

def check_connection(self, name):
tracker = self.get_tracker(name)
if not tracker or not isinstance(tracker, TrackerPluginWithCredentialsBase):
if not tracker or not isinstance(tracker, WithCredentialsMixin):
return False
return tracker.verify()

Expand Down Expand Up @@ -135,10 +136,21 @@ def execute(self, engine):


class ClientsManager(object):
def __init__(self, clients=None):
def __init__(self, clients=None, default_client_name=None):
if clients is None:
clients = get_plugins('client')
self.clients = clients
self.default_client = self.__get_default_client(default_client_name,
self.clients.values()[0] if len(self.clients) > 0 else None)

def set_default(self, name):
default_client = self.__get_default_client(name)
if default_client is None:
raise KeyError()
self.default_client = default_client

def get_default(self):
return self.default_client

def get_settings(self, name):
client = self.get_client(name)
Expand All @@ -156,20 +168,36 @@ def get_client(self, name):
return self.clients[name]

def find_torrent(self, torrent_hash):
for name, client in self.clients.iteritems():
result = client.find_torrent(torrent_hash)
if result:
return result
return False
if self.default_client is None:
return False
result = self.default_client.find_torrent(torrent_hash)
return result or False

def add_torrent(self, torrent):
for name, client in self.clients.iteritems():
if client.add_torrent(torrent):
return True
return False
if self.default_client is None:
return False
return self.default_client.add_torrent(torrent)

def remove_torrent(self, torrent_hash):
for name, client in self.clients.iteritems():
if client.remove_torrent(torrent_hash):
return True
return False
if self.default_client is None:
return False
return self.default_client.remove_torrent(torrent_hash)

def __get_default_client(self, name=None, default=None):
if name is not None:
return self.clients.get(name, default)
return default


class DbClientsManager(ClientsManager):
def __init__(self, clients, settings_manager):
"""
:type clients: dict
:type settings_manager: SettingsManager
"""
self.settings_manager = settings_manager
super(DbClientsManager, self).__init__(clients, settings_manager.get_default_client())

def set_default(self, name):
self.settings_manager.set_default_client(name)
super(DbClientsManager, self).set_default(name)
6 changes: 3 additions & 3 deletions monitorrent/plugins/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from monitorrent.db import Base
from sqlalchemy import Column, Integer, String, DateTime, MetaData, Table
from monitorrent.db import Base, UTCDateTime
from sqlalchemy import Column, Integer, String


class TopicPolymorphicMap(dict):
Expand All @@ -22,7 +22,7 @@ class Topic(Base):
id = Column(Integer, primary_key=True)
display_name = Column(String, unique=True, nullable=False)
url = Column(String, nullable=False, unique=True)
last_update = Column(DateTime, nullable=True)
last_update = Column(UTCDateTime, nullable=True)
type = Column(String)

__mapper_args__ = {
Expand Down
Loading

0 comments on commit ddca577

Please sign in to comment.