Merge pull request #49 from ucsd-ets/danial/add-sasl-support
Add authentication configurations support
danialmalik authored Jun 9, 2020
2 parents 2a213d9 + d238384 commit 7552e84
Showing 6 changed files with 233 additions and 100 deletions.
README.rst (70 additions, 13 deletions)
@@ -50,9 +50,9 @@ These files should be located at ``/edx/app/edxapp/`` directory, see the example

in the following files:

-- ``lms/envs/aws.py (production.py for ironwood release)``
+- ``lms/envs/production.py (aws.py for hawthorn release)``

-- ``cms/envs/aws.py (production.py for ironwood release)``
+- ``cms/envs/production.py (aws.py for hawthorn release)``

**Note:**

@@ -100,9 +100,9 @@ These files should be located at ``/edx/app/edxapp/`` directory, see the example

in the following files:

-- ``lms/envs/aws.py (production.py for ironwood release)``
+- ``lms/envs/production.py (aws.py for hawthorn release)``

-- ``cms/envs/aws.py (production.py for ironwood release)``
+- ``cms/envs/production.py (aws.py for hawthorn release)``

Using Kafka Broker API
**********************
@@ -127,38 +127,95 @@ These files should be located at ``/edx/app/edxapp/`` directory, see the example
}

2. Add the following keys and their values in the ``lms.env.json`` and ``cms.env.json`` files.
+   Please note that all parameters in ``PRODUCER_CONFIG`` are specific to your broker instance. You
+   can set whatever parameters are required for your instance.

::

"CALIPER_KAFKA_SETTINGS": {
"MAXIMUM_RETRIES": <integer>,
"END_POINT": "kafka endpoint",
"TOPIC_NAME": "topic name",
"ERROR_REPORT_EMAIL": "[email protected]"
}
"PRODUCER_CONFIG": {
"bootstrap_servers": [
"<List of Kafka Brokers URLs>"
],
...
},

"TOPIC_NAME": "<Kafka Topic Name>",

"ERROR_REPORT_EMAIL": "<Reporting Email Address>",
"MAXIMUM_RETRIES": <An Integer>
},

+------------------+------------------------------------------------------------------------------+
|Keys              | Description                                                                  |
+==================+==============================================================================+
|MAXIMUM_RETRIES   |Number of times the app will try to send the logs to Kafka in case of failure|
+------------------+------------------------------------------------------------------------------+
-|END_POINT         |URL for Kafka Broker                                                          |
+|PRODUCER_CONFIG   |Configurations for initializing the Kafka Producer                           |
+|                  |                                                                              |
+|                  |Can further contain:                                                          |
+|                  | - "bootstrap_servers":                                                       |
+|                  |   - List of Kafka Brokers URLs                                               |
+|                  | - Any other supported parameter in the `Kafka-python docs`_                  |
+|                  | - Please note that it's better to store the sensitive information in        |
+|                  |   the ``*.auth.json`` files                                                  |
+------------------+------------------------------------------------------------------------------+
|TOPIC_NAME        |Topic name for the Kafka broker                                               |
+------------------+------------------------------------------------------------------------------+
|ERROR_REPORT_EMAIL|Email Address to notify when number of failures exceeds the MAXIMUM_RETRIES  |
+------------------+------------------------------------------------------------------------------+
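For orientation, once step 4 below wires these settings in, the keys above surface in Django settings as a plain dict. A minimal sketch (every value is a placeholder):

::

    CALIPER_KAFKA_SETTINGS = {
        'PRODUCER_CONFIG': {
            'bootstrap_servers': ['broker.example.com:9092'],  # placeholder
        },
        'TOPIC_NAME': 'caliper-events',              # placeholder
        'ERROR_REPORT_EMAIL': '[email protected]',  # placeholder
        'MAXIMUM_RETRIES': 3,
    }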

-3. Add the following lines of code:
+3. Add the following keys and their values in the ``lms.auth.json`` and ``cms.auth.json`` files.
+   Please note that all parameters in ``PRODUCER_CONFIG`` are specific to your broker instance. You
+   can set whatever parameters are required for your setup.

+::

+"CALIPER_KAFKA_AUTH_SETTINGS": {
+    "PRODUCER_CONFIG": {
+        ...
+        "sasl_plain_username": "<Username>",
+        "sasl_plain_password": "<Password>",
+        "security_protocol": "<Security Protocol>",
+        "ssl_cafile": "<Path/to/the/ca/file>",
+        ...
+    }
+}

++------------------+------------------------------------------------------------------------------+
+|Keys              | Description                                                                  |
++==================+==============================================================================+
+|PRODUCER_CONFIG   |Configurations for initializing the Kafka Producer. Use this configuration to|
+|                  |store all sensitive configurations like authentication parameters.           |
+|                  |                                                                              |
+|                  |For example:                                                                  |
+|                  | - Use this to configure parameters like:                                    |
+|                  |   - sasl_plain_username                                                      |
+|                  |   - sasl_plain_password                                                      |
+|                  |   - security_protocol                                                       |
+|                  |   - sasl_mechanism                                                          |
+|                  |                                                                              |
+|                  |It can further contain:                                                       |
+|                  | - Any other supported parameter in the `Kafka-python docs`_                 |
+|                  | - Please note that it's better to store the non-sensitive information      |
+|                  |   in the ``*.env.json`` files                                                |
++------------------+------------------------------------------------------------------------------+

+.. _Kafka-python docs: https://kafka-python.readthedocs.io/en/2.0.1/apidoc/KafkaProducer.html#kafka.KafkaProducer

+4. Add the following lines of code:

::

    if FEATURES.get('ENABLE_KAFKA_FOR_CALIPER'):
        CALIPER_KAFKA_SETTINGS = ENV_TOKENS.get('CALIPER_KAFKA_SETTINGS')
+        CALIPER_KAFKA_AUTH_SETTINGS = AUTH_TOKENS.get('CALIPER_KAFKA_AUTH_SETTINGS')

in the following files:

-- ``lms/envs/aws.py (production.py for ironwood release)``
+- ``lms/envs/production.py (aws.py for hawthorn release)``

-- ``cms/envs/aws.py (production.py for ironwood release)``
+- ``cms/envs/production.py (aws.py for hawthorn release)``

Location of Transformed Logs
############################
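The split between the ``*.env.json`` and ``*.auth.json`` files comes together at runtime: the app merges the two ``PRODUCER_CONFIG`` dicts and passes the result to ``KafkaProducer`` as keyword arguments. A minimal sketch of that flow, assuming a SASL_SSL broker (the address, credentials, and certificate path are placeholders):

::

    from kafka import KafkaProducer

    # Non-sensitive part, normally read from lms.env.json / cms.env.json.
    env_producer_config = {
        'bootstrap_servers': ['broker.example.com:9093'],  # placeholder
    }

    # Sensitive part, normally read from lms.auth.json / cms.auth.json.
    auth_producer_config = {
        'security_protocol': 'SASL_SSL',
        'sasl_mechanism': 'PLAIN',
        'sasl_plain_username': 'someuser',      # placeholder
        'sasl_plain_password': 'somepassword',  # placeholder
        'ssl_cafile': '/path/to/ca.pem',        # placeholder
    }

    # Merge both dicts (this is what kafka_utils.py below does) and
    # build the producer from the combined configuration.
    producer_config = {}
    producer_config.update(env_producer_config)
    producer_config.update(auth_producer_config)
    producer = KafkaProducer(**producer_config)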
openedx_caliper_tracking/exceptions.py (5 additions, 0 deletions)
@@ -0,0 +1,5 @@
"""Exceptions for the app"""


class InvalidConfigurationsError(Exception):
pass
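``InvalidConfigurationsError`` marks a permanent configuration problem: the delivery task in ``tasks.py`` raises it when the producer cannot be built and deliberately does not retry. A hypothetical sketch of the pattern:

::

    try:
        raise InvalidConfigurationsError('Invalid Configurations are provided')
    except InvalidConfigurationsError:
        # A bad configuration will not fix itself, so report the failure
        # instead of re-queueing the task.
        pass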
openedx_caliper_tracking/kafka_utils.py (19 additions, 0 deletions)
@@ -0,0 +1,19 @@
+import logging

+from django.conf import settings

+LOGGER = logging.getLogger(__name__)


+def get_kafka_producer_configurations():
+    """
+    Return the configurations required to initialize the KafkaProducer object.
+    """
+    try:
+        configurations = {}
+        configurations.update(settings.CALIPER_KAFKA_SETTINGS.get('PRODUCER_CONFIG', {}))
+        configurations.update(settings.CALIPER_KAFKA_AUTH_SETTINGS.get('PRODUCER_CONFIG', {}))
+        return configurations

+    # A missing settings dict raises AttributeError; catch it alongside
+    # KeyError so the failure is logged either way.
+    except (AttributeError, KeyError) as ex:
+        LOGGER.exception('Invalid or no configurations are provided for KafkaProducer: %s', str(ex))
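A sketch of how this helper is consumed (mirroring ``tasks.py``; it assumes both ``CALIPER_KAFKA_SETTINGS`` and ``CALIPER_KAFKA_AUTH_SETTINGS`` are defined in Django settings):

::

    import json

    from kafka import KafkaProducer

    configurations = get_kafka_producer_configurations()
    producer = KafkaProducer(
        value_serializer=lambda v: json.dumps(v).encode('utf-8'),
        **configurations
    )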
openedx_caliper_tracking/tasks.py (85 additions, 26 deletions)
@@ -12,37 +12,72 @@
from kafka.errors import KafkaError

from openedx_caliper_tracking.utils import send_notification
+from openedx_caliper_tracking.exceptions import InvalidConfigurationsError
+from openedx_caliper_tracking.kafka_utils import get_kafka_producer_configurations
from openedx_caliper_tracking.loggers import get_caliper_logger

LOGGER = logging.getLogger(__name__)
-CALIPER_DELIVERY_FAILURE_LOGGER = get_caliper_logger('caliper_delivery_failure', 'local3')
-DEFAULT_FROM_EMAIL = settings.DEFAULT_FROM_EMAIL
+CALIPER_DELIVERY_FAILURE_LOGGER = get_caliper_logger(
+    'caliper_delivery_failure', 'local3'
+)

EMAIL_DELIVERY_CACHE_KEY = 'IS_KAFKA_DELIVERY_FAILURE_EMAIL_SENT'
HOST_ERROR_CACHE_KEY = 'HOST_NOT_FOUND_ERROR'

+DEFAULT_FROM_EMAIL = settings.DEFAULT_FROM_EMAIL
+REPORT_EMAIL_VALIDITY_PERIOD = 86400  # in seconds; equals one day

-def _get_kafka_setting(key):
-    if hasattr(settings, 'CALIPER_KAFKA_SETTINGS'):
-        return settings.CALIPER_KAFKA_SETTINGS.get(key)
+MAXIMUM_RETRIES = getattr(settings, 'CALIPER_KAFKA_SETTINGS', {}).get('MAXIMUM_RETRIES', 3)


-@task(bind=True, max_retries=_get_kafka_setting('MAXIMUM_RETRIES'))
+@task(bind=True, max_retries=MAXIMUM_RETRIES)
def deliver_caliper_event_to_kafka(self, transformed_event, event_type):
    """
    Deliver caliper event to kafka.
    Retries for the given number of max_retries in case of any error, else
    sends an error report to the specified email address.
    """
+    KAFKA_SETTINGS = settings.CALIPER_KAFKA_SETTINGS

+    bootstrap_servers = KAFKA_SETTINGS['PRODUCER_CONFIG']['bootstrap_servers']
+    topic_name = KAFKA_SETTINGS['TOPIC_NAME']

    try:
        LOGGER.info('Attempt # {} of sending event: {} to kafka ({}) is in progress.'.format(
-            self.request_stack().get('retries'), event_type, _get_kafka_setting('END_POINT')))
+            self.request_stack().get('retries'), event_type, bootstrap_servers))

+        producer_configurations = get_kafka_producer_configurations()

+        try:
+            producer = KafkaProducer(
+                value_serializer=lambda v: json.dumps(v).encode('utf-8'),
+                **producer_configurations
+            )

+        # Invalid/unsupported arguments were provided.
+        except TypeError as ex:
+            LOGGER.exception(
+                'Invalid configurations are provided for KafkaProducer: %s', str(ex))
+            raise InvalidConfigurationsError('Invalid Configurations are provided')

+        # Most probably a certificate file was not found.
+        except IOError as ex:
+            LOGGER.exception(
+                'Configured Certificate is not found: %s', str(ex)
+            )
+            raise InvalidConfigurationsError('Invalid Configurations are provided')

+        except Exception as ex:
+            LOGGER.exception(
+                'Error occurred while trying to configure Kafka: %s', str(ex)
+            )
+            raise InvalidConfigurationsError('Invalid Configurations are provided')

+        producer.send(topic_name, transformed_event).add_errback(host_not_found,
+                                                                 event=transformed_event,
+                                                                 event_type=event_type)

-        producer = KafkaProducer(bootstrap_servers=_get_kafka_setting('END_POINT'),
-                                 value_serializer=lambda v: json.dumps(v).encode('utf-8'))
-        producer.send(_get_kafka_setting('TOPIC_NAME'), transformed_event).add_errback(host_not_found,
-                                                                                       event=transformed_event,
-                                                                                       event_type=event_type)
        producer.flush()

        if cache.get(HOST_ERROR_CACHE_KEY):
@@ -51,21 +86,33 @@ def deliver_caliper_event_to_kafka(self, transformed_event, event_type):

        if cache.get(EMAIL_DELIVERY_CACHE_KEY):
            send_system_recovery_email.delay()
-        cache.set(EMAIL_DELIVERY_CACHE_KEY, False)
+            cache.set(EMAIL_DELIVERY_CACHE_KEY, False)

        LOGGER.info('Logs Delivered Successfully: Event ({}) has been successfully sent to kafka ({}).'.format(
-            event_type, _get_kafka_setting('END_POINT')))
+            event_type, bootstrap_servers))

    except KafkaError as error:
        LOGGER.error(('Logs Delivery Failed: Could not deliver event ({}) to kafka ({}) because'
-                      ' of {}.').format(event_type, _get_kafka_setting('END_POINT'), error.__class__.__name__))
+                      ' of {}.').format(event_type, bootstrap_servers, error.__class__.__name__))

-        if self.request_stack().get('retries') == _get_kafka_setting('MAXIMUM_RETRIES'):
+        if self.request_stack().get('retries') == KAFKA_SETTINGS['MAXIMUM_RETRIES']:
            CALIPER_DELIVERY_FAILURE_LOGGER.info(json.dumps(transformed_event))
            sent_kafka_failure_email.delay(error.__class__.__name__)
            return

-        self.retry(exc=error, countdown=int(random.uniform(2, 4) ** self.request.retries))
+        self.retry(exc=error, countdown=int(
+            random.uniform(2, 4) ** self.request.retries))

+    except InvalidConfigurationsError as ex:
+        # No need to retry the task if there is some configurations issue.
+        LOGGER.error(('Logs Delivery Failed: Could not deliver event ({}) to kafka ({}) due'
+                      ' to the error: {}').format(
+            event_type,
+            bootstrap_servers,
+            str(ex)
+        ))

+        sent_kafka_failure_email.delay(ex.__class__.__name__)

def host_not_found(error, event, event_type):
@@ -76,7 +123,10 @@ def host_not_found(error, event, event_type):
"""
HOST_NOT_FOUND_ERROR = 'Host Not Found'
LOGGER.error('Logs Delivery Failed: Could not deliver event ({}) to kafka ({}) because of {}.'.format(
event_type, _get_kafka_setting('END_POINT'), HOST_NOT_FOUND_ERROR))
event_type,
settings.CALIPER_KAFKA_SETTINGS['PRODUCER_CONFIG']['bootstrap_servers'],
HOST_NOT_FOUND_ERROR
))
cache.set(HOST_ERROR_CACHE_KEY, True)
sent_kafka_failure_email.delay(HOST_NOT_FOUND_ERROR)

@@ -86,9 +136,13 @@ def sent_kafka_failure_email(self, error):
"""
Send error report to specified email address.
"""
reporting_email = settings.CALIPER_KAFKA_SETTINGS.get('ERROR_REPORT_EMAIL')
if not reporting_email:
return

if cache.get(EMAIL_DELIVERY_CACHE_KEY):
LOGGER.info('Email Already Sent: Events delivery failure report has been already sent to {}.'.format(
_get_kafka_setting('ERROR_REPORT_EMAIL')))
reporting_email))
return

data = {
@@ -97,16 +151,17 @@ def sent_kafka_failure_email(self, error):
        'error': error
    }
    subject = 'Failure in logs delivery to Kafka'
-    if send_notification(data, subject, DEFAULT_FROM_EMAIL, [_get_kafka_setting('ERROR_REPORT_EMAIL')]):
+    if send_notification(data, subject, DEFAULT_FROM_EMAIL, [reporting_email]):
        success_message = 'Email Sent Successfully: Events delivery failure report sent to {}.'.format(
-            _get_kafka_setting('ERROR_REPORT_EMAIL'))
+            reporting_email)
        # If the delivery of events to kafka still fails after one day,
        # email the failure delivery report again.
-        cache.set(EMAIL_DELIVERY_CACHE_KEY, True, timeout=86400)
+        cache.set(EMAIL_DELIVERY_CACHE_KEY, True,
+                  timeout=REPORT_EMAIL_VALIDITY_PERIOD)
        LOGGER.info(success_message)
    else:
        failure_message = 'Email Sending Failed: Could not send events delivery failure report to {}.'.format(
-            _get_kafka_setting('ERROR_REPORT_EMAIL'))
+            reporting_email)
        LOGGER.error(failure_message)


@@ -115,16 +170,20 @@ def send_system_recovery_email(self):
"""
Send system recovery report to specified email address.
"""
reporting_email = settings.CALIPER_KAFKA_SETTINGS.get('ERROR_REPORT_EMAIL')
if not reporting_email:
return

data = {
'name': 'UCSD Support',
'body': 'System has been recovered. Now Caliper logs are being successfully delivered to kafka.',
}
subject = 'Success in logs delivery to Kafka'
if send_notification(data, subject, DEFAULT_FROM_EMAIL, [_get_kafka_setting('ERROR_REPORT_EMAIL')]):
if send_notification(data, subject, DEFAULT_FROM_EMAIL, [reporting_email]):
success_message = 'Email Sent Successfully: Events delivery success report sent to {}.'.format(
_get_kafka_setting('ERROR_REPORT_EMAIL'))
reporting_email)
LOGGER.info(success_message)
else:
failure_message = 'Email Sending Failed: Could not send events delivery success report to {}.'.format(
_get_kafka_setting('ERROR_REPORT_EMAIL'))
reporting_email)
LOGGER.error(failure_message)
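A note on the retry schedule above: ``countdown=int(random.uniform(2, 4) ** self.request.retries)`` backs off exponentially with jitter, so retry ``n`` waits roughly between ``2**n`` and ``4**n`` seconds:

::

    >>> [(n, 2 ** n, 4 ** n) for n in range(1, 4)]
    [(1, 2, 4), (2, 4, 16), (3, 8, 64)]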