Skip to content

Commit

Permalink
Send less data over named pipe (parse packets later)
Browse files Browse the repository at this point in the history
  • Loading branch information
grafolean committed Apr 7, 2020
1 parent 58f5122 commit 1487976
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 118 deletions.
114 changes: 17 additions & 97 deletions netflowcollector.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import argparse
import base64
import gzip
import json
import logging
Expand All @@ -12,16 +13,6 @@
from colors import color


from lookup import DIRECTION_INGRESS


# python-netflow-v9-softflowd expects main.py to be the main entrypoint, but we only need
# get_export_packets() iterator:
sys.path.append(os.path.dirname(os.path.realpath(__file__)) + '/pynetflow')
from pynetflow.main import get_export_packets
# disable DEBUG logging on NetFlow collector library:
logging.getLogger('pynetflow.main').setLevel(logging.INFO)

IS_DEBUG = os.environ.get('DEBUG', 'false') in ['true', 'yes', '1']
logging.basicConfig(format='%(asctime)s.%(msecs)03d | %(levelname)s | %(message)s',
datefmt='%Y-%m-%d %H:%M:%S', level=logging.DEBUG if IS_DEBUG else logging.INFO)
Expand All @@ -32,99 +23,28 @@
log = logging.getLogger("{}.{}".format(__name__, "collector"))


def process_netflow(netflow_port, named_pipe_filename):
# endless loop - read netflow packets, encode them to JSON and write them to named pipe:
line = None
last_record_seqs = {}
def pass_netflow_data(netflow_port, named_pipe_filename):
# endless loop - read netflow packets from UDP port and write them to named pipe:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server_address = ('localhost', netflow_port,)
log.debug('starting up on {} port {}'.format(*server_address))
sock.bind(server_address)

MAX_BUF_SIZE = 4096
BUFFERING_LINES = 1 # https://docs.python.org/2/library/functions.html#open
while True:
try:
with open(named_pipe_filename, "wb", 0) as fp:
# if named pipe throws an error for some reason (BrokenPipe), write the line we
# have in buffer before listening to new packets:
if line is not None:
fp.write(line)
line = None
for ts, client, export in get_export_packets('0.0.0.0', NETFLOW_PORT):

client_ip, _ = client
with open(named_pipe_filename, "wb", BUFFERING_LINES) as fp:
data, address = sock.recvfrom(MAX_BUF_SIZE)
now = time.time()
line = json.dumps((base64.b64encode(data).decode(), now, address)).encode() + b'\n'
fp.write(line)
log.debug(f"Passing [{len(data)}] from client [{address[0]}], ts [{now}]")

# check for missing records:
last_record_seq = last_record_seqs.get(client_ip)
if last_record_seq is None:
log.warning(f"Last record sequence number is not known, starting with {export.header.sequence}")
elif export.header.sequence != last_record_seq + 1:
log.error(f"Sequence number ({export.header.sequence}) does not follow ({last_record_seq}), some records might have been skipped")
last_record_seqs[client_ip] = export.header.sequence

flows_data = [flow.data for flow in export.flows]

if export.header.version == 9:
entry = {
"ts": ts,
"client": client_ip,
"seq": export.header.sequence,
"flows": [[
# "IN_BYTES":
data["IN_BYTES"],
# "PROTOCOL":
data["PROTOCOL"],
# "DIRECTION":
data["DIRECTION"],
# "L4_DST_PORT":
data["L4_DST_PORT"],
# "L4_SRC_PORT":
data["L4_SRC_PORT"],
# "INPUT_SNMP":
data["INPUT_SNMP"],
# "OUTPUT_SNMP":
data["OUTPUT_SNMP"],
# "IPV4_DST_ADDR":
data["IPV4_DST_ADDR"],
# "IPV4_SRC_ADDR":
data["IPV4_SRC_ADDR"],
] for data in flows_data],
}
elif export.header.version == 5:
entry = {
"ts": ts,
"client": client_ip,
"seq": export.header.sequence,
"flows": [[
# "IN_BYTES":
data["IN_OCTETS"],
# "PROTOCOL":
data["PROTO"],
# "DIRECTION":
DIRECTION_INGRESS,
# "L4_DST_PORT":
data["DST_PORT"],
# "L4_SRC_PORT":
data["SRC_PORT"],
# "INPUT_SNMP":
data["INPUT"],
# "OUTPUT_SNMP":
data["OUTPUT"],
# netflow v5 IP addresses are decoded to integers, which is less suitable for us - pack
# them back to bytes and transform them to strings:
# "IPV4_DST_ADDR":
socket.inet_ntoa(struct.pack('!I', data["IPV4_DST_ADDR"])),
# "IPV4_SRC_ADDR":
socket.inet_ntoa(struct.pack('!I', data["IPV4_SRC_ADDR"])),
] for data in flows_data],
}
else:
log.error(f"Only Netflow v5 and v9 currently supported, ignoring record (version: [{export.header.version}])")
continue

