Skip to content

Commit

Permalink
Don't assert about shared publish routing distribution
Browse files Browse the repository at this point in the history
  • Loading branch information
Bret Ambrose committed Dec 19, 2024
1 parent a4dac53 commit d3e9d22
Showing 1 changed file with 26 additions and 61 deletions.
87 changes: 26 additions & 61 deletions tests/Mqtt5ClientTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1974,16 +1974,10 @@ static int s_TestMqtt5SharedSubscriptionTest(Aws::Crt::Allocator *allocator, voi
const String TEST_TOPIC = "test/MQTT5_Binding_CPP_" + currentUUID;
const String sharedTopicFilter = "$share/crttest/test/MQTT5_Binding_CPP_" + currentUUID;

const int MESSAGE_NUMBER = 10;
std::atomic<int> client_messages(0);
bool client1_received = false;
bool client2_received = false;

std::vector<int> receivedMessages(MESSAGE_NUMBER);
for (int i = 0; i < MESSAGE_NUMBER; i++)
{
receivedMessages.push_back(0);
}
static const int MESSAGE_COUNT = 10;
std::mutex receivedMessagesLock;
std::condition_variable receivedMessagesSignal;
std::vector<int> receivedMessages;

Aws::Iot::Mqtt5ClientBuilder *subscribe_builder =
Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath(
Expand All @@ -1993,39 +1987,29 @@ static int s_TestMqtt5SharedSubscriptionTest(Aws::Crt::Allocator *allocator, voi
allocator);
ASSERT_TRUE(subscribe_builder);

std::promise<void> client_received;

auto get_on_message_callback = [&](bool &received)
auto on_message_callback1 = [&](const PublishReceivedEventData &eventData)
{
return [&](const PublishReceivedEventData &eventData) -> int
String topic = eventData.publishPacket->getTopic();
if (topic == TEST_TOPIC)
{
String topic = eventData.publishPacket->getTopic();
if (topic == TEST_TOPIC)
ByteCursor payload = eventData.publishPacket->getPayload();
String message_string = String((const char *)payload.ptr, payload.len);
int message_int = atoi(message_string.c_str());

{
ByteCursor payload = eventData.publishPacket->getPayload();
String message_string = String((const char *)payload.ptr, payload.len);
int message_int = atoi(message_string.c_str());
ASSERT_TRUE(message_int < MESSAGE_NUMBER);
++receivedMessages[message_int];
received = true; // this line has changed

bool exchanged = false;
int desired = 11;
int tested = 10;
client_messages++;
exchanged = client_messages.compare_exchange_strong(tested, desired);
if (exchanged == true)
std::lock_guard<std::mutex> guard(receivedMessagesLock);
receivedMessages.push_back(message_int);

if (receivedMessages.size() == MESSAGE_COUNT)
{
client_received.set_value();
receivedMessagesSignal.notify_all();
}
}
return 0;
};
}
};
auto onMessage_client1 = get_on_message_callback(client1_received);
auto onMessage_client2 = get_on_message_callback(client2_received);
auto on_message_callback2 = on_message_callback1;

subscribe_builder->WithPublishReceivedCallback(onMessage_client1);
subscribe_builder->WithPublishReceivedCallback(on_message_callback1);

Aws::Iot::Mqtt5ClientBuilder *subscribe_builder2 =
Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath(
Expand All @@ -2034,8 +2018,7 @@ static int s_TestMqtt5SharedSubscriptionTest(Aws::Crt::Allocator *allocator, voi
mqtt5TestVars.m_private_key_path_string.c_str(),
allocator);
ASSERT_TRUE(subscribe_builder2);

subscribe_builder2->WithPublishReceivedCallback(onMessage_client2);
subscribe_builder2->WithPublishReceivedCallback(on_message_callback2);

Aws::Iot::Mqtt5ClientBuilder *publish_builder = Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath(
mqtt5TestVars.m_hostname_string,
Expand Down Expand Up @@ -2113,7 +2096,7 @@ static int s_TestMqtt5SharedSubscriptionTest(Aws::Crt::Allocator *allocator, voi
suback.get_future().wait();

/* Publish message 10 to test topic */
for (int i = 0; i < MESSAGE_NUMBER; i++)
for (int i = 0; i < MESSAGE_COUNT; i++)
{
std::string payload = std::to_string(i);
std::shared_ptr<Mqtt5::PublishPacket> publish = std::make_shared<Mqtt5::PublishPacket>(
Expand All @@ -2122,32 +2105,14 @@ static int s_TestMqtt5SharedSubscriptionTest(Aws::Crt::Allocator *allocator, voi
}

/* Wait for all packets to be received on both clients */
client_received.get_future().wait();

/* Unsubscribe from the topic from both clients*/
Vector<String> unsubList;
unsubList.push_back(TEST_TOPIC);
std::shared_ptr<Mqtt5::UnsubscribePacket> unsubscribe_client1 =
std::make_shared<Mqtt5::UnsubscribePacket>(allocator);
unsubscribe_client1->WithTopicFilters(unsubList);
ASSERT_TRUE(mqtt5Client->Unsubscribe(unsubscribe_client1));

std::shared_ptr<Mqtt5::UnsubscribePacket> unsubscribe_client2 =
std::make_shared<Mqtt5::UnsubscribePacket>(allocator);
unsubscribe_client2->WithTopicFilters(unsubList);
ASSERT_TRUE(mqtt5Client2->Unsubscribe(unsubscribe_client2));

/* make sure all messages are received */
ASSERT_INT_EQUALS(MESSAGE_NUMBER + 1, client_messages); /* We are adding one at the end, so 10 messages received */

/* makes sure both clients received at least one message */
ASSERT_TRUE(client1_received);
ASSERT_TRUE(client2_received);
std::unique_lock<std::mutex> receivedLock(receivedMessagesLock);
receivedMessagesSignal.wait(receivedLock, [&]() { return receivedMessages.size() == MESSAGE_COUNT; });

std::sort(receivedMessages.begin(), receivedMessages.end());
/* make sure all messages are received with no duplicates*/
for (int i = 0; i < MESSAGE_NUMBER; i++)
for (int i = 0; i < MESSAGE_COUNT; i++)
{
ASSERT_TRUE(receivedMessages[i] > 0);
ASSERT_INT_EQUALS(i, receivedMessages[i]);
}
/* Stop all clients */
ASSERT_TRUE(mqtt5Client->Stop());
Expand Down

0 comments on commit d3e9d22

Please sign in to comment.