Skip to content

Commit

Permalink
Fix: INPUT_SNMP and OUTPUT_SNMP can (and sometimes do) take more than…
Browse files Browse the repository at this point in the history
… 2 bytes with V9 netflow
  • Loading branch information
grafolean committed May 8, 2022
1 parent e8a26b3 commit 9cd7bfe
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 13 deletions.
27 changes: 25 additions & 2 deletions dbutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,6 @@ def migration_step_6():
""" Create a new table for flows using TimescaleDB.
- we use a different name to avoid converting old data; the conversion is too slow to be done on existing
installations without manual work. For those we know about we can perform this operation manually.
- table is *not* unlogged (it is using WAL). If insert performance becomes an issue, we can change this later
using: "ALTER TABLE netflow_flows2 SET UNLOGGED;" (but at the moment we don't know that yet)
"""
with get_db_cursor() as c:
c.execute(f"""
Expand All @@ -238,3 +236,28 @@ def migration_step_6():
""")
c.execute(f"SELECT create_hypertable('{DB_PREFIX}flows2', 'ts', chunk_time_interval => INTERVAL '1 hour');")
c.execute(f'CREATE TABLE {DB_PREFIX}bot_jobs2 (job_id TEXT NOT NULL PRIMARY KEY, last_used_ts TIMESTAMP NOT NULL);')

def migration_step_7():
    """ Change input_snmp/output_snmp from SMALLINT to BIGINT (it is 2 bytes by default, but is sometimes more)
        - since this is temporary data anyway, we drop and recreate the table instead of ALTERing it
        - table is *not* unlogged (it is using WAL). If insert performance becomes an issue, we can change this later
          using: "ALTER TABLE netflow_flows2 SET UNLOGGED;" (but at the moment we don't know that yet)
    """
    with get_db_cursor() as c:
        # IF EXISTS keeps this migration re-runnable even if a previous attempt
        # failed partway through (after the DROP but before the CREATE).
        c.execute(f"DROP TABLE IF EXISTS {DB_PREFIX}flows2;")
        c.execute(f"""
            CREATE TABLE {DB_PREFIX}flows2 (
                ts TIMESTAMP NOT NULL,
                client_ip INET NOT NULL,
                in_bytes BIGINT NOT NULL,
                protocol SMALLINT NOT NULL,
                direction SMALLINT NOT NULL,
                l4_dst_port INTEGER NOT NULL,
                l4_src_port INTEGER NOT NULL,
                input_snmp BIGINT NOT NULL,
                output_snmp BIGINT NOT NULL,
                ipvX_dst_addr INET NOT NULL,
                ipvX_src_addr INET NOT NULL
            );
        """)
        # Re-register the recreated table as a TimescaleDB hypertable, partitioned on ts in 1-hour chunks:
        c.execute(f"SELECT create_hypertable('{DB_PREFIX}flows2', 'ts', chunk_time_interval => INTERVAL '1 hour');")
22 changes: 11 additions & 11 deletions netflowwriter.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ def _pgwriter_init():
return pg_writer


def _pgwriter_write(pgwriter, ts, client_ip, IN_BYTES, PROTOCOL, DIRECTION, L4_DST_PORT, L4_SRC_PORT, INPUT_SNMP, OUTPUT_SNMP, address_family, IPVx_DST_ADDR, IPVx_SRC_ADDR):
buf = struct.pack('!Hiqi4s4siQiHiHiIiIiHiH',
def _pgwriter_encode(ts, client_ip, IN_BYTES, PROTOCOL, DIRECTION, L4_DST_PORT, L4_SRC_PORT, INPUT_SNMP, OUTPUT_SNMP, address_family, IPVx_DST_ADDR, IPVx_SRC_ADDR):
buf = struct.pack('!Hiqi4s4siQiHiHiIiIiQiQ',
11, # number of columns
8, int(1000000 * (ts - PG_EPOCH_TIMESTAMP)), # https://doxygen.postgresql.org/backend_2utils_2adt_2timestamp_8c_source.html#l00228
8, IPV4_ADDRESS_PREFIX, socket.inet_aton(client_ip), # 4 bytes prefix + 4 bytes IP
Expand All @@ -73,8 +73,8 @@ def _pgwriter_write(pgwriter, ts, client_ip, IN_BYTES, PROTOCOL, DIRECTION, L4_D
2, DIRECTION,
4, L4_DST_PORT,
4, L4_SRC_PORT,
2, INPUT_SNMP,
2, OUTPUT_SNMP,
8, INPUT_SNMP,
8, OUTPUT_SNMP,
)
if address_family != socket.AF_INET6:
buf2 = struct.pack('!i4s4si4s4s',
Expand All @@ -86,7 +86,7 @@ def _pgwriter_write(pgwriter, ts, client_ip, IN_BYTES, PROTOCOL, DIRECTION, L4_D
4 + 16, IPV6_ADDRESS_PREFIX, IPVx_DST_ADDR,
4 + 16, IPV6_ADDRESS_PREFIX, IPVx_SRC_ADDR,
)
pgwriter.write(buf + buf2)
return buf + buf2


def _pgwriter_finish(pgwriter):
Expand Down Expand Up @@ -212,7 +212,7 @@ def _get_data(buffer):
dst = socket.inet_aton(f.data["IPV4_DST_ADDR"])
src = socket.inet_aton(f.data["IPV4_SRC_ADDR"])

yield (
yield _pgwriter_encode(
ts,
client_ip,
f.data["IN_BYTES"],
Expand All @@ -226,12 +226,12 @@ def _get_data(buffer):
dst,
src,
)
except KeyError:
except:
log.exception(f"[{client_ip}] Error decoding v9 flow. Contents: {repr(f.data)}")
elif netflow_version == 5:
for f in flows:
try:
yield (
yield _pgwriter_encode(
ts,
client_ip,
# "IN_BYTES":
Expand All @@ -257,14 +257,14 @@ def _get_data(buffer):
# "IPV4_SRC_ADDR":
struct.pack('!I', f.data["IPV4_SRC_ADDR"]),
)
except KeyError:
except:
log.exception(f"[{client_ip}] Error decoding v5 flow. Contents: {repr(f.data)}")
else:
log.error(f"[{client_ip}] Only Netflow v5 and v9 currently supported, ignoring record (version: [{export.header.version}])")

pgwriter = _pgwriter_init()
for data in _get_data(buffer):
_pgwriter_write(pgwriter, *data)
for encoded_data in _get_data(buffer):
pgwriter.write(encoded_data)
_pgwriter_finish(pgwriter)


Expand Down

0 comments on commit 9cd7bfe

Please sign in to comment.