
Optimize read/take path with SHM when waitset is not used for the data notification #1173

Open
wants to merge 10 commits into master from decouple-listener-thread-for-take-optimization

Conversation

sumanth-nirmal
Contributor

@sumanth-nirmal sumanth-nirmal commented Mar 2, 2022

This MR changes the way the iceoryx listener is attached to the reader, so that no additional thread context is needed to move data from the iox queue to the reader history cache when publish and subscribe happen locally without a waitset.

With these changes, the data updates happen as in the following scenarios.

Scenario-1
(SHM enabled)
A setup where data is published and read/taken without a waitset or any other data-availability notification mechanism: here the iox_listener is not attached to the reader, which means the data from the iox queue is transferred to the reader history cache in the same context (without any additional thread context).

Scenario-2
(SHM enabled)
A setup where data is published and read/taken by waiting on a waitset with a data-availability status mask or a read condition: here the iox_listener is attached to the reader, which means the data transfer from the iox queue to the reader history cache happens in an additional background thread.
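In code terms the distinction boils down to a check on the read/take path, roughly like the sketch below (it only uses names that appear elsewhere in this PR; the exact placement inside the read/take implementation is an assumption):

#ifdef DDS_HAS_SHM
  // No iox_listener attached for this reader (no waitset/status-based notification):
  // pull pending chunks from the iceoryx queue into the reader history cache in the
  // caller's context, i.e. without an extra thread.
  if (rd->m_iox_sub != NULL && rd->m_iox_sub_context.monitor == NULL)
    dds_transfer_samples_from_iox_to_rhc (rd);
#endif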

This needs to be merged after #1175

Contributor

@MatthiasKillat MatthiasKillat left a comment

@sumanth-nirmal Looks ok in general, but there are minor changes required. The status handling in the waitset is unclear to me.

src/core/ddsc/src/dds_read.c
enum iox_ChunkReceiveResult take_result = iox_sub_take_chunk(rd->m_iox_sub, (const void **const)&chunk);
shm_unlock_iox_sub(rd->m_iox_sub);

if (ChunkReceiveResult_SUCCESS != take_result)
Contributor

Here we may need to distinguish between the various errors, such as

enum iox_ChunkReceiveResult
{
    ChunkReceiveResult_TOO_MANY_CHUNKS_HELD_IN_PARALLEL,
    ChunkReceiveResult_NO_CHUNK_AVAILABLE,
    ChunkReceiveResult_UNDEFINED_ERROR,
    ChunkReceiveResult_SUCCESS,
};

to e.g. warn the user that we hold too many chunks in parallel (meaning the consumer working with the chunks is too slow and we may hence lose chunks).
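For illustration, a sketch of what distinguishing the results could look like (the mapping to return codes is an assumption, not the final handling):

switch (take_result)
{
  case ChunkReceiveResult_SUCCESS:
    break;
  case ChunkReceiveResult_NO_CHUNK_AVAILABLE:
    // nothing left in the queue, not an error
    return DDS_RETCODE_OK;
  case ChunkReceiveResult_TOO_MANY_CHUNKS_HELD_IN_PARALLEL:
    // the consumer still holds the maximum number of chunks, i.e. it is too slow
    // and we may lose chunks; worth warning the user
    return DDS_RETCODE_OUT_OF_RESOURCES;
  case ChunkReceiveResult_UNDEFINED_ERROR:
  default:
    return DDS_RETCODE_ERROR;
}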

I can do this in an upcoming PR which establishes compatibility with iceoryx 2.0 (cf. #1174)

Contributor Author

I have abstracted the code from shm_monitor into a separate API for this functionality, so the error handling should be good.

// Get writer or proxy writer
struct entity_common *e = entidx_lookup_guid_untyped(gv->entity_index, &ice_hdr->guid);
if (e == NULL || (e->kind != EK_PROXY_WRITER && e->kind != EK_WRITER)) {
// Ignore that doesn't match a known writer or proxy writer
Contributor

Missing part in the comment: what does not match? The comment is unclear to me. I think we check whether we know the writer/sender of the data.

Contributor Author

Fixed

src/core/ddsc/src/dds__reader.h
@@ -800,6 +794,13 @@ uint32_t dds_reader_lock_samples (dds_entity_t reader)
uint32_t n;
if (dds_reader_lock (reader, &rd) != DDS_RETCODE_OK)
return 0;
#ifdef DDS_HAS_SHM
Contributor

Not sure whether this is (logically) the right place to pull the samples from iceoryx to the rhc.

This is purely going by the name of the function, which suggests that the rhc is somehow locked, but what else happens? Maybe it is more like an update of the rhc, but such an update must happen protected by a lock (i.e. this is the right place, but the name of the function is off).

Contributor Author

This is the entry point for the C++ API: it first calls this function to check the number of samples in the reader history cache, then updates the buffers accordingly, and then calls the actual read/take_cdr API. So in the context of the C++ API I think this should be correct.

As for the C API, I have added this in the read_take_impl functions; I need to check with @eboasson whether it makes sense.
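For reference, a sketch of how the guarded transfer could sit in dds_reader_lock_samples (the lines from the diff above are kept; the rest of the body, including the dds_rhc_lock_samples call, is an assumption about the surrounding code):

uint32_t dds_reader_lock_samples (dds_entity_t reader)
{
  dds_reader *rd;
  uint32_t n;
  if (dds_reader_lock (reader, &rd) != DDS_RETCODE_OK)
    return 0;
#ifdef DDS_HAS_SHM
  // bring samples still sitting in the iceoryx queue into the reader history cache
  // before counting, so the C++ API sees them as well
  if (rd->m_iox_sub != NULL && rd->m_iox_sub_context.monitor == NULL)
    dds_transfer_samples_from_iox_to_rhc (rd);
#endif
  n = dds_rhc_lock_samples (rd->m_rhc);
  dds_reader_unlock (rd);
  return n;
}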

return DDS_RETCODE_OUT_OF_RESOURCES;
}

// TODO(Sumanth), do we even use this at all?
Contributor

I think the iox listener does sufficient bookkeeping of this on its own, so unless we want a lower maximum of attachments than the listener allows, or want to query the number of attachments, we will not need it.

Contributor Author

I will remove this.

// TODO(Sumanth), do we even use this at all?
++monitor->m_number_of_attached_readers;
reader->m_iox_sub_stor.monitor = &reader->m_entity.m_domain->m_shm_monitor;
reader->m_iox_sub_stor.parent_reader = reader;
Contributor

Race: this must be set before attaching, since as soon as it is attached the callback may try to access it.

This means that if attaching fails it would have to be reset, but technically this could also be set once at reader init, after the iceoryx subscriber is created, as it does not change.

Contributor Author

@sumanth-nirmal sumanth-nirmal Mar 8, 2022

I am now setting this in the init.
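For illustration, a rough sketch along those lines (the context's parent_reader is filled in once at reader init, and the attach path attaches only if not already attached; the function placement, the hypothetical name shm_monitor_attach_reader, and everything not shown in the snippets of this PR are assumptions):

// at reader init, once the iceoryx subscriber exists: set once, never changes
rd->m_iox_sub_context.parent_reader = rd;

// in the (hypothetical) shm_monitor_attach_reader, guarded by the monitor lock
ddsrt_mutex_lock (&monitor->m_lock);
if (rd->m_iox_sub_context.monitor == NULL)   // attach only if not already attached
{
  rd->m_iox_sub_context.monitor = monitor;
  // attach the iceoryx listener for SubscriberEvent_DATA_RECEIVED here;
  // if attaching fails, reset the monitor field before returning an error
  ++monitor->m_number_of_attached_readers;
}
ddsrt_mutex_unlock (&monitor->m_lock);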

}
++monitor->m_number_of_attached_readers;

ddsrt_mutex_unlock(&monitor->m_lock);
Contributor

I think this is not needed (see locking above).

Contributor Author

Fixed

// are we really tracking the number of attached readers?
--monitor->m_number_of_attached_readers;
reader->m_iox_sub_stor.monitor = NULL;
reader->m_iox_sub_stor.parent_reader = NULL;
Contributor

Not needed on every attach/detach cycle; setting it once to non-NULL is enough.

iox_listener_detach_subscriber_event(monitor->m_listener, reader->m_iox_sub, SubscriberEvent_DATA_RECEIVED);
// are we really tracking the number of attached readers?
--monitor->m_number_of_attached_readers;
reader->m_iox_sub_stor.monitor = NULL;
Contributor

It is probably better to unset this (i.e. set it to NULL) after detaching, but I think it is technically not required.
Previously it was done in a way such that there would be no races in the callback the listener executes. I think after the proposed changes this should not cause races either.

@sumanth-nirmal sumanth-nirmal force-pushed the decouple-listener-thread-for-take-optimization branch 7 times, most recently from 945ab91 to 05eae64 on March 8, 2022 at 02:38
@sumanth-nirmal sumanth-nirmal marked this pull request as ready for review on March 8, 2022 at 07:32
@sumanth-nirmal sumanth-nirmal force-pushed the decouple-listener-thread-for-take-optimization branch 2 times, most recently from 7249200 to fa19754 on March 11, 2022 at 07:53
@MatthiasKillat
Contributor

@sumanth-nirmal Had a brief look again, will review today once #1175 is merged.


while (true)
{
shm_lock_iox_sub(rd->m_iox_sub);
Contributor

It may also be worth looking into whether it pays off to lock for the whole while loop (longer critical section, but fewer lock/unlock operations).
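A sketch of that alternative, with a single critical section around the loop (the chunk-to-rhc handling inside the loop is not reproduced here; everything else uses the calls shown in this PR):

// one lock/unlock pair for the whole drain instead of one per chunk
shm_lock_iox_sub (rd->m_iox_sub);
while (true)
{
  void *chunk = NULL;
  enum iox_ChunkReceiveResult take_result = iox_sub_take_chunk (rd->m_iox_sub, (const void **const) &chunk);
  if (ChunkReceiveResult_SUCCESS != take_result)
    break;  // queue drained, or an error to be reported as discussed above
  // convert the chunk and store it in the reader history cache
  // (unchanged from the per-iteration locking version, omitted in this sketch)
}
shm_unlock_iox_sub (rd->m_iox_sub);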

// we fail if we cannot attach to the listener (as we would get no data)
iox_sub_deinit(rd->m_iox_sub);
rd->m_iox_sub = NULL;
ret = DDS_RETCODE_ERROR;
Contributor

OK, we may now fail if the iceoryx listener fails to attach (due to resource limits). But previously we would certainly have failed at reader creation (and since not every reader will use a waitset, we will now not necessarily fail).

struct dds_reader * rd = (dds_reader *) e->m_parent;
if (rd->m_iox_sub != NULL) {
// if there are no valid statuses
if (!dds_entity_supports_validate_status(e)) {
Contributor

I see and it sounds reasonable. But I do not know whether there is more to it.

// if the reader is attached
if (reader->m_iox_sub_context.monitor != NULL && reader->m_iox_sub_context.parent_reader != NULL) {
iox_listener_detach_subscriber_event(monitor->m_listener, reader->m_iox_sub, SubscriberEvent_DATA_RECEIVED);
reader->m_iox_sub_context.monitor = NULL;
Contributor

OK.
There should be no race in the callback due to setting these to NULL, as it should run to completion before detaching. This also means a blocking callback would block detach, but this was also the case before (and the callback is not blocking).

if(storage->monitor->m_state == SHM_MONITOR_RUNNING && storage->call) {
storage->call(storage->arg);
}
dds_transfer_samples_from_iox_to_rhc(rd);
Contributor

I think we can now call this directly in shm_subscriber_callback, i.e. there is no need for receive_data_wakeup_handler anymore.

receive_data_wakeup_handler(storage->parent_reader);
iox_sub_context_t *context = (iox_sub_context_t*) context_data;
if((context->monitor) && (context->monitor->m_state == SHM_MONITOR_RUNNING)) {
receive_data_wakeup_handler(context->parent_reader);
Contributor

We can call dds_transfer_samples_from_iox_to_rhc(context->parent_reader) here.
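In other words, the callback could shrink to something along these lines (a sketch assembled from the snippet above; the exact callback signature used with the iceoryx listener is an assumption):

static void shm_subscriber_callback (iox_sub_t subscriber, void *context_data)
{
  (void) subscriber;
  iox_sub_context_t *context = (iox_sub_context_t *) context_data;
  if ((context->monitor) && (context->monitor->m_state == SHM_MONITOR_RUNNING))
    dds_transfer_samples_from_iox_to_rhc (context->parent_reader);
}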

@MatthiasKillat
Contributor

MatthiasKillat commented Mar 11, 2022

Pulling the data from iceoryx in a read is beneficial to avoid context switches (iceoryx listener thread). As long as the buffer of iceoryx is at least as large as the reader history cache, there is no data loss that we would not experience otherwise.

If the user is actively waiting for data, the listener works as before. I wonder whether this could be optimized for the DDS waitset as well, so that data is only pulled in the read call after being woken up by the waitset. Even for a listener with a background thread this would work, as long as it calls the regular read (which pulls data from iceoryx as needed).

I was also thinking of whether we could have some kind of hybrid approach:

  1. attach a listener and observe data events, but more specific ones such as: there are N samples in the iceoryx buffer (configurable, say N is half the cache size).
  2. only wake up once we have N samples and transfer them.
  3. transfer in any read as well, regardless of the number of samples in the iceoryx buffer.

Unfortunately iceoryx lacks the trigger event for this (waking up on any sample and checking the number defeats the purpose of having fewer wake-ups). Therefore it is not possible yet, but we may change iceoryx to support this in the future.

@sumanth-nirmal sumanth-nirmal force-pushed the decouple-listener-thread-for-take-optimization branch 3 times, most recently from 47547b3 to c570ab3 on March 17, 2022 at 22:00
  - Make them thread safe
  - The attach API will attach only if the reader is not already attached
  - The attach API also updates the iox_sub storage with monitor and the parent reader
  - The detach API will only attempt to detach if there is a monitor attached

Signed-off-by: Sumanth Nirmal <[email protected]>
…ory cache from shm_monitor.c to dds_read.c

Signed-off-by: Sumanth Nirmal <[email protected]>
…scriber

queue to the reader cache directly in the same thread context, if the monitor
is not attached to this reader

Signed-off-by: Sumanth Nirmal <[email protected]>
The reader will be attached to the monitor when the reader has read conditions,
status conditions, or a listener with the data-availability status mask

Signed-off-by: Sumanth Nirmal <[email protected]>
read/take_impl APIs when SHM is supported and iox_listener is not
attached to a specific reader

Signed-off-by: Sumanth Nirmal <[email protected]>
Signed-off-by: Sumanth Nirmal <[email protected]>
@sumanth-nirmal sumanth-nirmal force-pushed the decouple-listener-thread-for-take-optimization branch from c570ab3 to bbbb793 on March 24, 2022 at 13:14