diff --git a/src/core/ddsc/src/dds__reader.h b/src/core/ddsc/src/dds__reader.h index 6828319cc2..2e46ef5e5a 100644 --- a/src/core/ddsc/src/dds__reader.h +++ b/src/core/ddsc/src/dds__reader.h @@ -43,6 +43,11 @@ struct nn_rsample_info; struct nn_rdata; DDS_EXPORT void dds_reader_ddsi2direct (dds_entity_t entity, void (*cb) (const struct nn_rsample_info *sampleinfo, const struct nn_rdata *fragchain, void *arg), void *cbarg); +/* + Transfer the samples from the iox subscriber queue to the reader history cache +*/ +void dds_transfer_samples_from_iox_to_rhc (dds_reader * reader); + DEFINE_ENTITY_LOCK_UNLOCK(dds_reader, DDS_KIND_READER) #if defined (__cplusplus) diff --git a/src/core/ddsc/src/dds_read.c b/src/core/ddsc/src/dds_read.c index e9304a69a4..7edfb8222c 100644 --- a/src/core/ddsc/src/dds_read.c +++ b/src/core/ddsc/src/dds_read.c @@ -64,6 +64,14 @@ static dds_return_t dds_read_impl (bool take, dds_entity_t reader_or_condition, thread_state_awake (ts1, &entity->m_domain->gv); +#ifdef DDS_HAS_SHM + // If SHM is supported and if the monitor is not attached + if(rd->m_iox_sub && !rd->m_iox_sub_context.monitor) { + // transfer the samples from iox to rhc in the same thread context + dds_transfer_samples_from_iox_to_rhc(rd); + } +#endif + /* Allocate samples if not provided (assuming all or none provided) */ if (buf[0] == NULL) { @@ -163,6 +171,13 @@ static dds_return_t dds_readcdr_impl (bool take, dds_entity_t reader_or_conditio } thread_state_awake (ts1, &entity->m_domain->gv); +#ifdef DDS_HAS_SHM + // If SHM is supported and if the monitor is not attached + if(rd->m_iox_sub && !rd->m_iox_sub_context.monitor) { + // transfer the samples from iox to rhc in the same thread context + dds_transfer_samples_from_iox_to_rhc(rd); + } +#endif /* read/take resets data available status -- must reset before reading because the actual writing is protected by RHC lock, not by rd->m_entity.m_lock */ @@ -554,3 +569,94 @@ dds_return_t dds_return_reader_loan (dds_reader *rd, void **buf, int32_t bufsz) ddsrt_mutex_unlock (&rd->m_entity.m_mutex); return DDS_RETCODE_OK; } + + +void dds_transfer_samples_from_iox_to_rhc (dds_reader * rd) +{ +#if DDS_HAS_SHM + void* chunk = NULL; + struct ddsi_domaingv* gv = rd->m_rd->e.gv; + thread_state_awake(lookup_thread_state(), gv); + + while (true) + { + shm_lock_iox_sub(rd->m_iox_sub); + enum iox_ChunkReceiveResult take_result = iox_sub_take_chunk(rd->m_iox_sub, (const void** const)&chunk); + shm_unlock_iox_sub(rd->m_iox_sub); + + // NB: If we cannot take the chunk (sample) the user may lose data. + // Since the subscriber queue can overflow and will evict the least recent sample. + // This entirely depends on the producer and consumer frequency (and the queue size if they are close). + // The consumer here is essentially the reader history cache. + if (ChunkReceiveResult_SUCCESS != take_result) + { + switch(take_result) + { + case ChunkReceiveResult_TOO_MANY_CHUNKS_HELD_IN_PARALLEL : + { + // we hold to many chunks and cannot get more + DDS_CLOG (DDS_LC_WARNING | DDS_LC_SHM, &rd->m_entity.m_domain->gv.logconfig, + "DDS reader with topic %s : iceoryx subscriber - TOO_MANY_CHUNKS_HELD_IN_PARALLEL -" + "could not take sample\n", rd->m_topic->m_name); + break; + } + case ChunkReceiveResult_NO_CHUNK_AVAILABLE: { + // no more chunks to take, ok + break; + } + default : { + // some unkown error occurred + DDS_CLOG(DDS_LC_WARNING | DDS_LC_SHM, &rd->m_entity.m_domain->gv.logconfig, + "DDS reader with topic %s : iceoryx subscriber - UNKNOWN ERROR -" + "could not take sample\n", rd->m_topic->m_name); + } + } + + break; + } + + const iceoryx_header_t* ice_hdr = iceoryx_header_from_chunk(chunk); + + // Get writer or proxy writer + struct entity_common * e = entidx_lookup_guid_untyped (gv->entity_index, &ice_hdr->guid); + if (e == NULL || (e->kind != EK_PROXY_WRITER && e->kind != EK_WRITER)) + { + // Ignore the sample that is not from a known writer or proxy writer + DDS_CLOG (DDS_LC_SHM, &gv->logconfig, "unknown source entity, ignore.\n"); + continue; + } + + // Create struct ddsi_serdata + struct ddsi_serdata* d = ddsi_serdata_from_iox(rd->m_topic->m_stype, ice_hdr->data_kind, &rd->m_iox_sub, chunk); + d->timestamp.v = ice_hdr->tstamp; + d->statusinfo = ice_hdr->statusinfo; + + // Get struct ddsi_tkmap_instance + struct ddsi_tkmap_instance* tk; + if ((tk = ddsi_tkmap_lookup_instance_ref(gv->m_tkmap, d)) == NULL) + { + DDS_CLOG(DDS_LC_SHM, &gv->logconfig, "ddsi_tkmap_lookup_instance_ref failed.\n"); + goto release; + } + + // Generate writer_info + struct ddsi_writer_info wrinfo; + struct dds_qos *xqos; + if (e->kind == EK_PROXY_WRITER) + xqos = ((struct proxy_writer *) e)->c.xqos; + else + xqos = ((struct writer *) e)->xqos; + ddsi_make_writer_info(&wrinfo, e, xqos, d->statusinfo); + (void)ddsi_rhc_store(rd->m_rd->rhc, &wrinfo, d, tk); + +release: + if (tk) + ddsi_tkmap_instance_unref(gv->m_tkmap, tk); + if (d) + ddsi_serdata_unref(d); + } + thread_state_asleep(lookup_thread_state()); +#else + (void)rd; +#endif +} diff --git a/src/core/ddsc/src/dds_reader.c b/src/core/ddsc/src/dds_reader.c index b72a8ad690..017fc47e64 100644 --- a/src/core/ddsc/src/dds_reader.c +++ b/src/core/ddsc/src/dds_reader.c @@ -717,23 +717,8 @@ static dds_entity_t dds_create_reader_int (dds_entity_t participant_or_subscribe iox_sub_context_t **context = iox_sub_context_ptr(rd->m_iox_sub); *context = &rd->m_iox_sub_context; - rc = shm_monitor_attach_reader(&rd->m_entity.m_domain->m_shm_monitor, rd); - - if (rc != DDS_RETCODE_OK) { - // we fail if we cannot attach to the listener (as we would get no data) - iox_sub_deinit(rd->m_iox_sub); - rd->m_iox_sub = NULL; - DDS_CLOG(DDS_LC_WARNING | DDS_LC_SHM, - &rd->m_entity.m_domain->gv.logconfig, - "Failed to attach iox subscriber to iox listener\n"); - // FIXME: We need to clean up everything created up to now. - // Currently there is only partial cleanup, we need to extend it. - goto err_bad_qos; - } - // those are set once and never changed - // they are used to access reader and monitor from the callback when data is received - rd->m_iox_sub_context.monitor = &rd->m_entity.m_domain->m_shm_monitor; + // they are used to access reader from the callback when data is received rd->m_iox_sub_context.parent_reader = rd; } #endif @@ -838,6 +823,13 @@ uint32_t dds_reader_lock_samples (dds_entity_t reader) uint32_t n; if (dds_reader_lock (reader, &rd) != DDS_RETCODE_OK) return 0; +#ifdef DDS_HAS_SHM + // If SHM is supported and if the monitor is not attached + if(rd->m_iox_sub && !rd->m_iox_sub_context.monitor) { + // transfer the samples from iox to rhc in the same thread context + dds_transfer_samples_from_iox_to_rhc(rd); + } +#endif n = dds_rhc_lock_samples (rd->m_rhc); dds_reader_unlock (rd); return n; diff --git a/src/core/ddsc/src/dds_waitset.c b/src/core/ddsc/src/dds_waitset.c index b7cb4b7ba7..8c3ef56a4b 100644 --- a/src/core/ddsc/src/dds_waitset.c +++ b/src/core/ddsc/src/dds_waitset.c @@ -333,6 +333,34 @@ dds_return_t dds_waitset_attach (dds_entity_t waitset, dds_entity_t entity, dds_ if (ret < 0 && dds_entity_kind (e) == DDS_KIND_SUBSCRIBER) dds_subscriber_adjust_materialize_data_on_readers ((dds_subscriber *) e, false); +#if DDS_HAS_SHM + struct dds_reader * rd = NULL; + // if read condition is attached + if ((dds_entity_kind(e) == DDS_KIND_COND_READ) && + (dds_entity_kind(e->m_parent) == DDS_KIND_READER)) { + rd = (dds_reader *)e->m_parent; + } + // if status condition of a reader is attached with any status mask + // TODO(Sumanth), or should we only enable this with data available status mask + else if ((dds_entity_supports_validate_status(e)) && + (dds_entity_kind(e) == DDS_KIND_READER)) { + rd = (dds_reader *)e; + } + + // if communication is over SHM + if ((rd) && (rd->m_iox_sub != NULL)) { + // Attach this specific reader to the iox listener, which transfers the data from the iox + // subscriber queue to the reader history cache in a separate background thread, as + // opposed to transferring the data when actually read/take is called + if (DDS_RETCODE_OK != shm_monitor_attach_reader(&rd->m_entity.m_domain->m_shm_monitor, rd)) { + // we fail if we cannot attach to the listener (as we would get no data) + iox_sub_deinit(rd->m_iox_sub); + rd->m_iox_sub = NULL; + ret = DDS_RETCODE_ERROR; + } + } +#endif // DDS_HAS_SHM + err_scope: dds_entity_unpin (e); err_entity: @@ -364,6 +392,20 @@ dds_return_t dds_waitset_detach (dds_entity_t waitset, dds_entity_t entity) ; /* entity invalid */ else { +#if DDS_HAS_SHM + if ((dds_entity_kind(e) == DDS_KIND_COND_READ) && + (dds_entity_kind(e->m_parent) == DDS_KIND_READER)) { + struct dds_reader * rd = (dds_reader *) e->m_parent; + if (rd->m_iox_sub != NULL) { + // If the currently detached entity is a read condition and if there are no valid + // statuses for this reader, then detach the iox listener from this specific reader + if (!dds_entity_supports_validate_status(e)) { + shm_monitor_detach_reader(&rd->m_entity.m_domain->m_shm_monitor, rd); + } + } + } + // TODO(Sumanth), detaching based on the status mask seems to be not trivial, check this +#endif ret = dds_entity_observer_unregister (e, ws, true); // This waitset no longer requires a subscriber to have a materialized DATA_ON_READERS diff --git a/src/core/ddsc/src/shm__monitor.h b/src/core/ddsc/src/shm__monitor.h index 7505e94c18..937151d7fa 100644 --- a/src/core/ddsc/src/shm__monitor.h +++ b/src/core/ddsc/src/shm__monitor.h @@ -39,13 +39,9 @@ enum shm_monitor_states { /// @brief abstraction for monitoring the shared memory communication with an internal /// thread responsible for reacting on received data via shared memory struct shm_monitor { - ddsrt_mutex_t m_lock; iox_listener_t m_listener; - //use this if we wait but want to wake up for some reason e.g. terminate iox_user_trigger_t m_wakeup_trigger; - - uint32_t m_number_of_attached_readers; uint32_t m_state; }; diff --git a/src/core/ddsc/src/shm_monitor.c b/src/core/ddsc/src/shm_monitor.c index 8a3c2e8c87..cd7c109a5f 100644 --- a/src/core/ddsc/src/shm_monitor.c +++ b/src/core/ddsc/src/shm_monitor.c @@ -32,9 +32,7 @@ static void shm_subscriber_callback(iox_sub_t subscriber, void * context_data); void shm_monitor_init(shm_monitor_t* monitor) { - ddsrt_mutex_init(&monitor->m_lock); - - // storage is ignored internally now but we cannot pass a nullptr + // storage is ignored internally now but we cannot pass a nullptr monitor->m_listener = iox_listener_init(&(iox_listener_storage_t){0}); monitor->m_wakeup_trigger = iox_user_trigger_init(&(iox_user_trigger_storage_t){0}); @@ -51,7 +49,6 @@ void shm_monitor_destroy(shm_monitor_t* monitor) iox_listener_deinit(monitor->m_listener); iox_user_trigger_deinit(monitor->m_wakeup_trigger); - ddsrt_mutex_destroy(&monitor->m_lock); } dds_return_t shm_monitor_wake_and_disable(shm_monitor_t* monitor) @@ -70,111 +67,46 @@ dds_return_t shm_monitor_wake_and_enable(shm_monitor_t* monitor) dds_return_t shm_monitor_attach_reader(shm_monitor_t* monitor, struct dds_reader* reader) { - - if(iox_listener_attach_subscriber_event_with_context_data(monitor->m_listener, - reader->m_iox_sub, - SubscriberEvent_DATA_RECEIVED, - shm_subscriber_callback, - &reader->m_iox_sub_context) != ListenerResult_SUCCESS) { - DDS_CLOG(DDS_LC_SHM, &reader->m_rd->e.gv->logconfig, "error attaching reader\n"); + enum iox_ListenerResult attach_result = + iox_listener_attach_subscriber_event_with_context_data(monitor->m_listener, + reader->m_iox_sub, + SubscriberEvent_DATA_RECEIVED, + shm_subscriber_callback, + &reader->m_iox_sub_context); + if(ListenerResult_SUCCESS != attach_result) { + switch (attach_result) { + case ListenerResult_EVENT_ALREADY_ATTACHED:{ + break; + } + case ListenerResult_LISTENER_FULL: + case ListenerResult_EMPTY_EVENT_CALLBACK: + case ListenerResult_EMPTY_INVALIDATION_CALLBACK: + case ListenerResult_UNDEFINED_ERROR: + default: { + DDS_CLOG(DDS_LC_SHM, &reader->m_rd->e.gv->logconfig, "error attaching reader\n"); return DDS_RETCODE_OUT_OF_RESOURCES; + } } - ++monitor->m_number_of_attached_readers; + } - return DDS_RETCODE_OK; + reader->m_iox_sub_context.monitor = &reader->m_entity.m_domain->m_shm_monitor; + return DDS_RETCODE_OK; } dds_return_t shm_monitor_detach_reader(shm_monitor_t* monitor, struct dds_reader* reader) { - iox_listener_detach_subscriber_event(monitor->m_listener, reader->m_iox_sub, SubscriberEvent_DATA_RECEIVED); - --monitor->m_number_of_attached_readers; + // if the reader is attached + if (reader->m_iox_sub_context.monitor != NULL && reader->m_iox_sub_context.parent_reader != NULL) { + iox_listener_detach_subscriber_event(monitor->m_listener, reader->m_iox_sub, SubscriberEvent_DATA_RECEIVED); + reader->m_iox_sub_context.monitor = NULL; + reader->m_iox_sub_context.parent_reader = NULL; + } return DDS_RETCODE_OK; } static void receive_data_wakeup_handler(struct dds_reader* rd) { - void* chunk = NULL; - struct ddsi_domaingv* gv = rd->m_rd->e.gv; - thread_state_awake(lookup_thread_state(), gv); - - while (true) - { - shm_lock_iox_sub(rd->m_iox_sub); - enum iox_ChunkReceiveResult take_result = iox_sub_take_chunk(rd->m_iox_sub, (const void** const)&chunk); - shm_unlock_iox_sub(rd->m_iox_sub); - - // NB: If we cannot take the chunk (sample) the user may lose data. - // Since the subscriber queue can overflow and will evict the least recent sample. - // This entirely depends on the producer and consumer frequency (and the queue size if they are close). - // The consumer here is essentially the reader history cache. - if (ChunkReceiveResult_SUCCESS != take_result) - { - switch(take_result) - { - case ChunkReceiveResult_TOO_MANY_CHUNKS_HELD_IN_PARALLEL : - { - // we hold to many chunks and cannot get more - DDS_CLOG (DDS_LC_WARNING | DDS_LC_SHM, &rd->m_entity.m_domain->gv.logconfig, - "DDS reader with topic %s : iceoryx subscriber - TOO_MANY_CHUNKS_HELD_IN_PARALLEL -" - "could not take sample\n", rd->m_topic->m_name); - break; - } - case ChunkReceiveResult_NO_CHUNK_AVAILABLE: { - // no more chunks to take, ok - break; - } - default : { - // some unkown error occurred - DDS_CLOG(DDS_LC_WARNING | DDS_LC_SHM, &rd->m_entity.m_domain->gv.logconfig, - "DDS reader with topic %s : iceoryx subscriber - UNKNOWN ERROR -" - "could not take sample\n", rd->m_topic->m_name); - } - } - - break; - } - - const iceoryx_header_t* ice_hdr = iceoryx_header_from_chunk(chunk); - - // Get writer or proxy writer - struct entity_common * e = entidx_lookup_guid_untyped (gv->entity_index, &ice_hdr->guid); - if (e == NULL || (e->kind != EK_PROXY_WRITER && e->kind != EK_WRITER)) - { - // Ignore that doesn't match a known writer or proxy writer - DDS_CLOG (DDS_LC_SHM, &gv->logconfig, "unknown source entity, ignore.\n"); - continue; - } - - // Create struct ddsi_serdata - struct ddsi_serdata* d = ddsi_serdata_from_iox(rd->m_topic->m_stype, ice_hdr->data_kind, &rd->m_iox_sub, chunk); - d->timestamp.v = ice_hdr->tstamp; - d->statusinfo = ice_hdr->statusinfo; - - // Get struct ddsi_tkmap_instance - struct ddsi_tkmap_instance* tk; - if ((tk = ddsi_tkmap_lookup_instance_ref(gv->m_tkmap, d)) == NULL) - { - DDS_CLOG(DDS_LC_SHM, &gv->logconfig, "ddsi_tkmap_lookup_instance_ref failed.\n"); - goto release; - } - - // Generate writer_info - struct ddsi_writer_info wrinfo; - struct dds_qos *xqos; - if (e->kind == EK_PROXY_WRITER) - xqos = ((struct proxy_writer *) e)->c.xqos; - else - xqos = ((struct writer *) e)->xqos; - ddsi_make_writer_info(&wrinfo, e, xqos, d->statusinfo); - (void)ddsi_rhc_store(rd->m_rd->rhc, &wrinfo, d, tk); - -release: - if (tk) - ddsi_tkmap_instance_unref(gv->m_tkmap, tk); - if (d) - ddsi_serdata_unref(d); - } - thread_state_asleep(lookup_thread_state()); + dds_transfer_samples_from_iox_to_rhc(rd); } static void shm_subscriber_callback(iox_sub_t subscriber, void * context_data) @@ -182,7 +114,7 @@ static void shm_subscriber_callback(iox_sub_t subscriber, void * context_data) (void)subscriber; // we know it is actually in extended storage since we created it like this iox_sub_context_t *context = (iox_sub_context_t*) context_data; - if(context->monitor->m_state == SHM_MONITOR_RUNNING) { + if((context->monitor) && (context->monitor->m_state == SHM_MONITOR_RUNNING)) { receive_data_wakeup_handler(context->parent_reader); } }