From d3e9d22ca6b73a3f4ad06777b74fce03b9c12578 Mon Sep 17 00:00:00 2001 From: Bret Ambrose Date: Thu, 19 Dec 2024 11:26:05 -0800 Subject: [PATCH] Don't assert about shared publish routing distribution --- tests/Mqtt5ClientTest.cpp | 87 ++++++++++++--------------------------- 1 file changed, 26 insertions(+), 61 deletions(-) diff --git a/tests/Mqtt5ClientTest.cpp b/tests/Mqtt5ClientTest.cpp index ad14dfcc0..5d721a7b1 100644 --- a/tests/Mqtt5ClientTest.cpp +++ b/tests/Mqtt5ClientTest.cpp @@ -1974,16 +1974,10 @@ static int s_TestMqtt5SharedSubscriptionTest(Aws::Crt::Allocator *allocator, voi const String TEST_TOPIC = "test/MQTT5_Binding_CPP_" + currentUUID; const String sharedTopicFilter = "$share/crttest/test/MQTT5_Binding_CPP_" + currentUUID; - const int MESSAGE_NUMBER = 10; - std::atomic client_messages(0); - bool client1_received = false; - bool client2_received = false; - - std::vector receivedMessages(MESSAGE_NUMBER); - for (int i = 0; i < MESSAGE_NUMBER; i++) - { - receivedMessages.push_back(0); - } + static const int MESSAGE_COUNT = 10; + std::mutex receivedMessagesLock; + std::condition_variable receivedMessagesSignal; + std::vector receivedMessages; Aws::Iot::Mqtt5ClientBuilder *subscribe_builder = Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath( @@ -1993,39 +1987,29 @@ static int s_TestMqtt5SharedSubscriptionTest(Aws::Crt::Allocator *allocator, voi allocator); ASSERT_TRUE(subscribe_builder); - std::promise client_received; - - auto get_on_message_callback = [&](bool &received) + auto on_message_callback1 = [&](const PublishReceivedEventData &eventData) { - return [&](const PublishReceivedEventData &eventData) -> int + String topic = eventData.publishPacket->getTopic(); + if (topic == TEST_TOPIC) { - String topic = eventData.publishPacket->getTopic(); - if (topic == TEST_TOPIC) + ByteCursor payload = eventData.publishPacket->getPayload(); + String message_string = String((const char *)payload.ptr, payload.len); + int message_int = atoi(message_string.c_str()); + { - ByteCursor payload = eventData.publishPacket->getPayload(); - String message_string = String((const char *)payload.ptr, payload.len); - int message_int = atoi(message_string.c_str()); - ASSERT_TRUE(message_int < MESSAGE_NUMBER); - ++receivedMessages[message_int]; - received = true; // this line has changed - - bool exchanged = false; - int desired = 11; - int tested = 10; - client_messages++; - exchanged = client_messages.compare_exchange_strong(tested, desired); - if (exchanged == true) + std::lock_guard guard(receivedMessagesLock); + receivedMessages.push_back(message_int); + + if (receivedMessages.size() == MESSAGE_COUNT) { - client_received.set_value(); + receivedMessagesSignal.notify_all(); } } - return 0; - }; + } }; - auto onMessage_client1 = get_on_message_callback(client1_received); - auto onMessage_client2 = get_on_message_callback(client2_received); + auto on_message_callback2 = on_message_callback1; - subscribe_builder->WithPublishReceivedCallback(onMessage_client1); + subscribe_builder->WithPublishReceivedCallback(on_message_callback1); Aws::Iot::Mqtt5ClientBuilder *subscribe_builder2 = Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath( @@ -2034,8 +2018,7 @@ static int s_TestMqtt5SharedSubscriptionTest(Aws::Crt::Allocator *allocator, voi mqtt5TestVars.m_private_key_path_string.c_str(), allocator); ASSERT_TRUE(subscribe_builder2); - - subscribe_builder2->WithPublishReceivedCallback(onMessage_client2); + subscribe_builder2->WithPublishReceivedCallback(on_message_callback2); Aws::Iot::Mqtt5ClientBuilder *publish_builder = Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath( mqtt5TestVars.m_hostname_string, @@ -2113,7 +2096,7 @@ static int s_TestMqtt5SharedSubscriptionTest(Aws::Crt::Allocator *allocator, voi suback.get_future().wait(); /* Publish message 10 to test topic */ - for (int i = 0; i < MESSAGE_NUMBER; i++) + for (int i = 0; i < MESSAGE_COUNT; i++) { std::string payload = std::to_string(i); std::shared_ptr publish = std::make_shared( @@ -2122,32 +2105,14 @@ static int s_TestMqtt5SharedSubscriptionTest(Aws::Crt::Allocator *allocator, voi } /* Wait for all packets to be received on both clients */ - client_received.get_future().wait(); - - /* Unsubscribe from the topic from both clients*/ - Vector unsubList; - unsubList.push_back(TEST_TOPIC); - std::shared_ptr unsubscribe_client1 = - std::make_shared(allocator); - unsubscribe_client1->WithTopicFilters(unsubList); - ASSERT_TRUE(mqtt5Client->Unsubscribe(unsubscribe_client1)); - - std::shared_ptr unsubscribe_client2 = - std::make_shared(allocator); - unsubscribe_client2->WithTopicFilters(unsubList); - ASSERT_TRUE(mqtt5Client2->Unsubscribe(unsubscribe_client2)); - - /* make sure all messages are received */ - ASSERT_INT_EQUALS(MESSAGE_NUMBER + 1, client_messages); /* We are adding one at the end, so 10 messages received */ - - /* makes sure both clients received at least one message */ - ASSERT_TRUE(client1_received); - ASSERT_TRUE(client2_received); + std::unique_lock receivedLock(receivedMessagesLock); + receivedMessagesSignal.wait(receivedLock, [&]() { return receivedMessages.size() == MESSAGE_COUNT; }); + std::sort(receivedMessages.begin(), receivedMessages.end()); /* make sure all messages are received with no duplicates*/ - for (int i = 0; i < MESSAGE_NUMBER; i++) + for (int i = 0; i < MESSAGE_COUNT; i++) { - ASSERT_TRUE(receivedMessages[i] > 0); + ASSERT_INT_EQUALS(i, receivedMessages[i]); } /* Stop all clients */ ASSERT_TRUE(mqtt5Client->Stop());