Skip to content

Commit

Permalink
Merge pull request OpenDDS#4772 from iguessthislldo/igtd/addr-cache-leak
Browse files Browse the repository at this point in the history
Fixed Leak in `AddressCache` for RTPS/UDP Transport
  • Loading branch information
jrw972 authored Aug 19, 2024
2 parents 0c09856 + 8b2d38e commit ee428ed
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 42 deletions.
97 changes: 64 additions & 33 deletions dds/DCPS/AddressCache.h
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
/*
*
*
* Distributed under the OpenDDS License.
* See: http://www.opendds.org/license.html
*/

#ifndef OPENDDS_DCPS_ADDRESSCACHE_H
#define OPENDDS_DCPS_ADDRESSCACHE_H

#include "dcps_export.h"

#include <ace/config-lite.h>
#ifndef ACE_LACKS_PRAGMA_ONCE
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */
#endif

#include "Definitions.h"
#include "GuidUtils.h"
Expand All @@ -21,8 +18,6 @@
#include "RcObject.h"
#include "TimeTypes.h"

#include "ace/INET_Addr.h"

OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL

namespace OpenDDS {
Expand All @@ -32,15 +27,18 @@ typedef OPENDDS_SET_CMP(GUID_t, GUID_tKeyLessThan) GuidSet;

struct AddressCacheEntry : public virtual RcObject {

AddressCacheEntry() : addrs_(), expires_(MonotonicTimePoint::max_value)
AddressCacheEntry()
: expires_(MonotonicTimePoint::max_value)
#if defined ACE_HAS_CPP11
, addrs_hash_(0)
#endif
{}

AddressCacheEntry(const NetworkAddressSet& addrs, const MonotonicTimePoint& expires) : addrs_(addrs), expires_(expires)
AddressCacheEntry(const NetworkAddressSet& addrs, const MonotonicTimePoint& expires)
: addrs_(addrs)
, expires_(expires)
#if defined ACE_HAS_CPP11
, addrs_hash_(calculate_hash(addrs_))
, addrs_hash_(calculate_hash(addrs_))
#endif
{}

Expand All @@ -54,15 +52,17 @@ struct AddressCacheEntry : public virtual RcObject {
struct AddressCacheEntryProxy {
AddressCacheEntryProxy(RcHandle<AddressCacheEntry> rch) : entry_(rch) {}

bool operator==(const AddressCacheEntryProxy& rhs) const {
bool operator==(const AddressCacheEntryProxy& rhs) const
{
#if defined ACE_HAS_CPP11
return entry_ && rhs.entry_ && entry_->addrs_hash_ == rhs.entry_->addrs_hash_ && entry_->addrs_ == rhs.entry_->addrs_;
#else
return entry_ && rhs.entry_ && entry_->addrs_ == rhs.entry_->addrs_;
#endif
}

bool operator<(const AddressCacheEntryProxy& rhs) const {
bool operator<(const AddressCacheEntryProxy& rhs) const
{
#if defined ACE_HAS_CPP11
return (rhs.entry_ && (!entry_ || (entry_->addrs_hash_ < rhs.entry_->addrs_hash_ || (entry_->addrs_hash_ == rhs.entry_->addrs_hash_ && entry_->addrs_ < rhs.entry_->addrs_))));
#else
Expand All @@ -84,14 +84,13 @@ template <typename Key>
class AddressCache {
public:

typedef OPENDDS_SET(Key) KeySet;
#if defined ACE_HAS_CPP11
typedef OPENDDS_UNORDERED_MAP_T(Key, RcHandle<AddressCacheEntry>) MapType;
typedef OPENDDS_VECTOR(Key) KeyVec;
typedef OPENDDS_UNORDERED_MAP_T(GUID_t, KeyVec) IdMapType;
typedef OPENDDS_UNORDERED_MAP_T(GUID_t, KeySet) IdMapType;
#else
typedef OPENDDS_MAP_T(Key, RcHandle<AddressCacheEntry>) MapType;
typedef OPENDDS_VECTOR(Key) KeyVec;
typedef OPENDDS_MAP_T(GUID_t, KeyVec) IdMapType;
typedef OPENDDS_MAP_T(GUID_t, KeySet) IdMapType;
#endif

AddressCache() {}
Expand Down Expand Up @@ -120,11 +119,7 @@ class AddressCache {
if (pos == cache.map_.end()) {
rch_ = make_rch<AddressCacheEntry>();
cache.map_[key] = rch_;
GuidSet set;
key.get_contained_guids(set);
for (GuidSet::const_iterator it = set.begin(), limit = set.end(); it != limit; ++it) {
cache.id_map_[*it].push_back(key);
}
cache.insert_ids(key);
is_new_ = true;
} else {
rch_ = pos->second;
Expand All @@ -144,21 +139,24 @@ class AddressCache {
#endif
}

inline AddressCacheEntry& value() {
AddressCacheEntry& value()
{
OPENDDS_ASSERT(rch_);
#if defined ACE_HAS_CPP11
non_const_touch_ = true;
#endif
return *rch_;
}

inline const AddressCacheEntry& value() const {
const AddressCacheEntry& value() const
{
OPENDDS_ASSERT(rch_);
return *rch_;
}

#if defined ACE_HAS_CPP11
inline void recalculate_hash() {
void recalculate_hash()
{
if (non_const_touch_) {
rch_->addrs_hash_ = calculate_hash(rch_->addrs_);
non_const_touch_ = false;
Expand Down Expand Up @@ -204,34 +202,67 @@ class AddressCache {
rch->expires_ = expires;
} else {
rch = make_rch<AddressCacheEntry>(addrs, expires);
GuidSet set;
key.get_contained_guids(set);
for (GuidSet::const_iterator it = set.begin(), limit = set.end(); it != limit; ++it) {
id_map_[*it].push_back(key);
}
insert_ids(key);
}
}

bool remove(const Key& key)
{
ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
return map_.erase(key) != 0;
return remove_i(key);
}

void remove_id(const GUID_t& val)
{
ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
const typename IdMapType::iterator pos = id_map_.find(val);
KeySet keys;
if (pos != id_map_.end()) {
for (typename KeyVec::iterator it = pos->second.begin(), limit = pos->second.end(); it != limit; ++it) {
map_.erase(*it);
}
keys.swap(pos->second);
id_map_.erase(pos);
}

for (typename KeySet::iterator key = keys.begin(), limit = keys.end(); key != limit; ++key) {
remove_i(*key);
}
}

bool empty() const
{
return map_.empty() && id_map_.empty();
}

private:

void insert_ids(const Key& key)
{
GuidSet guids;
key.get_contained_guids(guids);
for (GuidSet::const_iterator it = guids.begin(), limit = guids.end(); it != limit; ++it) {
id_map_[*it].insert(key);
}
}

bool remove_i(const Key& key)
{
const bool found = map_.erase(key) != 0;

// Undo insert_ids(key)
GuidSet guids;
key.get_contained_guids(guids);
for (GuidSet::const_iterator guid = guids.begin(), limit = guids.end(); guid != limit; ++guid) {
const typename IdMapType::iterator other = id_map_.find(*guid);
if (other != id_map_.end()) {
other->second.erase(key);
if (other->second.empty()) {
id_map_.erase(other);
}
}
}

return found;
}

mutable ACE_Thread_Mutex mutex_;
MapType map_;
IdMapType id_map_;
Expand Down
2 changes: 1 addition & 1 deletion dds/DCPS/transport/rtps_udp/RtpsUdpDataLink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -968,7 +968,7 @@ RtpsUdpDataLink::release_reservations_i(const GUID_t& remote_id,
}
}

sq_.ignore_remote(remote_id);
sq_.ignore(local_id, remote_id);

for (TqeVector::iterator drop_it = to_drop.begin(); drop_it != to_drop.end(); ++drop_it) {
(*drop_it)->data_dropped(true);
Expand Down
4 changes: 2 additions & 2 deletions docs/devguide/quality_of_service.rst
Original file line number Diff line number Diff line change
Expand Up @@ -746,7 +746,7 @@ This policy applies to the domain participant, data reader, and data writer enti

- :ref:`Default values <qos-defaults>`

* - :term:`DomainParticipant`, :term:`DataWriter`, :term:`DataReader`
* - :term:`DomainParticipant`, :term:`DataWriter`, and :term:`DataReader`

- ``value``

Expand Down Expand Up @@ -1435,7 +1435,7 @@ The property QoS policy contains sequences of key-value pairs for the :term:`Dom

.. important::

This policy is :ref:`mutable <qos-changing>`, but updates to properties after creating the participant might not an effect.
This policy is :ref:`mutable <qos-changing>`, but updates to properties after creating the participant might not have an effect.
This policy affects association indirectly through security.

IDL:
Expand Down
5 changes: 5 additions & 0 deletions docs/news.d/addr-cache-leak.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
.. news-prs: 4772
.. news-start-section: Fixes
- Fixed a memory leak in the address caches used by the RTPS/UDP transport.
.. news-end-section
22 changes: 16 additions & 6 deletions tests/unit-tests/dds/DCPS/AddressCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,27 +81,37 @@ TEST(dds_DCPS_AddressCache, store_remove_load_fail)
AddressCache<TestKey> test_cache_;
NetworkAddressSet addrs;
addrs.insert(NetworkAddress("127.0.0.1:1234"));
const GUID_t a = {{1, 1, 1}, {{1, 1, 1}, 1}};
const GUID_t b = {{2, 2, 2}, {{2, 2, 2}, 2}};
const TestKey key(a, b);

test_cache_.store(TestKey(GUID_UNKNOWN, GUID_UNKNOWN), addrs);
test_cache_.store(key, addrs);
addrs.clear();

test_cache_.remove(TestKey(GUID_UNKNOWN, GUID_UNKNOWN));
ASSERT_FALSE(test_cache_.empty());
test_cache_.remove(key);
ASSERT_TRUE(test_cache_.empty());

ASSERT_FALSE(test_cache_.load(TestKey(GUID_UNKNOWN, GUID_UNKNOWN), addrs));
ASSERT_FALSE(test_cache_.load(key, addrs));
}

TEST(dds_DCPS_AddressCache, store_remove_id_load_fail)
{
AddressCache<TestKey> test_cache_;
NetworkAddressSet addrs;
addrs.insert(NetworkAddress("127.0.0.1:1234"));
const GUID_t a = {{1, 1, 1}, {{1, 1, 1}, 1}};
const GUID_t b = {{2, 2, 2}, {{2, 2, 2}, 2}};
const TestKey key(a, b);

test_cache_.store(TestKey(GUID_UNKNOWN, GUID_UNKNOWN), addrs);
test_cache_.store(key, addrs);
addrs.clear();

test_cache_.remove_id(GUID_UNKNOWN);
ASSERT_FALSE(test_cache_.empty());
test_cache_.remove_id(a);
ASSERT_TRUE(test_cache_.empty());

ASSERT_FALSE(test_cache_.load(TestKey(GUID_UNKNOWN, GUID_UNKNOWN), addrs));
ASSERT_FALSE(test_cache_.load(key, addrs));
}

TEST(dds_DCPS_AddressCache, scoped_access_load_success)
Expand Down

0 comments on commit ee428ed

Please sign in to comment.