Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stream Enhancements (pending streams on CGNAT scale, ...) #271

Merged
merged 7 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 10 additions & 11 deletions code/bngblaster/src/bbl_stats.c
Original file line number Diff line number Diff line change
Expand Up @@ -1228,21 +1228,20 @@ bbl_stats_json(bbl_stats_s * stats)
void
bbl_compute_avg_rate(bbl_rate_s *rate, uint64_t current_value)
{
uint8_t idx;
uint64_t sum;
if (current_value == 0) return;

if(current_value == 0) return;
uint64_t diff = current_value - rate->last_value;
uint64_t old_diff = rate->diff_value[rate->cursor];

rate->diff_value[rate->cursor] = current_value - rate->last_value;
rate->diff_value[rate->cursor] = diff;
rate->cursor = (rate->cursor + 1) % BBL_AVG_SAMPLES;

sum = 0;
for(idx = 0; idx < BBL_AVG_SAMPLES; idx++) {
sum += rate->diff_value[idx];
}
rate->avg = sum / BBL_AVG_SAMPLES;
if(rate->avg > rate->avg_max) {
rate->sum = rate->sum - old_diff + diff;
rate->avg = rate->sum / BBL_AVG_SAMPLES;

if (rate->avg > rate->avg_max) {
rate->avg_max = rate->avg;
}
rate->cursor = (rate->cursor + 1) % BBL_AVG_SAMPLES;

rate->last_value = current_value;
}
1 change: 1 addition & 0 deletions code/bngblaster/src/bbl_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ typedef struct bbl_rate_
uint64_t diff_value[BBL_AVG_SAMPLES];
uint32_t cursor;
uint64_t last_value;
uint64_t sum;
uint64_t avg;
uint64_t avg_max;
} bbl_rate_s;
Expand Down
33 changes: 20 additions & 13 deletions code/bngblaster/src/bbl_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -1227,6 +1227,7 @@ bbl_stream_ctrl(bbl_stream_s *stream)
bbl_session_s *session = stream->session;

uint64_t packets;
uint64_t loss;
uint64_t packets_delta;
uint64_t bytes_delta;
uint64_t loss_delta;
Expand All @@ -1239,8 +1240,8 @@ bbl_stream_ctrl(bbl_stream_s *stream)
stream->last_sync_packets_tx = packets;
bbl_stream_tx_stats(stream, packets_delta, bytes_delta);
}
if(g_ctx->config.stream_rate_calc) {
bbl_compute_avg_rate(&stream->rate_packets_tx, stream->tx_packets);
if(g_ctx->config.stream_rate_calc && stream->pps >= 1) {
bbl_compute_avg_rate(&stream->rate_packets_tx, packets);
}
if(unlikely(stream->type == BBL_TYPE_MULTICAST)) {
return;
Expand All @@ -1252,9 +1253,9 @@ bbl_stream_ctrl(bbl_stream_s *stream)
bytes_delta = packets_delta * stream->rx_len;
stream->last_sync_packets_rx = packets;
/* Calculate RX loss since last sync. */
packets = stream->rx_loss;
loss_delta = packets - stream->last_sync_loss;
stream->last_sync_loss = packets;
loss = stream->rx_loss;
loss_delta = loss - stream->last_sync_loss;
stream->last_sync_loss = loss;
bbl_stream_rx_stats(stream, packets_delta, bytes_delta, loss_delta);
if(unlikely(stream->rx_wrong_session)) {
bbl_stream_rx_wrong_session(stream);
Expand Down Expand Up @@ -1287,8 +1288,8 @@ bbl_stream_ctrl(bbl_stream_s *stream)
}
}
}
if(g_ctx->config.stream_rate_calc) {
bbl_compute_avg_rate(&stream->rate_packets_rx, stream->rx_packets);
if(g_ctx->config.stream_rate_calc && stream->pps >= 1) {
bbl_compute_avg_rate(&stream->rate_packets_rx, packets);
}
}

