diff --git a/code/bngblaster/src/bbl_stats.c b/code/bngblaster/src/bbl_stats.c index f3085943..4a505837 100644 --- a/code/bngblaster/src/bbl_stats.c +++ b/code/bngblaster/src/bbl_stats.c @@ -1228,21 +1228,20 @@ bbl_stats_json(bbl_stats_s * stats) void bbl_compute_avg_rate(bbl_rate_s *rate, uint64_t current_value) { - uint8_t idx; - uint64_t sum; + if (current_value == 0) return; - if(current_value == 0) return; + uint64_t diff = current_value - rate->last_value; + uint64_t old_diff = rate->diff_value[rate->cursor]; - rate->diff_value[rate->cursor] = current_value - rate->last_value; + rate->diff_value[rate->cursor] = diff; + rate->cursor = (rate->cursor + 1) % BBL_AVG_SAMPLES; - sum = 0; - for(idx = 0; idx < BBL_AVG_SAMPLES; idx++) { - sum += rate->diff_value[idx]; - } - rate->avg = sum / BBL_AVG_SAMPLES; - if(rate->avg > rate->avg_max) { + rate->sum = rate->sum - old_diff + diff; + rate->avg = rate->sum / BBL_AVG_SAMPLES; + + if (rate->avg > rate->avg_max) { rate->avg_max = rate->avg; } - rate->cursor = (rate->cursor + 1) % BBL_AVG_SAMPLES; + rate->last_value = current_value; } \ No newline at end of file diff --git a/code/bngblaster/src/bbl_stats.h b/code/bngblaster/src/bbl_stats.h index f98c49a3..7375a9ce 100644 --- a/code/bngblaster/src/bbl_stats.h +++ b/code/bngblaster/src/bbl_stats.h @@ -16,6 +16,7 @@ typedef struct bbl_rate_ uint64_t diff_value[BBL_AVG_SAMPLES]; uint32_t cursor; uint64_t last_value; + uint64_t sum; uint64_t avg; uint64_t avg_max; } bbl_rate_s; diff --git a/code/bngblaster/src/bbl_stream.c b/code/bngblaster/src/bbl_stream.c index 27dad433..689694b7 100644 --- a/code/bngblaster/src/bbl_stream.c +++ b/code/bngblaster/src/bbl_stream.c @@ -1227,6 +1227,7 @@ bbl_stream_ctrl(bbl_stream_s *stream) bbl_session_s *session = stream->session; uint64_t packets; + uint64_t loss; uint64_t packets_delta; uint64_t bytes_delta; uint64_t loss_delta; @@ -1239,8 +1240,8 @@ bbl_stream_ctrl(bbl_stream_s *stream) stream->last_sync_packets_tx = packets; bbl_stream_tx_stats(stream, packets_delta, bytes_delta); } - if(g_ctx->config.stream_rate_calc) { - bbl_compute_avg_rate(&stream->rate_packets_tx, stream->tx_packets); + if(g_ctx->config.stream_rate_calc && stream->pps >= 1) { + bbl_compute_avg_rate(&stream->rate_packets_tx, packets); } if(unlikely(stream->type == BBL_TYPE_MULTICAST)) { return; @@ -1252,9 +1253,9 @@ bbl_stream_ctrl(bbl_stream_s *stream) bytes_delta = packets_delta * stream->rx_len; stream->last_sync_packets_rx = packets; /* Calculate RX loss since last sync. */ - packets = stream->rx_loss; - loss_delta = packets - stream->last_sync_loss; - stream->last_sync_loss = packets; + loss = stream->rx_loss; + loss_delta = loss - stream->last_sync_loss; + stream->last_sync_loss = loss; bbl_stream_rx_stats(stream, packets_delta, bytes_delta, loss_delta); if(unlikely(stream->rx_wrong_session)) { bbl_stream_rx_wrong_session(stream); @@ -1287,8 +1288,8 @@ bbl_stream_ctrl(bbl_stream_s *stream) } } } - if(g_ctx->config.stream_rate_calc) { - bbl_compute_avg_rate(&stream->rate_packets_rx, stream->rx_packets); + if(g_ctx->config.stream_rate_calc && stream->pps >= 1) { + bbl_compute_avg_rate(&stream->rate_packets_rx, packets); } } @@ -1501,25 +1502,31 @@ bbl_stream_io_send_iter(io_handle_s *io, uint64_t now) { io_bucket_s *io_bucket = io->bucket_cur; bbl_stream_s *stream; - uint64_t min = now - 100000000; /* now minus 100ms */ + uint64_t min = now - 100 * MSEC; /* now minus 100ms */ uint64_t expired; while(io_bucket) { if(io_bucket->stream_cur) { stream = io_bucket->stream_cur; } else { stream = io_bucket->stream_head; + io_bucket->stream_cur = stream; io_bucket->base += io_bucket->nsec; if(io_bucket->base < min) { io_bucket->base = min; - } else if(io_bucket->base > now) { - io_bucket->base = now; } } + if(io_bucket->base >= now) { + /* next bucket */ + io_bucket = io_bucket->next; + if(!io_bucket) io_bucket = io->bucket_head; + if(io_bucket == io->bucket_cur) return NULL; + continue; + } expired = now - io_bucket->base; while(stream) { if(stream->expired > expired) { io_bucket->stream_cur = stream; - goto NEXT_BUCKET; + break; } if (bbl_stream_io_send(stream) == PROTOCOL_SUCCESS) { io_bucket->stream_cur = stream->io_next; @@ -1528,8 +1535,8 @@ bbl_stream_io_send_iter(io_handle_s *io, uint64_t now) } stream = stream->io_next; } - io_bucket->stream_cur = NULL; -NEXT_BUCKET: + if(!stream) io_bucket->stream_cur = NULL; + /* next bucket */ io_bucket = io_bucket->next; if(!io_bucket) io_bucket = io->bucket_head; if(io_bucket == io->bucket_cur) return NULL; diff --git a/code/bngblaster/src/io/io_dpdk.c b/code/bngblaster/src/io/io_dpdk.c index cba607c7..94fce796 100644 --- a/code/bngblaster/src/io/io_dpdk.c +++ b/code/bngblaster/src/io/io_dpdk.c @@ -648,7 +648,6 @@ io_dpdk_interface_init(bbl_interface_s *interface) } else { timer_add_periodic(&g_ctx->timer_root, &interface->io.tx_job, "TX", 0, config->tx_interval, io, &io_dpdk_tx_job); - interface->io.tx_job->reset = false; } io->queue = queue; if(!io_dpdk_add_mbuf_pool(io)) { diff --git a/code/bngblaster/src/io/io_packet_mmap.c b/code/bngblaster/src/io/io_packet_mmap.c index ec29b0b3..de162511 100644 --- a/code/bngblaster/src/io/io_packet_mmap.c +++ b/code/bngblaster/src/io/io_packet_mmap.c @@ -407,7 +407,6 @@ io_packet_mmap_init(io_handle_s *io) } else { timer_add_periodic(&g_ctx->timer_root, &interface->io.tx_job, "TX", 0, config->tx_interval, io, &io_packet_mmap_tx_job); - interface->io.tx_job->reset = false; } } return true; diff --git a/code/bngblaster/src/io/io_raw.c b/code/bngblaster/src/io/io_raw.c index 6fae29aa..dfce0d6d 100644 --- a/code/bngblaster/src/io/io_raw.c +++ b/code/bngblaster/src/io/io_raw.c @@ -175,6 +175,7 @@ io_raw_tx_job(timer_s *timer) } else { LOG(IO, "RAW sendto on interface %s failed with error %s (%d)\n", interface->name, strerror(errno), errno); + io->bucket_cur->stream_cur = stream; io->stats.io_errors++; burst = 0; } @@ -280,6 +281,7 @@ io_raw_thread_tx_run_fn(io_thread_s *thread) } else { LOG(IO, "RAW sendto on interface %s failed with error %s (%d)\n", io->interface->name, strerror(errno), errno); + io->bucket_cur->stream_cur = stream; io->stats.io_errors++; burst = 0; } @@ -316,7 +318,6 @@ io_raw_init(io_handle_s *io) } else { timer_add_periodic(&g_ctx->timer_root, &interface->io.tx_job, "TX", 0, config->tx_interval, io, &io_raw_tx_job); - interface->io.tx_job->reset = false; } } return true; diff --git a/code/bngblaster/src/io/io_stream.c b/code/bngblaster/src/io/io_stream.c index 2f5896c3..d81a6ae7 100644 --- a/code/bngblaster/src/io/io_stream.c +++ b/code/bngblaster/src/io/io_stream.c @@ -40,10 +40,32 @@ bucket_stream_add(io_bucket_s *io_bucket, bbl_stream_s *stream) { stream->io_next = io_bucket->stream_head; io_bucket->stream_head = stream; - io_bucket->stream_cur = stream; io_bucket->stream_count++; } +static void +bucket_shuffle(io_bucket_s *io_bucket) +{ + bbl_stream_s *stream; + bbl_stream_s *next; + + if(io_bucket && io_bucket->stream_count) { + stream = io_bucket->stream_head; + while(stream) { + next = stream->io_next; + if(next && next->flow_id%3==0) { + stream->io_next = next->io_next; + stream = next->io_next; + next->io_next = io_bucket->stream_head; + io_bucket->stream_head = next; + } else { + stream = next; + } + } + } + io_bucket->stream_cur = NULL; +} + static void bucket_smear(io_bucket_s *io_bucket, uint64_t start_nsec) { @@ -108,6 +130,7 @@ io_stream_smear(io_handle_s *io) clock_gettime(CLOCK_MONOTONIC, &now); now_nsec = timespec_to_nsec(&now); while(io_bucket) { + bucket_shuffle(io_bucket); bucket_smear(io_bucket, now_nsec); io_bucket = io_bucket->next; }