line = json.dumps(entry).encode() + b'\n'
fp.write(line)
log.debug(f"Wrote seq [{export.header.sequence}] from client [{client_ip}], ts [{ts}], n flows: [{len(flows_data)}]")
line = None
except Exception as ex:
log.exception(f"Exception: {str(ex)}")



if __name__ == "__main__":

NAMED_PIPE_FILENAME = os.environ.get('NAMED_PIPE_FILENAME', None)
Expand All @@ -140,7 +60,7 @@ def process_netflow(netflow_port, named_pipe_filename):
log.info(f"Listening for NetFlow traffic on UDP port {NETFLOW_PORT}")

try:
process_netflow(NETFLOW_PORT, NAMED_PIPE_FILENAME)
pass_netflow_data(NETFLOW_PORT, NAMED_PIPE_FILENAME)
except KeyboardInterrupt:
log.info("KeyboardInterrupt -> exit")
pass
119 changes: 98 additions & 21 deletions netflowwriter.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import argparse
import base64
import gzip
import json
import logging
import os
import errno
import sys
import socket
import struct
import time
from collections import defaultdict
from datetime import datetime
Expand All @@ -14,6 +17,13 @@

from lookup import PROTOCOLS
from dbutils import migrate_if_needed, get_db_cursor, DB_PREFIX
from lookup import DIRECTION_INGRESS


# python-netflow-v9-softflowd expects main.py to be the main entrypoint, but we only need
# parse_packet():
sys.path.append(os.path.dirname(os.path.realpath(__file__)) + '/pynetflow')
from pynetflow.netflow import parse_packet, UnknownNetFlowVersion, TemplateNotRecognized


IS_DEBUG = os.environ.get('DEBUG', 'false') in ['true', 'yes', '1']
Expand All @@ -26,13 +36,17 @@
log = logging.getLogger("{}.{}".format(__name__, "writer"))


# Amount of time to wait before dropping an undecodable ExportPacket
PACKET_TIMEOUT = 60 * 60

def process_named_pipe(named_pipe_filename):
try:
os.mkfifo(named_pipe_filename)
except OSError as ex:
if ex.errno != errno.EEXIST:
raise

templates = {}
while True:
with open(named_pipe_filename, "rb") as fp:
log.info(f"Opened named pipe {named_pipe_filename}")
Expand All @@ -42,12 +56,28 @@ def process_named_pipe(named_pipe_filename):
break

try:
write_record(json.loads(line))
data_b64, ts, client = json.loads(line)
data = base64.b64decode(data_b64)

try:
export = parse_packet(data, templates)
write_record(ts, client, export)
except UnknownNetFlowVersion:
log.warning("Unknown NetFlow version")
continue
except TemplateNotRecognized:
log.warning("Failed to decode a v9 ExportPacket, template not "
"recognized (if this happens at the start, it's ok)")
continue

except Exception as ex:
log.exception("Error writing line, skipping...")


def write_record(j):
last_record_seqs = {}


def write_record(ts, client, export):
# {
# "DST_AS": 0,
# "SRC_AS": 0,
Expand All @@ -73,30 +103,77 @@ def write_record(j):
# }
# https://www.cisco.com/en/US/technologies/tk648/tk362/technologies_white_paper09186a00800a3db9.html#wp9001622

client_ip, _ = client

