Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create msg preprocessor #269

Draft
wants to merge 2 commits into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
*.pyc
venv/
195 changes: 195 additions & 0 deletions bin/preprocessor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
#!/usr/bin/env python

# Copyright (C) 2012 STFC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Script to run a sending SSM."""

from __future__ import print_function

import ssm.agents

Check notice

Code scanning / CodeQL

Unused import

Import of 'ssm' is not used.
from ssm import __version__, LOG_BREAK

Check notice

Code scanning / CodeQL

Unused import

Import of 'LOG_BREAK' is not used.
import os
import logging

Check notice

Code scanning / CodeQL

Unused import

Import of 'logging' is not used.
from optparse import OptionParser

Check notice

Code scanning / CodeQL

Unused import

Import of 'OptionParser' is not used.
from dirq.QueueSimple import QueueSimple
from ssm.message_directory import MessageDirectory
import re

try:
import ConfigParser
except ImportError:
import configparser as ConfigParser


def _get_path_to_outq(cp):
try:
qpath = cp.get('messaging', 'path')
print(qpath)
except:
raise ValueError('Cannot retrieve path to outq.')
return qpath


def _get_path_type(cp, log):
try:
path_type = cp.get('messaging', 'path_type')
except ConfigParser.NoOptionError:
log.info('No path type defined, assuming dirq.')
print('No path type defined')
path_type = 'dirq'
return path_type


def _get_queue(qpath, path_type):

for dirpath, dirnames, files in os.walk(qpath):
dirs_at_path = dirnames
files_at_path = files

Check notice

Code scanning / CodeQL

Unused local variable

Variable files_at_path is not used.
path_examined = qpath

Check notice

Code scanning / CodeQL

Unused local variable

Variable path_examined is not used.
break

if path_type == 'dirq':
if QueueSimple is None:
raise ImportError('Dirq path_type requested but'
'dirq module not found.')

if (len(dirs_at_path) == 0 or

Check failure

Code scanning / CodeQL

Potentially uninitialized local variable

Local variable 'dirs_at_path' may be used before it is initialized.
(len(dirs_at_path) == 1 and dirs_at_path[0] == 'combined_queue')):
raise ValueError("Provided path_type was dirq but no "
"directory found at path. Should "
"path_type be 'directory'?")

outq = QueueSimple(qpath)

elif path_type == 'directory':
if len(dirs_at_path) > 0:

Check failure

Code scanning / CodeQL

Potentially uninitialized local variable

Local variable 'dirs_at_path' may be used before it is initialized.
raise ValueError("Provided path_type was directory but an "
"unexpected directory is present at path, "
"as well as files. Should path_type be 'dirq'?")

outq = MessageDirectory(qpath)

else:
raise ValueError('Unsupported path_type variable.')

return outq


def _header_matches_regex(header):
regex_expr_header = re.compile(r'^APEL(?:-[a-z]+)+-message: v[0-9].[0-9]$')
return regex_expr_header.match(header)


def _first_time_executing_code(previous_header):
if previous_header == None:

Check notice

Code scanning / CodeQL

Testing equality to None

Testing for None should use the 'is' operator.
return True
else:
return False


def _add_to_queue(msg, queue_combined_msgs, originally_a_string):
if originally_a_string:
queue_combined_msgs.add(msg)
else:
msg_in_bytes = str.encode(msg)
queue_combined_msgs.add(msg_in_bytes)
return



def _create_new_queue(new_path, path_type):

if path_type == 'dirq':
newq = QueueSimple(new_path)

elif path_type == 'directory':
if not os.path.exists(new_path):
os.makedirs(new_path)
newq = MessageDirectory(new_path)

else:
raise ValueError('Unsupported path_type variable.')

return newq


def _determine_what_to_iterate(outq, path_type):

if path_type == 'dirq':
structure_to_iterate = outq
elif path_type == 'directory':
structure_to_iterate = outq._get_messages()
else:
raise ValueError('Unsupported path_type variable.')

return structure_to_iterate





def create_queue_combined_msgs(cp, log):

previous_header = None
n_msg_combined = 0
n_max_msg_combined = 500
qpath = _get_path_to_outq(cp)
combined_queue_path = os.path.join(qpath, 'combined_queue')
path_type = _get_path_type(cp, log)
outq = _get_queue(qpath, path_type)
structure_to_iterate = _determine_what_to_iterate(outq, path_type)
queue_combined_msgs = _create_new_queue(combined_queue_path, path_type)

for msgid in structure_to_iterate:
if not outq.lock(msgid):
log.warning('Message was locked. %s will not be read.', msgid)
continue

text = outq.get(msgid)
originally_a_string = True
try:
text = text.decode()
originally_a_string = False
except (UnicodeDecodeError, AttributeError):

Check notice

Code scanning / CodeQL

Empty except

'except' clause does nothing but pass and there is no explanatory comment.
pass

splitted_content = text.split('\n')
header = splitted_content[0]
contents_minus_header = splitted_content[1:]

if _header_matches_regex(header):
if header == previous_header and n_msg_combined < n_max_msg_combined:
combined_msgs = combined_msgs + '\n' + '\n'.join(contents_minus_header)
n_msg_combined += 1
else:
if not _first_time_executing_code(previous_header):
_add_to_queue(combined_msgs, queue_combined_msgs, originally_a_string)

combined_msgs = text
previous_header = header
n_msg_combined = 1

outq.remove(msgid)


_add_to_queue(combined_msgs, queue_combined_msgs, originally_a_string)

try:
outq.purge()
except OSError as e:
log.warning('OSError raised while purging message queue: %s', e)

return combined_queue_path

8 changes: 8 additions & 0 deletions bin/sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
except ImportError:
import configparser as ConfigParser

from bin.preprocessor import create_queue_combined_msgs


def main():
"""Set up connection, send all messages and quit."""
Expand Down Expand Up @@ -72,6 +74,12 @@ def main():

brokers, project, token = ssm.agents.get_ssm_args(protocol, cp, log)

# Creating queue of combined messages
combined_queue_path = create_queue_combined_msgs(cp, log)

# Updating path in cp so that it points to the 'combined_msgs' queue
cp.set('messaging', 'path', combined_queue_path)

ssm.agents.run_sender(protocol, brokers, project, token, cp, log)


Expand Down
8 changes: 8 additions & 0 deletions ssm/ssm2.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,10 @@ def _handle_msg(self, text):
- verify signature
- Return plain-text message, signer's DN and an error/None.
"""

if isinstance(text, bytes):
text = text.decode()

if text is None or text == '':
warning = 'Empty text passed to _handle_msg.'
log.warning(warning)
Expand Down Expand Up @@ -380,6 +384,10 @@ def _send_msg_ams(self, text, msgid):
encrypted.
"""
log.info('Sending message: %s', msgid)

if isinstance(text, bytes):
text = text.decode()

if text is not None:
# First we sign the message
to_send = crypto.sign(text, self._cert, self._key)
Expand Down