def pass_netflow_data(netflow_port, named_pipe_filename):
    """
    Receive raw NetFlow datagrams on a UDP port and forward each one through a
    named pipe as a JSON line: (base64-encoded packet, receive timestamp,
    (client_ip, client_port)).

    Decoding of the NetFlow payload is intentionally left to the reader of the
    pipe (netflowwriter.py); this process only timestamps and forwards.
    Runs forever; any error is logged and the pipe is reopened.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    # Bind to all interfaces: NetFlow exporters are normally remote devices, and
    # binding to 'localhost' (as the previous revision did) would silently drop
    # all external traffic.
    server_address = ('0.0.0.0', netflow_port,)
    log.debug('starting up on {} port {}'.format(*server_address))
    sock.bind(server_address)

    MAX_BUF_SIZE = 4096

    while True:
        try:
            # Open the FIFO unbuffered: line buffering (buffering=1) is not
            # supported for binary-mode files in Python 3, and each datagram
            # must reach the reader immediately. Note that opening a FIFO for
            # writing blocks until a reader opens the other end.
            with open(named_pipe_filename, "wb", 0) as fp:
                # Keep the pipe open across packets instead of reopening it for
                # every datagram; if the reader goes away, fp.write() raises
                # BrokenPipeError, which is logged below and the pipe reopened.
                while True:
                    data, address = sock.recvfrom(MAX_BUF_SIZE)
                    now = time.time()
                    line = json.dumps((base64.b64encode(data).decode(), now, address)).encode() + b'\n'
                    fp.write(line)
                    log.debug(f"Passing [{len(data)}] from client [{address[0]}], ts [{now}]")
        except Exception as ex:
            log.exception(f"Exception: {str(ex)}")
os.mkfifo(named_pipe_filename) @@ -33,6 +46,7 @@ def process_named_pipe(named_pipe_filename): if ex.errno != errno.EEXIST: raise + templates = {} while True: with open(named_pipe_filename, "rb") as fp: log.info(f"Opened named pipe {named_pipe_filename}") @@ -42,12 +56,28 @@ def process_named_pipe(named_pipe_filename): break try: - write_record(json.loads(line)) + data_b64, ts, client = json.loads(line) + data = base64.b64decode(data_b64) + + try: + export = parse_packet(data, templates) + write_record(ts, client, export) + except UnknownNetFlowVersion: + log.warning("Unknown NetFlow version") + continue + except TemplateNotRecognized: + log.warning("Failed to decode a v9 ExportPacket, template not " + "recognized (if this happens at the start, it's ok)") + continue + except Exception as ex: log.exception("Error writing line, skipping...") -def write_record(j): +last_record_seqs = {} + + +def write_record(ts, client, export): # { # "DST_AS": 0, # "SRC_AS": 0, @@ -73,30 +103,77 @@ def write_record(j): # } # https://www.cisco.com/en/US/technologies/tk648/tk362/technologies_white_paper09186a00800a3db9.html#wp9001622 + client_ip, _ = client + + # check for missing records: + last_record_seq = last_record_seqs.get(client_ip) + if last_record_seq is None: + log.warning(f"Last record sequence number is not known, starting with {export.header.sequence}") + elif export.header.sequence != last_record_seq + 1: + log.error(f"Sequence number ({export.header.sequence}) does not follow ({last_record_seq}), some records might have been skipped") + last_record_seqs[client_ip] = export.header.sequence + with get_db_cursor() as c: # first save the flow record: - ts = j['ts'] - log.debug(f"Received record [{j['seq']}]: {datetime.utcfromtimestamp(ts)} from {j['client']}") - c.execute(f"INSERT INTO {DB_PREFIX}records (ts, client_ip) VALUES (%s, %s) RETURNING seq;", (ts, j['client'],)) + log.debug(f"Received record [{export.header.sequence}]: {datetime.utcfromtimestamp(ts)} from 
{client_ip}") + c.execute(f"INSERT INTO {DB_PREFIX}records (ts, client_ip) VALUES (%s, %s) RETURNING seq;", (ts, client_ip,)) record_db_seq = c.fetchone()[0] # then save each of the flows within the record, but use execute_values() to perform bulk insert: - def _get_data(record_db_seq, flows): - for flow in flows: - yield ( - record_db_seq, - *flow, - # flow.get('IN_BYTES'), - # flow.get('PROTOCOL'), - # flow.get('DIRECTION'), - # flow.get('L4_DST_PORT'), - # flow.get('L4_SRC_PORT'), - # flow.get('INPUT_SNMP'), - # flow.get('OUTPUT_SNMP'), - # flow.get('IPV4_DST_ADDR'), - # flow.get('IPV4_SRC_ADDR'), - ) - data_iterator = _get_data(record_db_seq, j['flows']) + def _get_data(netflow_version, record_db_seq, flows): + if netflow_version == 9: + for f in flows: + yield ( + record_db_seq, + # "IN_BYTES": + f.data["IN_BYTES"], + # "PROTOCOL": + f.data["PROTOCOL"], + # "DIRECTION": + f.data["DIRECTION"], + # "L4_DST_PORT": + f.data["L4_DST_PORT"], + # "L4_SRC_PORT": + f.data["L4_SRC_PORT"], + # "INPUT_SNMP": + f.data["INPUT_SNMP"], + # "OUTPUT_SNMP": + f.data["OUTPUT_SNMP"], + # "IPV4_DST_ADDR": + f.data["IPV4_DST_ADDR"], + # "IPV4_SRC_ADDR": + f.data["IPV4_SRC_ADDR"], + ) + elif netflow_version == 5: + for f in flows: + yield ( + record_db_seq, + # "IN_BYTES": + f.data["IN_OCTETS"], + # "PROTOCOL": + f.data["PROTO"], + # "DIRECTION": + DIRECTION_INGRESS, + # "L4_DST_PORT": + f.data["DST_PORT"], + # "L4_SRC_PORT": + f.data["SRC_PORT"], + # "INPUT_SNMP": + f.data["INPUT"], + # "OUTPUT_SNMP": + f.data["OUTPUT"], + # netflow v5 IP addresses are decoded to integers, which is less suitable for us - pack + # them back to bytes and transform them to strings: + # "IPV4_DST_ADDR": + socket.inet_ntoa(struct.pack('!I', f.data["IPV4_DST_ADDR"])), + # "IPV4_SRC_ADDR": + socket.inet_ntoa(struct.pack('!I', f.data["IPV4_SRC_ADDR"])), + ) + else: + log.error(f"Only Netflow v5 and v9 currently supported, ignoring record (version: [{export.header.version}])") + return + + data_iterator = 
_get_data(export.header.version, record_db_seq, export.flows) psycopg2.extras.execute_values( c, f"INSERT INTO {DB_PREFIX}flows (record, IN_BYTES, PROTOCOL, DIRECTION, L4_DST_PORT, L4_SRC_PORT, INPUT_SNMP, OUTPUT_SNMP, IPV4_DST_ADDR, IPV4_SRC_ADDR) VALUES %s",