# check for missing records:
last_record_seq = last_record_seqs.get(client_ip)
if last_record_seq is None:
log.warning(f"Last record sequence number is not known, starting with {export.header.sequence}")
elif export.header.sequence != last_record_seq + 1:
log.error(f"Sequence number ({export.header.sequence}) does not follow ({last_record_seq}), some records might have been skipped")
last_record_seqs[client_ip] = export.header.sequence

with get_db_cursor() as c:
# first save the flow record:
ts = j['ts']
log.debug(f"Received record [{j['seq']}]: {datetime.utcfromtimestamp(ts)} from {j['client']}")
c.execute(f"INSERT INTO {DB_PREFIX}records (ts, client_ip) VALUES (%s, %s) RETURNING seq;", (ts, j['client'],))
log.debug(f"Received record [{export.header.sequence}]: {datetime.utcfromtimestamp(ts)} from {client_ip}")
c.execute(f"INSERT INTO {DB_PREFIX}records (ts, client_ip) VALUES (%s, %s) RETURNING seq;", (ts, client_ip,))
record_db_seq = c.fetchone()[0]

# then save each of the flows within the record, but use execute_values() to perform bulk insert:
def _get_data(record_db_seq, flows):
for flow in flows:
yield (
record_db_seq,
*flow,
# flow.get('IN_BYTES'),
# flow.get('PROTOCOL'),
# flow.get('DIRECTION'),
# flow.get('L4_DST_PORT'),
# flow.get('L4_SRC_PORT'),
# flow.get('INPUT_SNMP'),
# flow.get('OUTPUT_SNMP'),
# flow.get('IPV4_DST_ADDR'),
# flow.get('IPV4_SRC_ADDR'),
)
data_iterator = _get_data(record_db_seq, j['flows'])
def _get_data(netflow_version, record_db_seq, flows):
if netflow_version == 9:
for f in flows:
yield (
record_db_seq,
# "IN_BYTES":
f.data["IN_BYTES"],
# "PROTOCOL":
f.data["PROTOCOL"],
# "DIRECTION":
f.data["DIRECTION"],
# "L4_DST_PORT":
f.data["L4_DST_PORT"],
# "L4_SRC_PORT":
f.data["L4_SRC_PORT"],
# "INPUT_SNMP":
f.data["INPUT_SNMP"],
# "OUTPUT_SNMP":
f.data["OUTPUT_SNMP"],
# "IPV4_DST_ADDR":
f.data["IPV4_DST_ADDR"],
# "IPV4_SRC_ADDR":
f.data["IPV4_SRC_ADDR"],
)
elif netflow_version == 5:
for f in flows:
yield (
record_db_seq,
# "IN_BYTES":
f.data["IN_OCTETS"],
# "PROTOCOL":
f.data["PROTO"],
# "DIRECTION":
DIRECTION_INGRESS,
# "L4_DST_PORT":
f.data["DST_PORT"],
# "L4_SRC_PORT":
f.data["SRC_PORT"],
# "INPUT_SNMP":
f.data["INPUT"],
# "OUTPUT_SNMP":
f.data["OUTPUT"],
# netflow v5 IP addresses are decoded to integers, which is less suitable for us - pack
# them back to bytes and transform them to strings:
# "IPV4_DST_ADDR":
socket.inet_ntoa(struct.pack('!I', f.data["IPV4_DST_ADDR"])),
# "IPV4_SRC_ADDR":
socket.inet_ntoa(struct.pack('!I', f.data["IPV4_SRC_ADDR"])),
)
else:
log.error(f"Only Netflow v5 and v9 currently supported, ignoring record (version: [{export.header.version}])")
return

data_iterator = _get_data(export.header.version, record_db_seq, export.flows)
psycopg2.extras.execute_values(
c,
f"INSERT INTO {DB_PREFIX}flows (record, IN_BYTES, PROTOCOL, DIRECTION, L4_DST_PORT, L4_SRC_PORT, INPUT_SNMP, OUTPUT_SNMP, IPV4_DST_ADDR, IPV4_SRC_ADDR) VALUES %s",
Expand Down

0 comments on commit 1487976

Please sign in to comment.