Skip to content

Commit

Permalink
Systematic testing enhancements
Browse files Browse the repository at this point in the history
Update SYSTEMATIC_TESTING_PRINTF to take a level argument.
Add cli argument for controlling output level of systematic testing printf's.
Add SYSTEMATIC_TESTING_YIELD and SYSTEMATIC_TESTING_PRINTF in a lot more places.
  • Loading branch information
dipinhora committed Jul 16, 2022
1 parent 2c4f404 commit 44c3b22
Show file tree
Hide file tree
Showing 9 changed files with 196 additions and 23 deletions.
45 changes: 45 additions & 0 deletions src/libponyrt/actor/actor.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#define PONY_WANT_ATOMIC_DEFS

#include "actor.h"
#include "../sched/systematic_testing.h"
#include "../sched/scheduler.h"
#include "../sched/cpu.h"
#include "../mem/pool.h"
Expand Down Expand Up @@ -57,12 +58,14 @@ static bool has_sync_flag(pony_actor_t* actor, uint8_t flag)
static void set_sync_flag(pony_actor_t* actor, uint8_t flag)
{
uint8_t flags = atomic_load_explicit(&actor->sync_flags, memory_order_acquire);
SYSTEMATIC_TESTING_YIELD();
atomic_store_explicit(&actor->sync_flags, flags | flag, memory_order_release);
}

static void unset_sync_flag(pony_actor_t* actor, uint8_t flag)
{
uint8_t flags = atomic_load_explicit(&actor->sync_flags, memory_order_acquire);
SYSTEMATIC_TESTING_YIELD();
atomic_store_explicit(&actor->sync_flags, flags & (uint8_t)~flag,
memory_order_release);
}
Expand Down Expand Up @@ -109,12 +112,14 @@ static void unset_internal_flag(pony_actor_t* actor, uint8_t flag)

static void mute_actor(pony_actor_t* actor)
{
SYSTEMATIC_TESTING_PRINTF(8, "thread %lu: actor %lu muted...\n", pony_ctx()->scheduler->tid, (uintptr_t)actor);
set_sync_flag(actor, SYNC_FLAG_MUTED);
DTRACE1(ACTOR_MUTED, (uintptr_t)actor);
}

void ponyint_unmute_actor(pony_actor_t* actor)
{
SYSTEMATIC_TESTING_PRINTF(8, "thread %lu: actor %lu unmuted...\n", pony_ctx()->scheduler->tid, (uintptr_t)actor);
unset_sync_flag(actor, SYNC_FLAG_MUTED);
DTRACE1(ACTOR_UNMUTED, (uintptr_t)actor);
}
Expand All @@ -127,6 +132,7 @@ static bool triggers_muting(pony_actor_t* actor)

