Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Low: clientlib: add more logging #59

Open
wants to merge 4 commits into
base: rel_2_dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 46 additions & 36 deletions clientlib/fsprotocol.c
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,19 @@ FSTATIC void _fsprotocol_flush_pending_connshut(FsProtoElem* fspe);
#define AUDITFSPE(fspe) { if (fspe) _fsprotocol_auditfspe(fspe, __FUNCTION__, __LINE__); }
#define AUDITIREADY(self) {_fsprotocol_auditiready(__FUNCTION__, __LINE__, self);}

#define outq_len(fspe) (fspe)->outq->_q->length
#define is_outq_empty(fspe) (outq_len(fspe)== 0)
#define set_pending(fspe) { \
(fspe)->parent->unacked = g_list_prepend((fspe)->parent->unacked, fspe); \
}
#define reset_pending(fspe) { \
(fspe)->parent->unacked = g_list_remove((fspe)->parent->unacked, fspe); \
}
#define is_pending(fspe) (g_list_find((fspe)->parent->unacked, fspe) != NULL)
#define update_nextxmit_time(fspe) { \
(fspe)->nextrexmit = g_get_monotonic_time() + (fspe)->parent->rexmit_interval; \
}


DEBUGDECLARATIONS
/// @defgroup FsProtocol FsProtocol class
Expand Down Expand Up @@ -479,7 +492,7 @@ _fsprotocol_fsa(FsProtoElem* fspe, ///< The FSPE we're processing
fspe->finalizetimer = g_timeout_add_seconds(1+parent->acktimeout/1000000, _fsprotocol_finalizetimer, fspe);
}
// Check for possible errors in our FSA tables...
if (FSPR_NONE == nextstate && curstate != nextstate && fspe->outq->_q->length != 0) {
if (FSPR_NONE == nextstate && curstate != nextstate && !is_outq_empty(fspe)) {
char * deststr = fspe->endpoint->baseclass.toString(&fspe->endpoint->baseclass);
g_critical("%s.%d: Inappropriate transition for %s to state NONE"
": (%s, %s) => %s. Actions=[%s], outq length=%d"
Expand All @@ -488,7 +501,7 @@ _fsprotocol_fsa(FsProtoElem* fspe, ///< The FSPE we're processing
, _fsprotocol_fsa_inputs(input)
, _fsprotocol_fsa_states(nextstate)
, _fsprotocol_fsa_actions(action)
, fspe->outq->_q->length);
, outq_len(fspe));
FREE(deststr); deststr = NULL;
_fsprotocol_fsa_log_history(fspe, curstate, nextstate, input, action);
}
Expand Down Expand Up @@ -526,25 +539,22 @@ _fsprotocol_flush_pending_connshut(FsProtoElem* fspe)
FSTATIC void
_fsprotocol_auditfspe(const FsProtoElem* self, const char * function, int lineno)
{
guint outqlen = self->outq->_q->length;
FsProtocol* parent = self->parent;
gboolean in_unackedlist = (g_list_find(parent->unacked, self) != NULL);
guint64 now = g_get_monotonic_time();

if (outqlen != 0 && !in_unackedlist) {
g_critical("%s:%d: outqlen is %d but not in unacked list"
, function, lineno, outqlen);
if (!is_outq_empty(self) && !is_pending(self)) {
g_critical("%s:%d: outqlen is %d but not in the pending list"
, function, lineno, outq_len(self));
DUMP("WARN: previous unacked warning was for this address",
&self->endpoint->baseclass, NULL);
}
if (outqlen == 0 && in_unackedlist) {
g_critical("%s:%d: outqlen is zero but it IS in the unacked list"
if (is_outq_empty(self) && is_pending(self)) {
g_critical("%s:%d: outqlen is zero but it IS in the pending list"
, function, lineno);
DUMP("WARN: previous unacked warning was for this address",
&self->endpoint->baseclass, NULL);
}
// If something is hung, it should start complaining soon...
if (in_unackedlist && now > (self->nextrexmit + self->parent->rexmit_interval)) {
if (!is_pending(self) && now > (self->nextrexmit + self->parent->rexmit_interval)) {
g_critical("%s:%d: Overdue retransmissions in FSPE %p", function, lineno, self);
DUMP("WARN: previous overdue warning was for this IP addr",
&self->endpoint->baseclass, NULL);
Expand Down Expand Up @@ -795,8 +805,9 @@ _fsprotocol_fspe_reinit(FsProtoElem* self)
if (!g_queue_is_empty(self->outq->_q)) {
DUMP3("REINIT OF OUTQ", &self->outq->baseclass, __FUNCTION__);
self->outq->flush(self->outq);

// No longer waiting on any ACKs - takes us off the unACKed list...
self->parent->unacked = g_list_remove(self->parent->unacked, self);
reset_pending(self)
self->outq->isready = FALSE;
}
// See the code in _fsqueue_enq and also in seqnoframe_new_init for how all these pieces
Expand Down Expand Up @@ -1160,9 +1171,9 @@ _fsprotocol_receive(FsProtocol* self ///< Self pointer
// and got a duplicate ACK
DUMP3("Received bad ACK from", &fspe->endpoint->baseclass, NULL);
DUMP3(__FUNCTION__, &fs->baseclass, " was ACK received.");
}else if (fspe->outq->_q->length == 0) {
}else if (is_outq_empty(fspe)) {
// Remove this connection from the list of connections with unacknowledged packets
fspe->parent->unacked = g_list_remove(fspe->parent->unacked, fspe);
reset_pending(fspe);
fspe->nextrexmit = 0;
TRYXMIT(fspe);
fspe->acktimeout = 0;
Expand Down Expand Up @@ -1284,13 +1295,14 @@ _fsprotocol_send1(FsProtocol* self ///< Our object
DEBUGMSG3("%s.%d: calling fsprotocol_fsa(FSPROTO_REQSEND)", __FUNCTION__, __LINE__);
_fsprotocol_fsa(fspe, FSPROTO_REQSEND, NULL);

if (fspe->outq->_q->length == 0) {
///@todo: This might be slow if we send a lot of packets to an endpoint
///< before getting a response, but that's not very likely.
if (is_outq_empty(fspe)) {
guint64 now = g_get_monotonic_time();
// Add this channel to the list of channels that need ACKs
fspe->parent->unacked = g_list_prepend(fspe->parent->unacked, fspe);
fspe->nextrexmit = now + self->rexmit_interval;
///@todo: This might be slow if we send a lot of packets to an endpoint
/// before getting a response, but that's not very likely.
// Add this channel to the list of channels that need ACKs
set_pending(fspe);
update_nextxmit_time(fspe);

fspe->acktimeout = now + self->acktimeout;
}
DEBUGMSG4("%s.%d: calling fspe->outq->enq()", __FUNCTION__, __LINE__);
Expand Down Expand Up @@ -1416,28 +1428,27 @@ FSTATIC void
_fsprotocol_xmitifwecan(FsProtoElem* fspe) ///< The FrameSet protocol element to operate on
{
GList* qelem;
FsQueue* outq;
FsProtocol* parent;
SeqnoFrame* lastseq;
NetIO* io;
guint orig_outstanding;
guint orig_pending;
gint64 now;

g_return_if_fail(fspe != NULL);
outq = fspe->outq;
parent = fspe->parent;
lastseq = fspe->lastseqsent;
io = parent->io;
orig_outstanding = fspe->outq->_q->length;
orig_pending = outq_len(fspe);
now = g_get_monotonic_time();

AUDITFSPE(fspe);
// Look for any new packets that might have showed up to send
// Check to see if we've exceeded our window size...
if (fspe->outq->_q->length < parent->window_size) {
if (outq_len(fspe) < parent->window_size) {
// Nope. Look for packets that we haven't yet sent.
// This code is sub-optimal when congestion occurs and we have a larger
// window size (i.e. when we have a number of un-ACKed packets)
for (qelem=outq->_q->head; NULL != qelem; qelem=qelem->next) {
for (qelem=fspe->outq->_q->head; NULL != qelem; qelem=qelem->next) {
FrameSet* fs = CASTTOCLASS(FrameSet, qelem->data);
SeqnoFrame* seq = fs->getseqno(fs);
if (NULL != lastseq && NULL != seq && seq->compare(seq, lastseq) <= 0) {
Expand All @@ -1458,25 +1469,24 @@ _fsprotocol_xmitifwecan(FsProtoElem* fspe) ///< The FrameSet protocol element to
}
lastseq = fspe->lastseqsent = seq;
REF2(lastseq);
if (fspe->outq->_q->length >= parent->window_size) {
if (outq_len(fspe) >= parent->window_size) {
// Can't send any more on this channel until we get some ACKs.
break;
}
}
update_nextxmit_time(fspe);
}
AUDITFSPE(fspe);
now = g_get_monotonic_time();

if (fspe->nextrexmit == 0 && fspe->outq->_q->length > 0) {
if (fspe->nextrexmit == 0 && !is_outq_empty(fspe)) {
// Next retransmission time not yet set...
fspe->nextrexmit = now + parent->rexmit_interval;
update_nextxmit_time(fspe);
AUDITFSPE(fspe);
} else if (fspe->nextrexmit != 0 && now > fspe->nextrexmit) {
FrameSet* fs = fspe->outq->qhead(fspe->outq);
// It's time to retransmit something. Hurray!
FrameSet* fs = outq->qhead(outq);
if (NULL != fs) {
// Update next retransmission time...
fspe->nextrexmit = now + parent->rexmit_interval;
update_nextxmit_time(fspe);
DUMP3(__FUNCTION__, &fspe->endpoint->baseclass, " Retransmission target");
DUMP3(__FUNCTION__, &fs->baseclass, " is frameset being REsent");
io->sendaframeset(io, fspe->endpoint, fs);
Expand All @@ -1494,9 +1504,9 @@ _fsprotocol_xmitifwecan(FsProtoElem* fspe) ///< The FrameSet protocol element to
}

// Make sure we remember to check this periodicially for retransmits...
if (orig_outstanding == 0 && fspe->outq->_q->length > 0) {
// Put this connection on the list of connections with unacked packets
fspe->parent->unacked = g_list_prepend(fspe->parent->unacked, fspe);
if (orig_pending == 0 && !is_outq_empty(fspe)) {
// Put 'fspe' on the list of fspe's with unacked packets
set_pending(fspe);
// See comment in the _send function regarding eventual efficiency concerns
}
AUDITFSPE(fspe);
Expand Down
6 changes: 6 additions & 0 deletions clientlib/fsqueue.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@ _fsqueue_enq(FsQueue* self ///< us - the FsQueue we're operating on
frameset_prepend_frame(fs, &seqno->baseclass);
// And put this FrameSet at the end of the queue
g_queue_push_tail(self->_q, fs);
if (DEBUG >= 3) {
char *destaddr = self->_destaddr->baseclass.toString(&self->_destaddr->baseclass);
DEBUGMSG("%s.%d: queued frameset fstype=%d seqno="FMT_64BIT"d (dest=%s)", __FUNCTION__, __LINE__,
fs->fstype, self->_nextseqno-1, destaddr);
g_free(destaddr); destaddr = NULL;
}

// Now do all the paperwork :-D
// We need for the FrameSet to be kept around for potentially a long time...
Expand Down
6 changes: 6 additions & 0 deletions clientlib/packetdecoder.c
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ _decode_packet_framedata_to_frameobject(PacketDecoder* self, ///<[in/out] Packet
ret = unknownframe_tlvconstructor(*pktstart, *pktend, newpacket, &newpacketend);
}
if (NULL == ret) {
g_warning("%s.%d: tlv contructor for frametype %d failed"
, __FUNCTION__, __LINE__, frametype);
return NULL;
}
g_return_val_if_fail(ret != NULL, NULL);
Expand Down Expand Up @@ -247,6 +249,8 @@ _pktdata_to_framesetlist(PacketDecoder*self, ///<[in] PacketDecoder object
newframestart = newpacket;
}
if (NULL == newframe) {
g_warning("%s.%d: conversion from framedata to frameobject in frameset %d failed"
, __FUNCTION__, __LINE__, fs->fstype);
UNREF(fs);
goto errout;
}
Expand Down Expand Up @@ -275,6 +279,8 @@ _pktdata_to_framesetlist(PacketDecoder*self, ///<[in] PacketDecoder object
}
if (fs) {
ret = g_slist_append(ret, fs); fs = NULL;
} else {
g_warning("%s.%d: no frameset found", __FUNCTION__, __LINE__);
}
curframeset = nextframeset;
}
Expand Down