diff --git a/dbutils.py b/dbutils.py index c4d7cbb..27ce4c3 100644 --- a/dbutils.py +++ b/dbutils.py @@ -217,8 +217,6 @@ def migration_step_6(): """ Create a new table for flows using TimescaleDB. - we use a different name to avoid converting old data; the conversion is too slow to be done on existing installations without manual work. For those we know about we can perform this operation manually. - - table is *not* unlogged (it is using WAL). If insert performance becomes an issue, we can change this later - using: "ALTER TABLE netflow_flows2 SET UNLOGGED;" (but at the moment we don't know that yet) """ with get_db_cursor() as c: c.execute(f""" @@ -238,3 +236,28 @@ def migration_step_6(): """) c.execute(f"SELECT create_hypertable('{DB_PREFIX}flows2', 'ts', chunk_time_interval => INTERVAL '1 hour');") c.execute(f'CREATE TABLE {DB_PREFIX}bot_jobs2 (job_id TEXT NOT NULL PRIMARY KEY, last_used_ts TIMESTAMP NOT NULL);') + +def migration_step_7(): + """ Change input_snmp/output_snmp from SMALLINT to BIGINT (it is 2 bytes by default, but is sometimes more) + - since this is temporary data anyway, we drop and recreate the table + - table is *not* unlogged (it is using WAL). If insert performance becomes an issue, we can change this later + using: "ALTER TABLE netflow_flows2 SET UNLOGGED;" (but at the moment we don't know that yet) + """ + with get_db_cursor() as c: + c.execute(f"DROP TABLE {DB_PREFIX}flows2;") + c.execute(f""" + CREATE TABLE {DB_PREFIX}flows2 ( + ts TIMESTAMP NOT NULL, + client_ip INET NOT NULL, + in_bytes BIGINT NOT NULL, + protocol SMALLINT NOT NULL, + direction SMALLINT NOT NULL, + l4_dst_port INTEGER NOT NULL, + l4_src_port INTEGER NOT NULL, + input_snmp BIGINT NOT NULL, + output_snmp BIGINT NOT NULL, + ipvX_dst_addr INET NOT NULL, + ipvX_src_addr INET NOT NULL + ); + """) + c.execute(f"SELECT create_hypertable('{DB_PREFIX}flows2', 'ts', chunk_time_interval => INTERVAL '1 hour');") diff --git a/netflowwriter.py b/netflowwriter.py index 1907824..467396c 100644 --- a/netflowwriter.py +++ b/netflowwriter.py @@ -63,8 +63,8 @@ def _pgwriter_init(): return pg_writer -def _pgwriter_write(pgwriter, ts, client_ip, IN_BYTES, PROTOCOL, DIRECTION, L4_DST_PORT, L4_SRC_PORT, INPUT_SNMP, OUTPUT_SNMP, address_family, IPVx_DST_ADDR, IPVx_SRC_ADDR): - buf = struct.pack('!Hiqi4s4siQiHiHiIiIiHiH', +def _pgwriter_encode(ts, client_ip, IN_BYTES, PROTOCOL, DIRECTION, L4_DST_PORT, L4_SRC_PORT, INPUT_SNMP, OUTPUT_SNMP, address_family, IPVx_DST_ADDR, IPVx_SRC_ADDR): + buf = struct.pack('!Hiqi4s4siQiHiHiIiIiQiQ', 11, # number of columns 8, int(1000000 * (ts - PG_EPOCH_TIMESTAMP)), # https://doxygen.postgresql.org/backend_2utils_2adt_2timestamp_8c_source.html#l00228 8, IPV4_ADDRESS_PREFIX, socket.inet_aton(client_ip), # 4 bytes prefix + 4 bytes IP @@ -73,8 +73,8 @@ def _pgwriter_write(pgwriter, ts, client_ip, IN_BYTES, PROTOCOL, DIRECTION, L4_D 2, DIRECTION, 4, L4_DST_PORT, 4, L4_SRC_PORT, - 2, INPUT_SNMP, - 2, OUTPUT_SNMP, + 8, INPUT_SNMP, + 8, OUTPUT_SNMP, ) if address_family != socket.AF_INET6: buf2 = struct.pack('!i4s4si4s4s', @@ -86,7 +86,7 @@ def _pgwriter_write(pgwriter, ts, client_ip, IN_BYTES, PROTOCOL, DIRECTION, L4_D 4 + 16, IPV6_ADDRESS_PREFIX, IPVx_DST_ADDR, 4 + 16, IPV6_ADDRESS_PREFIX, IPVx_SRC_ADDR, ) - pgwriter.write(buf + buf2) + return buf + buf2 def _pgwriter_finish(pgwriter): @@ -212,7 +212,7 @@ def _get_data(buffer): dst = socket.inet_aton(f.data["IPV4_DST_ADDR"]) src = socket.inet_aton(f.data["IPV4_SRC_ADDR"]) - yield ( + yield _pgwriter_encode( ts, client_ip, f.data["IN_BYTES"], @@ -226,12 +226,12 @@ def _get_data(buffer): dst, src, ) - except KeyError: + except: log.exception(f"[{client_ip}] Error decoding v9 flow. Contents: {repr(f.data)}") elif netflow_version == 5: for f in flows: try: - yield ( + yield _pgwriter_encode( ts, client_ip, # "IN_BYTES": @@ -257,14 +257,14 @@ def _get_data(buffer): # "IPV4_SRC_ADDR": struct.pack('!I', f.data["IPV4_SRC_ADDR"]), ) - except KeyError: + except: log.exception(f"[{client_ip}] Error decoding v5 flow. Contents: {repr(f.data)}") else: log.error(f"[{client_ip}] Only Netflow v5 and v9 currently supported, ignoring record (version: [{export.header.version}])") pgwriter = _pgwriter_init() - for data in _get_data(buffer): - _pgwriter_write(pgwriter, *data) + for encoded_data in _get_data(buffer): + pgwriter.write(encoded_data) _pgwriter_finish(pgwriter)