static void actor_setoverloaded(pony_actor_t* actor)
{
SYSTEMATIC_TESTING_PRINTF(8, "thread %lu: actor %lu overloaded...\n", pony_ctx()->scheduler->tid, (uintptr_t)actor);
pony_assert(!ponyint_is_cycle(actor));
set_sync_flag(actor, SYNC_FLAG_OVERLOADED);
DTRACE1(ACTOR_OVERLOADED, (uintptr_t)actor);
Expand All @@ -135,6 +141,7 @@ static void actor_setoverloaded(pony_actor_t* actor)
static void actor_unsetoverloaded(pony_actor_t* actor)
{
pony_ctx_t* ctx = pony_ctx();
SYSTEMATIC_TESTING_PRINTF(8, "thread %lu: actor %lu unoverloaded...\n", ctx->scheduler->tid, (uintptr_t)actor);
unset_sync_flag(actor, SYNC_FLAG_OVERLOADED);
DTRACE1(ACTOR_OVERLOADED_CLEARED, (uintptr_t)actor);
if (!has_sync_flag(actor, SYNC_FLAG_UNDER_PRESSURE))
Expand Down Expand Up @@ -223,6 +230,7 @@ static bool handle_message(pony_ctx_t* ctx, pony_actor_t* actor,
ctx->mem_allocated_messages -= POOL_ALLOC_SIZE(pony_msgp_t);
#endif

SYSTEMATIC_TESTING_PRINTF(8, "thread %lu: actor %lu handled message ACTORMSG_ACQUIRE...\n", pony_ctx()->scheduler->tid, (uintptr_t)actor);
pony_assert(!ponyint_is_cycle(actor));
pony_msgp_t* m = (pony_msgp_t*)msg;

Expand Down Expand Up @@ -250,6 +258,7 @@ static bool handle_message(pony_ctx_t* ctx, pony_actor_t* actor,
ctx->mem_allocated_messages -= POOL_ALLOC_SIZE(pony_msgp_t);
#endif

SYSTEMATIC_TESTING_PRINTF(8, "thread %lu: actor %lu handled message ACTORMSG_RELEASE...\n", pony_ctx()->scheduler->tid, (uintptr_t)actor);
pony_assert(!ponyint_is_cycle(actor));
pony_msgp_t* m = (pony_msgp_t*)msg;

Expand Down Expand Up @@ -277,6 +286,7 @@ static bool handle_message(pony_ctx_t* ctx, pony_actor_t* actor,
ctx->mem_allocated_messages -= POOL_ALLOC_SIZE(pony_msgi_t);
#endif

SYSTEMATIC_TESTING_PRINTF(8, "thread %lu: CD actor %lu handled message ACTORMSG_ACK...\n", pony_ctx()->scheduler->tid, (uintptr_t)actor);
pony_assert(ponyint_is_cycle(actor));
DTRACE3(ACTOR_MSG_RUN, (uintptr_t)ctx->scheduler, (uintptr_t)actor, msg->id);
actor->type->dispatch(ctx, actor, msg);
Expand All @@ -290,6 +300,7 @@ static bool handle_message(pony_ctx_t* ctx, pony_actor_t* actor,
ctx->mem_allocated_messages -= POOL_ALLOC_SIZE(pony_msgi_t);
#endif

SYSTEMATIC_TESTING_PRINTF(8, "thread %lu: actor %lu handled message ACTORMSG_CONF...\n", pony_ctx()->scheduler->tid, (uintptr_t)actor);
pony_assert(!ponyint_is_cycle(actor));
if(has_internal_flag(actor, FLAG_BLOCKED_SENT))
{
Expand All @@ -308,6 +319,8 @@ static bool handle_message(pony_ctx_t* ctx, pony_actor_t* actor,
ctx->mem_allocated_messages -= POOL_ALLOC_SIZE(pony_msg_t);
#endif

SYSTEMATIC_TESTING_PRINTF(8, "thread %lu: actor %lu handled message ACTORMSG_ISBLOCKED...\n", pony_ctx()->scheduler->tid, (uintptr_t)actor);

// this actor should not already be marked as pendingdestroy
// or else we could end up double freeing it
// this assertion is to ensure this invariant is not
Expand Down Expand Up @@ -337,6 +350,7 @@ static bool handle_message(pony_ctx_t* ctx, pony_actor_t* actor,
{
// memtrack messages tracked in cycle detector

SYSTEMATIC_TESTING_PRINTF(8, "thread %lu: CD actor %lu handled message ACTORMSG_BLOCK...\n", pony_ctx()->scheduler->tid, (uintptr_t)actor);
pony_assert(ponyint_is_cycle(actor));
DTRACE3(ACTOR_MSG_RUN, (uintptr_t)ctx->scheduler, (uintptr_t)actor, msg->id);
actor->type->dispatch(ctx, actor, msg);
Expand All @@ -350,6 +364,7 @@ static bool handle_message(pony_ctx_t* ctx, pony_actor_t* actor,
ctx->mem_allocated_messages -= POOL_ALLOC_SIZE(pony_msgp_t);
#endif

SYSTEMATIC_TESTING_PRINTF(8, "thread %lu: CD actor %lu handled message ACTORMSG_UNBLOCK...\n", pony_ctx()->scheduler->tid, (uintptr_t)actor);
pony_assert(ponyint_is_cycle(actor));
DTRACE3(ACTOR_MSG_RUN, (uintptr_t)ctx->scheduler, (uintptr_t)actor, msg->id);
actor->type->dispatch(ctx, actor, msg);
Expand All @@ -363,6 +378,7 @@ static bool handle_message(pony_ctx_t* ctx, pony_actor_t* actor,
ctx->mem_allocated_messages -= POOL_ALLOC_SIZE(pony_msgp_t);
#endif

SYSTEMATIC_TESTING_PRINTF(8, "thread %lu: CD actor %lu handled message ACTORMSG_DESTROYED...\n", pony_ctx()->scheduler->tid, (uintptr_t)actor);
pony_assert(ponyint_is_cycle(actor));
DTRACE3(ACTOR_MSG_RUN, (uintptr_t)ctx->scheduler, (uintptr_t)actor, msg->id);
actor->type->dispatch(ctx, actor, msg);
Expand All @@ -376,6 +392,7 @@ static bool handle_message(pony_ctx_t* ctx, pony_actor_t* actor,
ctx->mem_allocated_messages -= POOL_ALLOC_SIZE(pony_msg_t);
#endif

SYSTEMATIC_TESTING_PRINTF(8, "thread %lu: CD actor %lu handled message ACTORMSG_CHECKBLOCKED...\n", pony_ctx()->scheduler->tid, (uintptr_t)actor);
pony_assert(ponyint_is_cycle(actor));
DTRACE3(ACTOR_MSG_RUN, (uintptr_t)ctx->scheduler, (uintptr_t)actor, msg->id);
actor->type->dispatch(ctx, actor, msg);
Expand All @@ -389,6 +406,7 @@ static bool handle_message(pony_ctx_t* ctx, pony_actor_t* actor,
ctx->mem_allocated_messages -= POOL_SIZE(msg->index);
#endif

SYSTEMATIC_TESTING_PRINTF(8, "thread %lu: actor %lu handled APP message...\n", pony_ctx()->scheduler->tid, (uintptr_t)actor);
pony_assert(!ponyint_is_cycle(actor));
if(has_internal_flag(actor, FLAG_BLOCKED_SENT))
{
Expand All @@ -408,6 +426,7 @@ static void try_gc(pony_ctx_t* ctx, pony_actor_t* actor)
if(!ponyint_heap_startgc(&actor->heap))
return;

SYSTEMATIC_TESTING_PRINTF(8, "thread %lu: GC'ing actor %lu...\n", ctx->scheduler->tid, (uintptr_t)actor);
DTRACE1(GC_START, (uintptr_t)ctx->scheduler);

ponyint_gc_mark(ctx);
Expand All @@ -418,6 +437,7 @@ static void try_gc(pony_ctx_t* ctx, pony_actor_t* actor)
ponyint_mark_done(ctx);
ponyint_heap_endgc(&actor->heap);

SYSTEMATIC_TESTING_PRINTF(8, "thread %lu: finished GC'ing actor %lu...\n", ctx->scheduler->tid, (uintptr_t)actor);
DTRACE1(GC_END, (uintptr_t)ctx->scheduler);
}

Expand Down Expand Up @@ -462,6 +482,8 @@ static bool batch_limit_reached(pony_actor_t* actor, bool polling)
actor_setoverloaded(actor);
}

SYSTEMATIC_TESTING_PRINTF(8, "thread %lu: actor %lu reached batch limit...\n", pony_ctx()->scheduler->tid, (uintptr_t)actor);

return true;
}

Expand All @@ -483,10 +505,12 @@ bool ponyint_actor_run(pony_ctx_t* ctx, pony_actor_t* actor, bool polling)
#endif
)) != NULL)
{
SYSTEMATIC_TESTING_YIELD();
if(handle_message(ctx, actor, msg))
{
// If we handle an application message, try to gc.
app++;
SYSTEMATIC_TESTING_YIELD();
try_gc(ctx, actor);

// maybe mute actor; returns true if mute occurs
Expand All @@ -502,7 +526,10 @@ bool ponyint_actor_run(pony_ctx_t* ctx, pony_actor_t* actor, bool polling)
// Stop handling a batch if we reach the head we found when we were
// scheduled.
if(msg == head)
{
SYSTEMATIC_TESTING_PRINTF(8, "thread %lu: actor %lu handled all messages...\n", ctx->scheduler->tid, (uintptr_t)actor);
break;
}
}

// We didn't hit our app message batch limit. We now believe our queue to be
Expand Down Expand Up @@ -544,6 +571,7 @@ bool ponyint_actor_run(pony_ctx_t* ctx, pony_actor_t* actor, bool polling)
set_internal_flag(actor, FLAG_BLOCKED_SENT);
set_internal_flag(actor, FLAG_CD_CONTACTED);
ponyint_cycle_block(actor, &actor->gc);
SYSTEMATIC_TESTING_YIELD();
}

}
Expand All @@ -560,6 +588,7 @@ bool ponyint_actor_run(pony_ctx_t* ctx, pony_actor_t* actor, bool polling)
// check if the actors queue is empty or not
if(ponyint_messageq_isempty(&actor->q))
{
SYSTEMATIC_TESTING_YIELD();
// The actors queue is empty which means this actor is a zombie
// and can be reaped.
// If the cycle detector is disabled (or it doesn't know about this actor),
Expand Down Expand Up @@ -592,6 +621,8 @@ bool ponyint_actor_run(pony_ctx_t* ctx, pony_actor_t* actor, bool polling)
// make sure the queue is actually empty as expected
pony_assert(empty);

SYSTEMATIC_TESTING_YIELD();

// "Locally delete" the actor.
ponyint_actor_setpendingdestroy(actor);
ponyint_actor_final(ctx, actor);
Expand All @@ -613,6 +644,7 @@ bool ponyint_actor_run(pony_ctx_t* ctx, pony_actor_t* actor, bool polling)
// and the cycle detector didn't send a message in the meantime
if(!ponyint_messageq_isempty(&actor->q))
{
SYSTEMATIC_TESTING_YIELD();
// need to undo set pending destroy because the cycle detector
// sent the actor a message and it is not safe to destroy this
// actor as a result.
Expand Down Expand Up @@ -656,9 +688,11 @@ bool ponyint_actor_run(pony_ctx_t* ctx, pony_actor_t* actor, bool polling)
// messages (which would also create a race condition).
set_internal_flag(actor, FLAG_BLOCKED_SENT);
ponyint_cycle_block(actor, &actor->gc);
SYSTEMATIC_TESTING_YIELD();

// mark the queue as empty or else destroy will hang
bool empty = ponyint_messageq_markempty(&actor->q);
SYSTEMATIC_TESTING_YIELD();

// make sure the queue is actually empty as expected
pony_assert(empty);
Expand All @@ -678,6 +712,7 @@ bool ponyint_actor_run(pony_ctx_t* ctx, pony_actor_t* actor, bool polling)

void ponyint_actor_destroy(pony_actor_t* actor)
{
SYSTEMATIC_TESTING_PRINTF(8, "thread %lu: actor %lu destroyed...\n", pony_ctx()->scheduler->tid, (uintptr_t)actor);
pony_assert(has_sync_flag(actor, SYNC_FLAG_PENDINGDESTROY));

// Make sure the actor being destroyed has finished marking its queue
Expand Down Expand Up @@ -782,6 +817,8 @@ PONY_API pony_actor_t* pony_create(pony_ctx_t* ctx, pony_type_t* type,
memset(actor, 0, type->size);
actor->type = type;

SYSTEMATIC_TESTING_PRINTF(8, "thread %lu: actor %lu created...\n", pony_ctx()->scheduler->tid, (uintptr_t)actor);

#ifdef USE_MEMTRACK
ctx->mem_used_actors += type->size;
ctx->mem_allocated_actors += ponyint_pool_used_size(type->size);
Expand Down Expand Up @@ -879,12 +916,15 @@ PONY_API void pony_sendv(pony_ctx_t* ctx, pony_actor_t* to, pony_msg_t* first,
if(has_app_msg)
maybe_mark_should_mute(ctx, to);

SYSTEMATIC_TESTING_YIELD();

if(ponyint_actor_messageq_push(&to->q, first, last
#ifdef USE_DYNAMIC_TRACE
, ctx->scheduler, ctx->current, to
#endif
))
{
SYSTEMATIC_TESTING_YIELD();
if(!has_sync_flag(to, SYNC_FLAG_MUTED))
{
ponyint_sched_add(ctx, to);
Expand Down Expand Up @@ -922,12 +962,15 @@ PONY_API void pony_sendv_single(pony_ctx_t* ctx, pony_actor_t* to,
if(has_app_msg)
maybe_mark_should_mute(ctx, to);

SYSTEMATIC_TESTING_YIELD();

if(ponyint_actor_messageq_push_single(&to->q, first, last
#ifdef USE_DYNAMIC_TRACE
, ctx->scheduler, ctx->current, to
#endif
))
{
SYSTEMATIC_TESTING_YIELD();
if(!has_sync_flag(to, SYNC_FLAG_MUTED))
{
// if the receiving actor is currently not unscheduled AND it's not
Expand Down Expand Up @@ -1068,13 +1111,15 @@ PONY_API void pony_poll(pony_ctx_t* ctx)
PONY_API void pony_apply_backpressure()
{
pony_ctx_t* ctx = pony_ctx();
SYSTEMATIC_TESTING_PRINTF(8, "thread %lu: actor %lu under pressure...\n", ctx->scheduler->tid, (uintptr_t)ctx->current);
set_sync_flag(ctx->current, SYNC_FLAG_UNDER_PRESSURE);
DTRACE1(ACTOR_UNDER_PRESSURE, (uintptr_t)ctx->current);
}

PONY_API void pony_release_backpressure()
{
pony_ctx_t* ctx = pony_ctx();
SYSTEMATIC_TESTING_PRINTF(8, "thread %lu: actor %lu not under pressure...\n", ctx->scheduler->tid, (uintptr_t)ctx->current);
unset_sync_flag(ctx->current, SYNC_FLAG_UNDER_PRESSURE);
DTRACE1(ACTOR_PRESSURE_RELEASED, (uintptr_t)ctx->current);
if (!has_sync_flag(ctx->current, SYNC_FLAG_OVERLOADED))
Expand Down
Loading

0 comments on commit 44c3b22

Please sign in to comment.