From 4e532244bec5516ac6ba31c5abe6ba4ad88a939c Mon Sep 17 00:00:00 2001 From: Fred Hornsey Date: Thu, 15 Aug 2024 22:23:20 -0500 Subject: [PATCH 1/2] Fixed Leak in AddressCache for RTPS/UDP Transport AddressCache has a `map_` that maps keys (local-remote GUID pairs in RTPS/UDP) to cache entries and a `id_map_` that maps GUIDs to a vector of related keys. When a key is inserted into the cache the GUIDs get get entries in the `id_map_` that can be used to search `map_`. `id_map_` has two problems: - The vector of keys are not checked if they are being duplicated. These changes make it a set. - `remove_id` removes the `id_map_` keys for the GUID argument (the remote GUID in RTPS/UDP), but not any keys that contain that GUID for other vectors in `id_maps_` (the remote in RTPS/UDP). These changes try to clean up all the revelant GUIDs. Also: - A change to release reservation I meant to add in the other leak PR. If the release reservation is for a specfic local-remote pair, it doesn't seem right to remove all messages for the remote from the send queue. - Small fixes to QoS doc that I had in this local repo for some reason. --- dds/DCPS/AddressCache.h | 75 ++++++++++++------- .../transport/rtps_udp/RtpsUdpDataLink.cpp | 2 +- docs/devguide/quality_of_service.rst | 4 +- 3 files changed, 50 insertions(+), 31 deletions(-) diff --git a/dds/DCPS/AddressCache.h b/dds/DCPS/AddressCache.h index a0f41fb9c2c..ce84b085358 100644 --- a/dds/DCPS/AddressCache.h +++ b/dds/DCPS/AddressCache.h @@ -1,6 +1,4 @@ /* - * - * * Distributed under the OpenDDS License. * See: http://www.opendds.org/license.html */ @@ -8,11 +6,10 @@ #ifndef OPENDDS_DCPS_ADDRESSCACHE_H #define OPENDDS_DCPS_ADDRESSCACHE_H -#include "dcps_export.h" - +#include #ifndef ACE_LACKS_PRAGMA_ONCE # pragma once -#endif /* ACE_LACKS_PRAGMA_ONCE */ +#endif #include "Definitions.h" #include "GuidUtils.h" @@ -20,8 +17,9 @@ #include "PoolAllocator.h" #include "RcObject.h" #include "TimeTypes.h" +#include "dcps_export.h" -#include "ace/INET_Addr.h" +#include OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL @@ -54,7 +52,8 @@ struct AddressCacheEntry : public virtual RcObject { struct AddressCacheEntryProxy { AddressCacheEntryProxy(RcHandle rch) : entry_(rch) {} - bool operator==(const AddressCacheEntryProxy& rhs) const { + bool operator==(const AddressCacheEntryProxy& rhs) const + { #if defined ACE_HAS_CPP11 return entry_ && rhs.entry_ && entry_->addrs_hash_ == rhs.entry_->addrs_hash_ && entry_->addrs_ == rhs.entry_->addrs_; #else @@ -62,7 +61,8 @@ struct AddressCacheEntryProxy { #endif } - bool operator<(const AddressCacheEntryProxy& rhs) const { + bool operator<(const AddressCacheEntryProxy& rhs) const + { #if defined ACE_HAS_CPP11 return (rhs.entry_ && (!entry_ || (entry_->addrs_hash_ < rhs.entry_->addrs_hash_ || (entry_->addrs_hash_ == rhs.entry_->addrs_hash_ && entry_->addrs_ < rhs.entry_->addrs_)))); #else @@ -84,14 +84,13 @@ template class AddressCache { public: + typedef OPENDDS_SET(Key) KeySet; #if defined ACE_HAS_CPP11 typedef OPENDDS_UNORDERED_MAP_T(Key, RcHandle) MapType; - typedef OPENDDS_VECTOR(Key) KeyVec; - typedef OPENDDS_UNORDERED_MAP_T(GUID_t, KeyVec) IdMapType; + typedef OPENDDS_UNORDERED_MAP_T(GUID_t, KeySet) IdMapType; #else typedef OPENDDS_MAP_T(Key, RcHandle) MapType; - typedef OPENDDS_VECTOR(Key) KeyVec; - typedef OPENDDS_MAP_T(GUID_t, KeyVec) IdMapType; + typedef OPENDDS_MAP_T(GUID_t, KeySet) IdMapType; #endif AddressCache() {} @@ -120,11 +119,7 @@ class AddressCache { if (pos == cache.map_.end()) { rch_ = make_rch(); cache.map_[key] = rch_; - GuidSet set; - key.get_contained_guids(set); - for (GuidSet::const_iterator it = set.begin(), limit = set.end(); it != limit; ++it) { - cache.id_map_[*it].push_back(key); - } + cache.insert_ids(key); is_new_ = true; } else { rch_ = pos->second; @@ -144,7 +139,8 @@ class AddressCache { #endif } - inline AddressCacheEntry& value() { + AddressCacheEntry& value() + { OPENDDS_ASSERT(rch_); #if defined ACE_HAS_CPP11 non_const_touch_ = true; @@ -152,13 +148,15 @@ class AddressCache { return *rch_; } - inline const AddressCacheEntry& value() const { + const AddressCacheEntry& value() const + { OPENDDS_ASSERT(rch_); return *rch_; } #if defined ACE_HAS_CPP11 - inline void recalculate_hash() { + void recalculate_hash() + { if (non_const_touch_) { rch_->addrs_hash_ = calculate_hash(rch_->addrs_); non_const_touch_ = false; @@ -204,11 +202,7 @@ class AddressCache { rch->expires_ = expires; } else { rch = make_rch(addrs, expires); - GuidSet set; - key.get_contained_guids(set); - for (GuidSet::const_iterator it = set.begin(), limit = set.end(); it != limit; ++it) { - id_map_[*it].push_back(key); - } + insert_ids(key); } } @@ -222,16 +216,41 @@ class AddressCache { { ACE_Guard guard(mutex_); const typename IdMapType::iterator pos = id_map_.find(val); + KeySet keys; if (pos != id_map_.end()) { - for (typename KeyVec::iterator it = pos->second.begin(), limit = pos->second.end(); it != limit; ++it) { - map_.erase(*it); - } + keys.swap(pos->second); id_map_.erase(pos); } + + for (typename KeySet::iterator key = keys.begin(), limit = keys.end(); key != limit; ++key) { + map_.erase(*key); + + // Undo insert_ids(key) + GuidSet guids; + key->get_contained_guids(guids); + for (GuidSet::const_iterator guid = guids.begin(), limit = guids.end(); guid != limit; ++guid) { + const typename IdMapType::iterator other = id_map_.find(*guid); + if (other != id_map_.end()) { + other->second.erase(*key); + if (other->second.empty()) { + id_map_.erase(other); + } + } + } + } } private: + void insert_ids(const Key& key) + { + GuidSet guids; + key.get_contained_guids(guids); + for (GuidSet::const_iterator it = guids.begin(), limit = guids.end(); it != limit; ++it) { + id_map_[*it].insert(key); + } + } + mutable ACE_Thread_Mutex mutex_; MapType map_; IdMapType id_map_; diff --git a/dds/DCPS/transport/rtps_udp/RtpsUdpDataLink.cpp b/dds/DCPS/transport/rtps_udp/RtpsUdpDataLink.cpp index d8cc0ea0ed7..14da4bf76e4 100644 --- a/dds/DCPS/transport/rtps_udp/RtpsUdpDataLink.cpp +++ b/dds/DCPS/transport/rtps_udp/RtpsUdpDataLink.cpp @@ -968,7 +968,7 @@ RtpsUdpDataLink::release_reservations_i(const GUID_t& remote_id, } } - sq_.ignore_remote(remote_id); + sq_.ignore(local_id, remote_id); for (TqeVector::iterator drop_it = to_drop.begin(); drop_it != to_drop.end(); ++drop_it) { (*drop_it)->data_dropped(true); diff --git a/docs/devguide/quality_of_service.rst b/docs/devguide/quality_of_service.rst index ee134063e41..222840ba978 100644 --- a/docs/devguide/quality_of_service.rst +++ b/docs/devguide/quality_of_service.rst @@ -746,7 +746,7 @@ This policy applies to the domain participant, data reader, and data writer enti - :ref:`Default values ` - * - :term:`DomainParticipant`, :term:`DataWriter`, :term:`DataReader` + * - :term:`DomainParticipant`, :term:`DataWriter`, and :term:`DataReader` - ``value`` @@ -1435,7 +1435,7 @@ The property QoS policy contains sequences of key-value pairs for the :term:`Dom .. important:: - This policy is :ref:`mutable `, but updates to properties after creating the participant might not an effect. + This policy is :ref:`mutable `, but updates to properties after creating the participant might not have an effect. This policy affects association indirectly through security. IDL: From 8b2d38e302fd35f05f30613b7fa20a19c0bfdbdf Mon Sep 17 00:00:00 2001 From: Fred Hornsey Date: Fri, 16 Aug 2024 17:45:09 -0500 Subject: [PATCH 2/2] News, Unit Testing, Tweaks for Unit Testing --- dds/DCPS/AddressCache.h | 54 +++++++++++++--------- docs/news.d/addr-cache-leak.rst | 5 ++ tests/unit-tests/dds/DCPS/AddressCache.cpp | 22 ++++++--- 3 files changed, 54 insertions(+), 27 deletions(-) create mode 100644 docs/news.d/addr-cache-leak.rst diff --git a/dds/DCPS/AddressCache.h b/dds/DCPS/AddressCache.h index ce84b085358..cc228181914 100644 --- a/dds/DCPS/AddressCache.h +++ b/dds/DCPS/AddressCache.h @@ -17,9 +17,6 @@ #include "PoolAllocator.h" #include "RcObject.h" #include "TimeTypes.h" -#include "dcps_export.h" - -#include OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL @@ -30,15 +27,18 @@ typedef OPENDDS_SET_CMP(GUID_t, GUID_tKeyLessThan) GuidSet; struct AddressCacheEntry : public virtual RcObject { - AddressCacheEntry() : addrs_(), expires_(MonotonicTimePoint::max_value) + AddressCacheEntry() + : expires_(MonotonicTimePoint::max_value) #if defined ACE_HAS_CPP11 , addrs_hash_(0) #endif {} - AddressCacheEntry(const NetworkAddressSet& addrs, const MonotonicTimePoint& expires) : addrs_(addrs), expires_(expires) + AddressCacheEntry(const NetworkAddressSet& addrs, const MonotonicTimePoint& expires) + : addrs_(addrs) + , expires_(expires) #if defined ACE_HAS_CPP11 - , addrs_hash_(calculate_hash(addrs_)) + , addrs_hash_(calculate_hash(addrs_)) #endif {} @@ -209,7 +209,7 @@ class AddressCache { bool remove(const Key& key) { ACE_Guard guard(mutex_); - return map_.erase(key) != 0; + return remove_i(key); } void remove_id(const GUID_t& val) @@ -223,23 +223,15 @@ class AddressCache { } for (typename KeySet::iterator key = keys.begin(), limit = keys.end(); key != limit; ++key) { - map_.erase(*key); - - // Undo insert_ids(key) - GuidSet guids; - key->get_contained_guids(guids); - for (GuidSet::const_iterator guid = guids.begin(), limit = guids.end(); guid != limit; ++guid) { - const typename IdMapType::iterator other = id_map_.find(*guid); - if (other != id_map_.end()) { - other->second.erase(*key); - if (other->second.empty()) { - id_map_.erase(other); - } - } - } + remove_i(*key); } } + bool empty() const + { + return map_.empty() && id_map_.empty(); + } + private: void insert_ids(const Key& key) @@ -251,6 +243,26 @@ class AddressCache { } } + bool remove_i(const Key& key) + { + const bool found = map_.erase(key) != 0; + + // Undo insert_ids(key) + GuidSet guids; + key.get_contained_guids(guids); + for (GuidSet::const_iterator guid = guids.begin(), limit = guids.end(); guid != limit; ++guid) { + const typename IdMapType::iterator other = id_map_.find(*guid); + if (other != id_map_.end()) { + other->second.erase(key); + if (other->second.empty()) { + id_map_.erase(other); + } + } + } + + return found; + } + mutable ACE_Thread_Mutex mutex_; MapType map_; IdMapType id_map_; diff --git a/docs/news.d/addr-cache-leak.rst b/docs/news.d/addr-cache-leak.rst new file mode 100644 index 00000000000..33706d993f1 --- /dev/null +++ b/docs/news.d/addr-cache-leak.rst @@ -0,0 +1,5 @@ +.. news-prs: 4772 + +.. news-start-section: Fixes +- Fixed a memory leak in the address caches used by the RTPS/UDP transport. +.. news-end-section diff --git a/tests/unit-tests/dds/DCPS/AddressCache.cpp b/tests/unit-tests/dds/DCPS/AddressCache.cpp index 081931ca695..70f0a33c32e 100644 --- a/tests/unit-tests/dds/DCPS/AddressCache.cpp +++ b/tests/unit-tests/dds/DCPS/AddressCache.cpp @@ -81,13 +81,18 @@ TEST(dds_DCPS_AddressCache, store_remove_load_fail) AddressCache test_cache_; NetworkAddressSet addrs; addrs.insert(NetworkAddress("127.0.0.1:1234")); + const GUID_t a = {{1, 1, 1}, {{1, 1, 1}, 1}}; + const GUID_t b = {{2, 2, 2}, {{2, 2, 2}, 2}}; + const TestKey key(a, b); - test_cache_.store(TestKey(GUID_UNKNOWN, GUID_UNKNOWN), addrs); + test_cache_.store(key, addrs); addrs.clear(); - test_cache_.remove(TestKey(GUID_UNKNOWN, GUID_UNKNOWN)); + ASSERT_FALSE(test_cache_.empty()); + test_cache_.remove(key); + ASSERT_TRUE(test_cache_.empty()); - ASSERT_FALSE(test_cache_.load(TestKey(GUID_UNKNOWN, GUID_UNKNOWN), addrs)); + ASSERT_FALSE(test_cache_.load(key, addrs)); } TEST(dds_DCPS_AddressCache, store_remove_id_load_fail) @@ -95,13 +100,18 @@ TEST(dds_DCPS_AddressCache, store_remove_id_load_fail) AddressCache test_cache_; NetworkAddressSet addrs; addrs.insert(NetworkAddress("127.0.0.1:1234")); + const GUID_t a = {{1, 1, 1}, {{1, 1, 1}, 1}}; + const GUID_t b = {{2, 2, 2}, {{2, 2, 2}, 2}}; + const TestKey key(a, b); - test_cache_.store(TestKey(GUID_UNKNOWN, GUID_UNKNOWN), addrs); + test_cache_.store(key, addrs); addrs.clear(); - test_cache_.remove_id(GUID_UNKNOWN); + ASSERT_FALSE(test_cache_.empty()); + test_cache_.remove_id(a); + ASSERT_TRUE(test_cache_.empty()); - ASSERT_FALSE(test_cache_.load(TestKey(GUID_UNKNOWN, GUID_UNKNOWN), addrs)); + ASSERT_FALSE(test_cache_.load(key, addrs)); } TEST(dds_DCPS_AddressCache, scoped_access_load_success)