Expand Down Expand Up @@ -1501,25 +1502,31 @@ bbl_stream_io_send_iter(io_handle_s *io, uint64_t now)
{
io_bucket_s *io_bucket = io->bucket_cur;
bbl_stream_s *stream;
uint64_t min = now - 100000000; /* now minus 100ms */
uint64_t min = now - 100 * MSEC; /* now minus 100ms */
uint64_t expired;
while(io_bucket) {
if(io_bucket->stream_cur) {
stream = io_bucket->stream_cur;
} else {
stream = io_bucket->stream_head;
io_bucket->stream_cur = stream;
io_bucket->base += io_bucket->nsec;
if(io_bucket->base < min) {
io_bucket->base = min;
} else if(io_bucket->base > now) {
io_bucket->base = now;
}
}
if(io_bucket->base >= now) {
/* next bucket */
io_bucket = io_bucket->next;
if(!io_bucket) io_bucket = io->bucket_head;
if(io_bucket == io->bucket_cur) return NULL;
continue;
}
expired = now - io_bucket->base;
while(stream) {
if(stream->expired > expired) {
io_bucket->stream_cur = stream;
goto NEXT_BUCKET;
break;
}
if (bbl_stream_io_send(stream) == PROTOCOL_SUCCESS) {
io_bucket->stream_cur = stream->io_next;
Expand All @@ -1528,8 +1535,8 @@ bbl_stream_io_send_iter(io_handle_s *io, uint64_t now)
}
stream = stream->io_next;
}
io_bucket->stream_cur = NULL;
NEXT_BUCKET:
if(!stream) io_bucket->stream_cur = NULL;
/* next bucket */
io_bucket = io_bucket->next;
if(!io_bucket) io_bucket = io->bucket_head;
if(io_bucket == io->bucket_cur) return NULL;
Expand Down
1 change: 0 additions & 1 deletion code/bngblaster/src/io/io_dpdk.c
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,6 @@ io_dpdk_interface_init(bbl_interface_s *interface)
} else {
timer_add_periodic(&g_ctx->timer_root, &interface->io.tx_job, "TX", 0,
config->tx_interval, io, &io_dpdk_tx_job);
interface->io.tx_job->reset = false;
}
io->queue = queue;
if(!io_dpdk_add_mbuf_pool(io)) {
Expand Down
1 change: 0 additions & 1 deletion code/bngblaster/src/io/io_packet_mmap.c
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,6 @@ io_packet_mmap_init(io_handle_s *io)
} else {
timer_add_periodic(&g_ctx->timer_root, &interface->io.tx_job, "TX", 0,
config->tx_interval, io, &io_packet_mmap_tx_job);
interface->io.tx_job->reset = false;
}
}
return true;
Expand Down
3 changes: 2 additions & 1 deletion code/bngblaster/src/io/io_raw.c
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ io_raw_tx_job(timer_s *timer)
} else {
LOG(IO, "RAW sendto on interface %s failed with error %s (%d)\n",
interface->name, strerror(errno), errno);
io->bucket_cur->stream_cur = stream;
io->stats.io_errors++;
burst = 0;
}
Expand Down Expand Up @@ -280,6 +281,7 @@ io_raw_thread_tx_run_fn(io_thread_s *thread)
} else {
LOG(IO, "RAW sendto on interface %s failed with error %s (%d)\n",
io->interface->name, strerror(errno), errno);
io->bucket_cur->stream_cur = stream;
io->stats.io_errors++;
burst = 0;
}
Expand Down Expand Up @@ -316,7 +318,6 @@ io_raw_init(io_handle_s *io)
} else {
timer_add_periodic(&g_ctx->timer_root, &interface->io.tx_job, "TX", 0,
config->tx_interval, io, &io_raw_tx_job);
interface->io.tx_job->reset = false;
}
}
return true;
Expand Down
25 changes: 24 additions & 1 deletion code/bngblaster/src/io/io_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,32 @@ bucket_stream_add(io_bucket_s *io_bucket, bbl_stream_s *stream)
{
stream->io_next = io_bucket->stream_head;
io_bucket->stream_head = stream;
io_bucket->stream_cur = stream;
io_bucket->stream_count++;
}

static void
bucket_shuffle(io_bucket_s *io_bucket)
{
bbl_stream_s *stream;
bbl_stream_s *next;

if(io_bucket && io_bucket->stream_count) {
stream = io_bucket->stream_head;
while(stream) {
next = stream->io_next;
if(next && next->flow_id%3==0) {
stream->io_next = next->io_next;
stream = next->io_next;
next->io_next = io_bucket->stream_head;
io_bucket->stream_head = next;
} else {
stream = next;
}
}
}
io_bucket->stream_cur = NULL;
}

static void
bucket_smear(io_bucket_s *io_bucket, uint64_t start_nsec)
{
Expand Down Expand Up @@ -108,6 +130,7 @@ io_stream_smear(io_handle_s *io)
clock_gettime(CLOCK_MONOTONIC, &now);
now_nsec = timespec_to_nsec(&now);
while(io_bucket) {
bucket_shuffle(io_bucket);
bucket_smear(io_bucket, now_nsec);
io_bucket = io_bucket->next;
}
Expand Down
Loading