Optimize read/take path with SHM when waitset is not used for the data notification #1173

Open · wants to merge 10 commits into base: master
5 changes: 5 additions & 0 deletions src/core/ddsc/src/dds__reader.h
@@ -43,6 +43,11 @@ struct nn_rsample_info;
struct nn_rdata;
DDS_EXPORT void dds_reader_ddsi2direct (dds_entity_t entity, void (*cb) (const struct nn_rsample_info *sampleinfo, const struct nn_rdata *fragchain, void *arg), void *cbarg);

/*
Transfer the samples from the iox subscriber queue to the reader history cache
*/
void dds_transfer_samples_from_iox_to_rhc (dds_reader * reader);

DEFINE_ENTITY_LOCK_UNLOCK(dds_reader, DDS_KIND_READER)

#if defined (__cplusplus)
106 changes: 106 additions & 0 deletions src/core/ddsc/src/dds_read.c
@@ -64,6 +64,14 @@ static dds_return_t dds_read_impl (bool take, dds_entity_t reader_or_condition,

thread_state_awake (ts1, &entity->m_domain->gv);

#ifdef DDS_HAS_SHM
// If SHM is supported and if the monitor is not attached
if(rd->m_iox_sub && !rd->m_iox_sub_context.monitor) {
// transfer the samples from iox to rhc in the same thread context
dds_transfer_samples_from_iox_to_rhc(rd);
}
#endif

/* Allocate samples if not provided (assuming all or none provided) */
if (buf[0] == NULL)
{
@@ -163,6 +171,13 @@ static dds_return_t dds_readcdr_impl (bool take, dds_entity_t reader_or_conditio
}

thread_state_awake (ts1, &entity->m_domain->gv);
#ifdef DDS_HAS_SHM
// If SHM is supported and if the monitor is not attached
if(rd->m_iox_sub && !rd->m_iox_sub_context.monitor) {
// transfer the samples from iox to rhc in the same thread context
dds_transfer_samples_from_iox_to_rhc(rd);
}
#endif

/* read/take resets data available status -- must reset before reading because
the actual writing is protected by RHC lock, not by rd->m_entity.m_lock */
@@ -554,3 +569,94 @@ dds_return_t dds_return_reader_loan (dds_reader *rd, void **buf, int32_t bufsz)
ddsrt_mutex_unlock (&rd->m_entity.m_mutex);
return DDS_RETCODE_OK;
}


void dds_transfer_samples_from_iox_to_rhc (dds_reader * rd)
{
#if DDS_HAS_SHM
void* chunk = NULL;
struct ddsi_domaingv* gv = rd->m_rd->e.gv;
thread_state_awake(lookup_thread_state(), gv);

while (true)
{
shm_lock_iox_sub(rd->m_iox_sub);
Contributor: It may also be worth looking into whether it pays off to lock for the whole while loop (a longer critical section, but fewer lock/unlock operations).
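
A minimal sketch of that alternative, assuming the helpers and types already used in dds_read.c (shm_lock_iox_sub, shm_unlock_iox_sub, iox_sub_take_chunk); the per-chunk RHC handling is elided and this variant is not part of the PR:

// Hypothetical variant: hold the iceoryx subscriber lock for the whole drain
// loop instead of locking/unlocking once per chunk.
static void transfer_all_chunks_locked (dds_reader *rd)
{
  void *chunk = NULL;
  shm_lock_iox_sub (rd->m_iox_sub);   // single lock around the entire drain
  while (ChunkReceiveResult_SUCCESS ==
         iox_sub_take_chunk (rd->m_iox_sub, (const void **) &chunk))
  {
    // ... convert the chunk to a ddsi_serdata and store it in the reader
    //     history cache, exactly as dds_transfer_samples_from_iox_to_rhc does ...
  }
  shm_unlock_iox_sub (rd->m_iox_sub); // single unlock afterwards
}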

enum iox_ChunkReceiveResult take_result = iox_sub_take_chunk(rd->m_iox_sub, (const void** const)&chunk);
shm_unlock_iox_sub(rd->m_iox_sub);

// NB: If we cannot take the chunk (sample), the user may lose data,
// since the subscriber queue can overflow and will then evict the oldest sample.
// Whether this happens depends entirely on the producer and consumer frequencies (and on the queue size if they are close).
// The consumer here is essentially the reader history cache.
if (ChunkReceiveResult_SUCCESS != take_result)
Contributor: Here we may need to distinguish between the various errors, like

enum iox_ChunkReceiveResult
{
    ChunkReceiveResult_TOO_MANY_CHUNKS_HELD_IN_PARALLEL,
    ChunkReceiveResult_NO_CHUNK_AVAILABLE,
    ChunkReceiveResult_UNDEFINED_ERROR,
    ChunkReceiveResult_SUCCESS,
};

to e.g. warn a user that we hold too many chunks in parallel (meaning the consumer working with the chunks is too slow and hence we may lose chunks).

I can do this in an upcoming PR which establishes compatibility with iceoryx 2.0 (cf. #1174)

Contributor (author): I have abstracted the code from shm_monitor into a separate API for this functionality, so the error handling should be good.

{
switch(take_result)
{
case ChunkReceiveResult_TOO_MANY_CHUNKS_HELD_IN_PARALLEL :
{
// we hold too many chunks and cannot get more
DDS_CLOG (DDS_LC_WARNING | DDS_LC_SHM, &rd->m_entity.m_domain->gv.logconfig,
"DDS reader with topic %s : iceoryx subscriber - TOO_MANY_CHUNKS_HELD_IN_PARALLEL -"
"could not take sample\n", rd->m_topic->m_name);
break;
}
case ChunkReceiveResult_NO_CHUNK_AVAILABLE: {
// no more chunks to take, ok
break;
}
default : {
// some unknown error occurred
DDS_CLOG(DDS_LC_WARNING | DDS_LC_SHM, &rd->m_entity.m_domain->gv.logconfig,
"DDS reader with topic %s : iceoryx subscriber - UNKNOWN ERROR -"
"could not take sample\n", rd->m_topic->m_name);
}
}

break;
}

const iceoryx_header_t* ice_hdr = iceoryx_header_from_chunk(chunk);

// Get writer or proxy writer
struct entity_common * e = entidx_lookup_guid_untyped (gv->entity_index, &ice_hdr->guid);
if (e == NULL || (e->kind != EK_PROXY_WRITER && e->kind != EK_WRITER))
{
// Ignore the sample that is not from a known writer or proxy writer
DDS_CLOG (DDS_LC_SHM, &gv->logconfig, "unknown source entity, ignore.\n");
continue;
}

// Create struct ddsi_serdata
struct ddsi_serdata* d = ddsi_serdata_from_iox(rd->m_topic->m_stype, ice_hdr->data_kind, &rd->m_iox_sub, chunk);
d->timestamp.v = ice_hdr->tstamp;
d->statusinfo = ice_hdr->statusinfo;

// Get struct ddsi_tkmap_instance
struct ddsi_tkmap_instance* tk;
if ((tk = ddsi_tkmap_lookup_instance_ref(gv->m_tkmap, d)) == NULL)
{
DDS_CLOG(DDS_LC_SHM, &gv->logconfig, "ddsi_tkmap_lookup_instance_ref failed.\n");
goto release;
}

// Generate writer_info
struct ddsi_writer_info wrinfo;
struct dds_qos *xqos;
if (e->kind == EK_PROXY_WRITER)
xqos = ((struct proxy_writer *) e)->c.xqos;
else
xqos = ((struct writer *) e)->xqos;
ddsi_make_writer_info(&wrinfo, e, xqos, d->statusinfo);
(void)ddsi_rhc_store(rd->m_rd->rhc, &wrinfo, d, tk);

release:
if (tk)
ddsi_tkmap_instance_unref(gv->m_tkmap, tk);
if (d)
ddsi_serdata_unref(d);
}
thread_state_asleep(lookup_thread_state());
#else
(void)rd;
#endif
}
24 changes: 8 additions & 16 deletions src/core/ddsc/src/dds_reader.c
@@ -717,23 +717,8 @@ static dds_entity_t dds_create_reader_int (dds_entity_t participant_or_subscribe
iox_sub_context_t **context = iox_sub_context_ptr(rd->m_iox_sub);
*context = &rd->m_iox_sub_context;

rc = shm_monitor_attach_reader(&rd->m_entity.m_domain->m_shm_monitor, rd);

if (rc != DDS_RETCODE_OK) {
// we fail if we cannot attach to the listener (as we would get no data)
iox_sub_deinit(rd->m_iox_sub);
rd->m_iox_sub = NULL;
DDS_CLOG(DDS_LC_WARNING | DDS_LC_SHM,
&rd->m_entity.m_domain->gv.logconfig,
"Failed to attach iox subscriber to iox listener\n");
// FIXME: We need to clean up everything created up to now.
// Currently there is only partial cleanup, we need to extend it.
goto err_bad_qos;
}

// those are set once and never changed
// they are used to access reader and monitor from the callback when data is received
rd->m_iox_sub_context.monitor = &rd->m_entity.m_domain->m_shm_monitor;
// they are used to access reader from the callback when data is received
rd->m_iox_sub_context.parent_reader = rd;
}
#endif
@@ -838,6 +823,13 @@ uint32_t dds_reader_lock_samples (dds_entity_t reader)
uint32_t n;
if (dds_reader_lock (reader, &rd) != DDS_RETCODE_OK)
return 0;
#ifdef DDS_HAS_SHM
Contributor: Not sure whether this is (logically) the right place to pull the samples from iceoryx into the rhc.

This is purely going by the name of the function, which indicates that the rhc is somehow locked, but what else happens? Maybe it is more like an update of the rhc, but such an update must happen under a lock (i.e. this is the right place but the name of the function is off).

Contributor (author): This is the entry point for the C++ API: it first calls this function to check the number of samples in the reader history cache, then sizes the buffers accordingly, and then calls the actual read/take_cdr API. So in the context of the C++ API I guess this should be correct.

WRT the C API, I have added this in the read/take_impl functions, which I need to check with @eboasson whether it makes sense.
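
A simplified sketch of that sequence using the C API (not the binding's actual code); MAX_SAMPLES is an assumed bound chosen only for this example:

#define MAX_SAMPLES 32  /* assumed upper bound for this example */

static void take_after_lock_samples (dds_entity_t reader)
{
  // With this PR, dds_reader_lock_samples also drains the iceoryx queue into
  // the reader history cache when no listener is attached, so the returned
  // count reflects those samples as well.
  uint32_t n = dds_reader_lock_samples (reader);
  if (n == 0)
    return;
  if (n > MAX_SAMPLES)
    n = MAX_SAMPLES;
  void *buf[MAX_SAMPLES] = { NULL };
  dds_sample_info_t si[MAX_SAMPLES];
  dds_return_t rc = dds_take (reader, buf, si, n, n);
  if (rc > 0)
    (void) dds_return_loan (reader, buf, rc);
}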

// If SHM is supported and if the monitor is not attached
if(rd->m_iox_sub && !rd->m_iox_sub_context.monitor) {
// transfer the samples from iox to rhc in the same thread context
dds_transfer_samples_from_iox_to_rhc(rd);
}
#endif
n = dds_rhc_lock_samples (rd->m_rhc);
dds_reader_unlock (rd);
return n;
42 changes: 42 additions & 0 deletions src/core/ddsc/src/dds_waitset.c
@@ -333,6 +333,34 @@ dds_return_t dds_waitset_attach (dds_entity_t waitset, dds_entity_t entity, dds_
if (ret < 0 && dds_entity_kind (e) == DDS_KIND_SUBSCRIBER)
dds_subscriber_adjust_materialize_data_on_readers ((dds_subscriber *) e, false);

#if DDS_HAS_SHM
struct dds_reader * rd = NULL;
// if read condition is attached
if ((dds_entity_kind(e) == DDS_KIND_COND_READ) &&
(dds_entity_kind(e->m_parent) == DDS_KIND_READER)) {
rd = (dds_reader *)e->m_parent;
}
// if status condition of a reader is attached with any status mask
// TODO(Sumanth), or should we only enable this with data available status mask
else if ((dds_entity_supports_validate_status(e)) &&
(dds_entity_kind(e) == DDS_KIND_READER)) {
rd = (dds_reader *)e;
}

// if communication is over SHM
if ((rd) && (rd->m_iox_sub != NULL)) {
// Attach this specific reader to the iox listener, which transfers the data from the iox
// subscriber queue to the reader history cache in a separate background thread, as
// opposed to transferring the data when actually read/take is called
if (DDS_RETCODE_OK != shm_monitor_attach_reader(&rd->m_entity.m_domain->m_shm_monitor, rd)) {
// we fail if we cannot attach to the listener (as we would get no data)
iox_sub_deinit(rd->m_iox_sub);
rd->m_iox_sub = NULL;
ret = DDS_RETCODE_ERROR;
Contributor: OK, we may now fail if the iceoryx listener fails to attach (due to resource limits). But previously we would certainly have failed at reader creation (and since not every reader will use a waitset, we will now not necessarily fail).

}
}
#endif // DDS_HAS_SHM

err_scope:
dds_entity_unpin (e);
err_entity:
@@ -364,6 +392,20 @@ dds_return_t dds_waitset_detach (dds_entity_t waitset, dds_entity_t entity)
; /* entity invalid */
else
{
#if DDS_HAS_SHM
if ((dds_entity_kind(e) == DDS_KIND_COND_READ) &&
(dds_entity_kind(e->m_parent) == DDS_KIND_READER)) {
struct dds_reader * rd = (dds_reader *) e->m_parent;
if (rd->m_iox_sub != NULL) {
// If the currently detached entity is a read condition and if there are no valid
// statuses for this reader, then detach the iox listener from this specific reader
if (!dds_entity_supports_validate_status(e)) {
Contributor: I do not understand the status logic, but it may be needed. (I cannot say.)

Contributor (author): This is basically checking whether there are any valid status masks. The logic is: when the entity to be detached is a read_condition and there are no valid statuses for this reader, we detach the iox_listener from this specific reader.

Contributor: I see, and it sounds reasonable. But I do not know whether there is more to it.

shm_monitor_detach_reader(&rd->m_entity.m_domain->m_shm_monitor, rd);
}
}
}
// TODO(Sumanth), detaching based on the status mask seems to be not trivial, check this
#endif
ret = dds_entity_observer_unregister (e, ws, true);

// This waitset no longer requires a subscriber to have a materialized DATA_ON_READERS
4 changes: 0 additions & 4 deletions src/core/ddsc/src/shm__monitor.h
@@ -39,13 +39,9 @@ enum shm_monitor_states {
/// @brief abstraction for monitoring the shared memory communication with an internal
/// thread responsible for reacting on received data via shared memory
struct shm_monitor {
ddsrt_mutex_t m_lock;
iox_listener_t m_listener;

//use this if we wait but want to wake up for some reason e.g. terminate
iox_user_trigger_t m_wakeup_trigger;

uint32_t m_number_of_attached_readers;
uint32_t m_state;
};
