Skip to content

Commit

Permalink
Add Flask context automatically to registered executors
Browse files Browse the repository at this point in the history
  • Loading branch information
mxdev88 committed Aug 22, 2021
1 parent af6f32f commit 5119587
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 30 deletions.
10 changes: 2 additions & 8 deletions docs/rst/usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,6 @@ Jobs can also be added after you app is running
Flask Context
-------------

If you wish to use anything from your Flask app context inside the job you can use something like this

.. code-block:: python
def blah():
with scheduler.app.app_context():
# do stuff
The Flask context is pushed automatically when initialising APScheduler executors, i.e. all jobs will be run inside a Flask application context.

If you are making use of Flask-SQLAlchemy and performing DB operations within a job, make sure that you make a call to `db.session.commit()`, in addition to providing the Flask app context.
23 changes: 4 additions & 19 deletions examples/flask_context.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,20 @@
"""Example using flask context."""

from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask import Flask, current_app

from flask_apscheduler import APScheduler

db = SQLAlchemy()


class User(db.Model):
"""User model."""

id = db.Column(db.Integer, primary_key=True) # noqa: A003, VNE003
username = db.Column(db.String(80), unique=True)
email = db.Column(db.String(120), unique=True)


def show_users():
def show_app_name():
"""Print all users."""
with db.app.app_context():
print(User.query.all())
print(f"Running example={current_app.name}")


class Config:
"""App configuration."""

JOBS = [{"id": "job1", "func": show_users, "trigger": "interval", "seconds": 2}]
JOBS = [{"id": "job1", "func": show_app_name, "trigger": "interval", "seconds": 2}]

SCHEDULER_JOBSTORES = {
"default": SQLAlchemyJobStore(url="sqlite:///flask_context.db")
Expand All @@ -39,9 +27,6 @@ class Config:
app = Flask(__name__)
app.config.from_object(Config())

db.app = app
db.init_app(app)

scheduler = APScheduler()
scheduler.init_app(app)
scheduler.start()
Expand Down
30 changes: 28 additions & 2 deletions flask_apscheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from apscheduler.jobstores.base import JobLookupError
from flask import make_response
from . import api
from .utils import fix_job_def, pop_trigger
from .utils import fix_job_def, pop_trigger, with_app_context

LOGGER = logging.getLogger('flask_apscheduler')

Expand Down Expand Up @@ -81,6 +81,7 @@ def init_app(self, app):
self.app.apscheduler = self

self._load_config()
self._apply_app_context()
self._load_jobs()

if self.api_enabled:
Expand Down Expand Up @@ -281,7 +282,11 @@ def run_job(self, id, jobstore=None):
if not job:
raise JobLookupError(id)

job.func(*job.args, **job.kwargs)
if self.app:
with self.app.app_context():
job.func(*job.args, **job.kwargs)
else:
job.func(*job.args, **job.kwargs)

def authenticate(self, func):
"""
Expand All @@ -305,6 +310,10 @@ def _load_config(self):
if executors:
options['executors'] = executors

if 'executors' not in options:
# APScheduler adds the default 'executor' at execution time rather than at configuration time, we need it at conf time in order to apply Flask app context.
options['executors'] = {'default': {'type': 'threadpool'}}

job_defaults = self.app.config.get('SCHEDULER_JOB_DEFAULTS')
if job_defaults:
options['job_defaults'] = job_defaults
Expand Down Expand Up @@ -370,6 +379,23 @@ def _add_url_route(self, endpoint, rule, view_func, method):
methods=[method]
)

def _apply_app_context(self):
"""
Apply Flask application context to the scheduler executors.
"""
import importlib, six
with self._scheduler._executors_lock:
for alias, executor in six.iteritems(self._scheduler._executors):
try:
module_name = executor.__module__
module = importlib.import_module(module_name)
except ModuleNotFoundError:
LOGGER.error(f'Unable to add Flask app context to {module_name}.')
continue

if 'run_job' in vars(module):
vars(module)['run_job'] = with_app_context(self.app, vars(module)['run_job'])

def _apply_auth(self, view_func):
"""
Apply decorator to authenticate the user who is making the request.
Expand Down
12 changes: 12 additions & 0 deletions flask_apscheduler/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from apscheduler.triggers.date import DateTrigger
from apscheduler.triggers.interval import IntervalTrigger
from collections import OrderedDict
from functools import wraps


def job_to_dict(job):
Expand Down Expand Up @@ -162,3 +163,14 @@ def wsgi_to_bytes(data):
if isinstance(data, bytes):
return data
return data.encode("latin1") # XXX: utf8 fallback?


def with_app_context(app, func):
@wraps(func)
def wrapper(*args, **kwargs):
if not app:
return func(*args, **kwargs)
with app.app_context():
return func(*args, **kwargs)

return wrapper
26 changes: 25 additions & 1 deletion tests/test_scheduler.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
from flask import Flask
from flask import Flask, current_app
from flask_apscheduler import APScheduler, utils
from unittest import TestCase
import apscheduler
from pytz import utc
import datetime
import sys
import importlib


class TestScheduler(TestCase):
def setUp(self):
Expand Down Expand Up @@ -184,6 +187,27 @@ def decorated_job():
self.scheduler.shutdown()
self.assertFalse(self.scheduler.running)

def test_run_job(self):
job = self.scheduler.add_job('job2', job2)

with self.assertRaises(RuntimeError):
self.scheduler.run_job('job2')

job = self.scheduler_two.add_job('job2', job2)
self.scheduler_two.run_job('job2')

def test_apply_app_context(self):
now = datetime.datetime.now(utc)
self.scheduler_two.start()
job = self.scheduler_two.add_job('appctx', job2, trigger='date', next_run_time=now)
executor = self.scheduler_two._scheduler._executors['default']

self.assertTrue(executor.submit_job(job, [now]) is None)


def job1():
pass


def job2():
return current_app.name
12 changes: 12 additions & 0 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
from copy import deepcopy
from flask_apscheduler import utils
from flask import Flask
from unittest import TestCase
from flask_apscheduler.utils import with_app_context

class TestUtils(TestCase):
def setUp(self):
self.app = Flask(__name__)

def test_pop_trigger(self):
def __pop_trigger(trigger, *params):
data = dict(trigger=trigger)
Expand All @@ -23,3 +28,10 @@ def __pop_trigger(trigger, *params):
__pop_trigger('interval', 'weeks', 'days', 'hours', 'minutes', 'seconds', 'start_date', 'end_date', 'timezone')
__pop_trigger('cron', 'year', 'month', 'day', 'week', 'day_of_week', 'hour', 'minute', 'second', 'start_date', 'end_date', 'timezone')
self.assertRaises(Exception, utils.pop_trigger, dict(trigger='invalid_trigger'))

def test_with_app_context(self):
def one_func():
return 'x'

one_func = with_app_context(self.app, one_func)
self.assertTrue('x' == one_func())

0 comments on commit 5119587

Please sign in to comment.