Webhooks #2624

Open · wants to merge 18 commits into master
15 changes: 14 additions & 1 deletion requirements/develop.pip
@@ -1,5 +1,5 @@
#
# This file is autogenerated by pip-compile with Python 3.11
# This file is autogenerated by pip-compile with Python 3.10
# by the following command:
#
# pip-compile --output-file=requirements/develop.pip requirements/develop.in
@@ -21,6 +21,10 @@ appdirs==1.4.4
# urlextract
asttokens==2.4.1
# via stack-data
async-timeout==4.0.3
# via
# -c requirements/install.pip
# redis
attrs==23.2.0
# via
# -c requirements/install.pip
@@ -170,6 +174,10 @@ email-validator==2.1.1
# -c requirements/test.pip
# flask-security-too
# wtforms
exceptiongroup==1.2.1
# via
# ipython
# pytest
executing==2.0.1
# via stack-data
factory-boy==2.12.0
@@ -623,6 +631,11 @@ tlds==2024030600
# -r requirements/install.in
toml==0.10.2
# via pre-commit
tomli==2.0.1
# via
# build
# pyproject-hooks
# pytest
tqdm==4.66.2
# via twine
traitlets==5.14.2
4 changes: 3 additions & 1 deletion requirements/install.pip
@@ -1,5 +1,5 @@
#
# This file is autogenerated by pip-compile with Python 3.11
# This file is autogenerated by pip-compile with Python 3.10
# by the following command:
#
# pip-compile --output-file=requirements/install.pip requirements/install.in
@@ -10,6 +10,8 @@ aniso8601==9.0.1
# via flask-restx
appdirs==1.4.4
# via urlextract
async-timeout==4.0.3
# via redis
attrs==23.2.0
# via jsonschema
authlib==0.14.3
1 change: 1 addition & 0 deletions setup.cfg
@@ -58,6 +58,7 @@ exclude =
specs,
udata/static,
udata/templates
docstring-quotes = '''

[wheel]
universal = 1
3 changes: 3 additions & 0 deletions udata/app.py
@@ -229,6 +229,9 @@ def register_extensions(app):

def register_features(app):
from udata.features import notifications
from udata.features import webhooks

webhooks.init_app(app)

notifications.init_app(app)

26 changes: 23 additions & 3 deletions udata/core/dataset/models.py
@@ -389,7 +389,10 @@ def latest(self):

If this resource is updated and `url` changes, this property won't.
'''
return endpoint_for('datasets.resource', 'api.resource_redirect', id=self.id, _external=True)
return endpoint_for(
'datasets.resource', 'api.resource_redirect',
id=self.id, _external=True
)

@cached_property
def json_ld(self):
@@ -911,6 +914,9 @@ class CommunityResource(ResourceMixin, WithMetrics, Owned, db.Document):
Local file, remote file or API added by the community of the users to the
original dataset
'''
on_create = signal('CommunityResource.on_create')
on_update = signal('CommunityResource.on_update')

dataset = db.ReferenceField(Dataset, reverse_delete_rule=db.NULLIFY)

__metrics_keys__ = [
@@ -926,6 +932,19 @@ class CommunityResource(ResourceMixin, WithMetrics, Owned, db.Document):
def from_community(self):
return True

@classmethod
def post_save(cls, _, document, **kwargs):
if 'post_save' in kwargs.get('ignores', []):
return
if kwargs.get('created'):
cls.on_create.send(document)
else:
cls.on_update.send(document)


post_save.connect(CommunityResource.post_save, sender=CommunityResource)


class ResourceSchema(object):
@staticmethod
@cache.memoize(timeout=SCHEMA_CACHE_DURATION)
@@ -946,9 +965,10 @@ def all():
response = requests.get(endpoint, timeout=5)
# do not cache 404 and forward status code
if response.status_code == 404:
raise SchemasCatalogNotFoundException(f'Schemas catalog does not exist at {endpoint}')
msg = f'Schemas catalog does not exist at {endpoint}'
raise SchemasCatalogNotFoundException(msg)
response.raise_for_status()
except requests.exceptions.RequestException as e:
except requests.exceptions.RequestException:
log.exception(f'Error while getting schema catalog from {endpoint}')
schemas = cache.get(cache_key)
else:
2 changes: 1 addition & 1 deletion udata/core/discussions/tasks.py
@@ -5,7 +5,7 @@
from udata.core.post.models import Post
from udata.tasks import connect, get_logger

from .models import Discussion, Message
from .models import Discussion
from .signals import (
on_new_discussion, on_new_discussion_comment, on_discussion_closed
)
2 changes: 2 additions & 0 deletions udata/features/webhooks/__init__.py
@@ -0,0 +1,2 @@
def init_app(app):
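# importing the triggers module registers the webhook signal handlers (the import is side-effect only)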
from udata.features.webhooks import triggers # noqa
48 changes: 48 additions & 0 deletions udata/features/webhooks/tasks.py
@@ -0,0 +1,48 @@
import json
import requests

from flask import current_app

from udata.tasks import get_logger, task
from udata.features.webhooks.utils import sign

log = get_logger(__name__)

# number of times we should retry
DEFAULT_RETRIES = 5
# exponential backoff factor (in seconds)
# https://docs.celeryproject.org/en/v4.3.0/userguide/tasks.html#Task.retry_backoff
DEFAULT_BACKOFF = 30
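# e.g. a 30s factor means retries roughly 30s, 60s, 120s, ... apart (doubling each time,
# randomised by Celery's default jitter and capped by retry_backoff_max)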
# timeout for a single request
DEFAULT_TIMEOUT = 30


def dispatch(event, payload):
webhooks = current_app.config['WEBHOOKS']
for wh in webhooks:
if event in wh.get('events', []):
_dispatch.delay(event, payload, wh)


@task(
autoretry_for=(requests.exceptions.HTTPError,), retry_backoff=DEFAULT_BACKOFF,
retry_kwargs={'max_retries': DEFAULT_RETRIES}
)
def _dispatch(event, event_payload, wh):
url = wh['url']
log.debug(f'Dispatching {event} to {url}')

event_payload = json.loads(event_payload) if isinstance(event_payload, str) else event_payload
payload = {
'event': event,
'payload': event_payload,
}

r = requests.post(url, json=payload, headers={
'x-hook-signature': sign(payload, wh.get('secret'))
}, timeout=DEFAULT_TIMEOUT)

if not r.ok:
log.error(f'Failed dispatching webhook {event} to {url}')

r.raise_for_status()
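
For illustration, a minimal sketch of a recipient endpoint checking the x-hook-signature header. It assumes the recipient re-serialises the parsed body with json.dumps defaults so the bytes match what sign() produced; the route, secret handling and framework choice are hypothetical, not part of this PR:

import hmac
import json

from flask import Flask, abort, request

app = Flask(__name__)
SECRET = b'change-me'  # hypothetical; must match the 'secret' configured in WEBHOOKS


@app.route('/webhook', methods=['POST'])
def receive_webhook():
    body = request.get_json(force=True)
    # recompute the HMAC the same way sign() does: SHA-256 over the JSON-encoded payload
    expected = hmac.new(SECRET, json.dumps(body).encode('utf-8'), 'sha256').hexdigest()
    if not hmac.compare_digest(expected, request.headers.get('x-hook-signature', '')):
        abort(400)
    # a 4xx/5xx response makes raise_for_status() fail and triggers the Celery retries above
    return '', 200

A 2xx response acknowledges delivery; a 4xx or 5xx response makes raise_for_status() raise and the task is retried with the backoff configured above.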
86 changes: 86 additions & 0 deletions udata/features/webhooks/triggers.py
@@ -0,0 +1,86 @@
import json

from udata.core.discussions.signals import (
on_new_discussion, on_new_discussion_comment, on_discussion_closed,
)

from udata.features.webhooks.tasks import dispatch
from udata.models import Dataset, Organization, Reuse, CommunityResource


@Dataset.on_create.connect
def on_dataset_create(dataset):
if not dataset.private:
dispatch('datagouvfr.dataset.created', dataset.to_json())


@Dataset.on_delete.connect
def on_dataset_delete(dataset):
Contributor comment: Don't we want to add the if not dataset.private condition on dataset deletion?

if not dataset.private:
dispatch('datagouvfr.dataset.deleted', dataset.to_json())


@Dataset.on_update.connect
def on_dataset_update(dataset):
updates, _ = dataset._delta()
if dataset.private:
# Notify if newly private but not otherwise
if 'private' in updates:
# Do we want to send the full dataset (because the private update can add private information?)
dispatch('datagouvfr.dataset.deleted', dataset.to_json())
else:
Contributor comment: I'm confused by the logic on dataset privacy here. If an update is made on a private dataset, we're going to dispatch the updated signal?

if 'private' in updates:
dispatch('datagouvfr.dataset.created', dataset.to_json())
else:
dispatch('datagouvfr.dataset.updated', dataset.to_json())


@on_new_discussion.connect
def on_new_discussion(discussion):
Contributor comment: Same question on discussion events, don't we want to add the if not dataset.private condition on dataset deletion?

dispatch('datagouvfr.discussion.created', discussion.to_json())


@on_new_discussion_comment.connect
def on_new_discussion_comment(discussion, message=None):
dispatch('datagouvfr.discussion.commented', {
'message_id': message,
'discussion': json.loads(discussion.to_json()),
})


@on_discussion_closed.connect
def on_discussion_closed(discussion, message=None):
dispatch('datagouvfr.discussion.closed', {
'message_id': message,
'discussion': json.loads(discussion.to_json()),
})


@Organization.on_create.connect
def on_organization_created(organization):
dispatch('datagouvfr.organization.created', organization.to_json())


@Organization.on_update.connect
def on_organization_updated(organization):
dispatch('datagouvfr.organization.updated', organization.to_json())


@Reuse.on_create.connect
def on_reuse_created(reuse):
dispatch('datagouvfr.reuse.created', reuse.to_json())


@Reuse.on_update.connect
def on_reuse_updated(reuse):
dispatch('datagouvfr.reuse.updated', reuse.to_json())


@CommunityResource.on_create.connect
def on_community_resource_created(community_resource):
dispatch('datagouvfr.community_resource.created', community_resource.to_json())


@CommunityResource.on_update.connect
def on_community_resource_updated(community_resource):
dispatch('datagouvfr.community_resource.updated', community_resource.to_json())
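
For reference, each handler above hands to_json() output to dispatch(), so a subscriber receives the envelope built in tasks.py, along the lines of (document fields elided):

{'event': 'datagouvfr.dataset.created', 'payload': {...}}

For the discussion events, the payload is the small dict built above, i.e. {'message_id': ..., 'discussion': {...}}, nested inside that same envelope.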
10 changes: 10 additions & 0 deletions udata/features/webhooks/utils.py
@@ -0,0 +1,10 @@
import json
import hmac


def sign(msg, secret):
if isinstance(secret, str):
secret = secret.encode('utf-8')
if isinstance(msg, (dict, tuple, list)):
msg = json.dumps(msg).encode('utf-8')
return hmac.new(secret, msg, 'sha256').hexdigest()
13 changes: 12 additions & 1 deletion udata/settings.py
@@ -453,6 +453,17 @@ class Defaults(object):
'SourcesAPI.post',
'FollowAPI.post']

# Webhooks
##########
# expects a list of {
# 'url': 'https://example.com/webhook',
# 'secret': '{secret shared with webhook recipient}',
# # list of events to which the webhook recipient subscribes
# # available events are in `udata/tests/features/webhooks/test_webhooks@WebhookIntegrationTest` # noqa
# 'events': [],
# }
WEBHOOKS = []
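# For illustration, a hypothetical override might look like this
# (URL, secret and events below are placeholder values, not defaults):
# WEBHOOKS = [{
#     'url': 'https://example.com/webhook',
#     'secret': 'change-me',
#     'events': ['datagouvfr.dataset.created', 'datagouvfr.discussion.closed'],
# }]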

FIXTURE_DATASET_SLUGS = []
PUBLISH_ON_RESOURCE_EVENTS = False
RESOURCES_ANALYSER_URI = 'http://localhost:8000'
@@ -476,7 +487,7 @@ class Testing(object):
'''Sane values for testing. Should be applied as override'''
TESTING = True
# related to https://github.com/noirbizarre/flask-restplus/commit/93e412789f1ef8d1d2eab837f15535cf79bd144d#diff-68876137696247abc8c123622c73a11f # noqa
# this keeps our legacy tests from failing, we should probably fix the tests instead someday
# FIXME: this keeps our legacy tests from failing, we should probably fix the tests instead
PROPAGATE_EXCEPTIONS = False
SEND_MAIL = False
WTF_CSRF_ENABLED = False
1 change: 1 addition & 0 deletions udata/tasks.py
@@ -170,6 +170,7 @@ def init_app(app):
import udata.core.badges.tasks # noqa
import udata.core.storages.tasks # noqa
import udata.harvest.tasks # noqa
import udata.features.webhooks.tasks # noqa

entrypoints.get_enabled('udata.tasks', app)
