diff --git a/bin/mqtt5_canary/main.cpp b/bin/mqtt5_canary/main.cpp index 6df78de77..5b23db4f6 100644 --- a/bin/mqtt5_canary/main.cpp +++ b/bin/mqtt5_canary/main.cpp @@ -278,6 +278,7 @@ struct AwsMqtt5CanaryStatistic struct AwsMqtt5CanaryTestClient { + struct aws_mutex client_lock; std::shared_ptr client; std::shared_ptr settings; Aws::Crt::String sharedTopic; @@ -497,6 +498,10 @@ static int s_AwsMqtt5CanaryOperationUnsubscribeBad(struct AwsMqtt5CanaryTestClie testClient->clientId.c_str(), packet->getReasonString()->c_str()); } + else + { + ++g_statistic.unsub_failed; + } })) { AWS_LOGF_INFO(AWS_LS_MQTT5_CANARY, "ID:%s Unsubscribe Bad", testClient->clientId.c_str()); @@ -916,6 +921,7 @@ int main(int argc, char **argv) { struct AwsMqtt5CanaryTestClient client; client = {}; + aws_mutex_init(&client.client_lock); Aws::Crt::UUID uuid; client.clientId = String("TestClient") + std::to_string(i).c_str() + "_" + uuid.ToString(); client.sharedTopic = Aws::Crt::String(sharedTopicArray); @@ -923,27 +929,32 @@ int main(int argc, char **argv) clients.push_back(client); mqtt5Options.WithAckTimeoutSeconds(10); mqtt5Options.WithPublishReceivedCallback([&clients, i](const Mqtt5::PublishReceivedEventData &publishData) { - AWS_LOGF_INFO( - AWS_LS_MQTT5_CANARY, - "Client:%s Publish Received on topic %s", - clients[i].clientId.c_str(), - publishData.publishPacket->getTopic().c_str()); + if (aws_mutex_try_lock(&clients[i].client_lock) == AWS_OP_SUCCESS) + { + AWS_LOGF_INFO( + AWS_LS_MQTT5_CANARY, + "Client:%s Publish Received on topic %s", + clients[i].clientId.c_str(), + publishData.publishPacket->getTopic().c_str()); + aws_mutex_unlock(&clients[i].client_lock); + } }); mqtt5Options.WithClientConnectionSuccessCallback( [&clients, i](const Mqtt5::OnConnectionSuccessEventData &eventData) { + aws_mutex_lock(&clients[i].client_lock); clients[i].isConnected = true; - clients[i].clientId = Aws::Crt::String( - eventData.negotiatedSettings->getClientId().c_str(), - eventData.negotiatedSettings->getClientId().length()); + clients[i].clientId = eventData.negotiatedSettings->getClientId(); clients[i].settings = eventData.negotiatedSettings; AWS_LOGF_INFO( AWS_LS_MQTT5_CANARY, "ID:%s Lifecycle Event: Connection Success", clients[i].clientId.c_str()); + aws_mutex_unlock(&clients[i].client_lock); }); mqtt5Options.WithClientConnectionFailureCallback( [&clients, i](const OnConnectionFailureEventData &eventData) { + aws_mutex_lock(&clients[i].client_lock); clients[i].isConnected = false; AWS_LOGF_ERROR( AWS_LS_MQTT5_CANARY, @@ -951,16 +962,21 @@ int main(int argc, char **argv) clients[i].clientId.c_str(), eventData.errorCode, aws_error_debug_str(eventData.errorCode)); + aws_mutex_unlock(&clients[i].client_lock); }); mqtt5Options.WithClientDisconnectionCallback([&clients, i](const OnDisconnectionEventData &) { + aws_mutex_lock(&clients[i].client_lock); clients[i].isConnected = false; AWS_LOGF_ERROR(AWS_LS_MQTT5_CANARY, "ID:%s Lifecycle Event: Disconnect", clients[i].clientId.c_str()); + aws_mutex_unlock(&clients[i].client_lock); }); mqtt5Options.WithClientStoppedCallback([&clients, i](const OnStoppedEventData &) { + aws_mutex_lock(&clients[i].client_lock); clients[i].isConnected = false; AWS_LOGF_ERROR(AWS_LS_MQTT5_CANARY, "ID:%s Lifecycle Event: Stopped", clients[i].clientId.c_str()); + aws_mutex_unlock(&clients[i].client_lock); }); clients[i].client = Mqtt5::Mqtt5Client::NewMqtt5Client(mqtt5Options, appCtx.allocator); @@ -1007,7 +1023,13 @@ int main(int argc, char **argv) awsMqtt5CanaryOperationFn *operation_fn = s_AwsMqtt5CanaryOperationTable.operationByOperationType[nextOperation]; - (*operation_fn)(&clients[rand() % clients.size()], appCtx.allocator); + size_t random_client_id = rand() % clients.size(); + if(aws_mutex_try_lock(&clients[random_client_id].client_lock) == AWS_OP_SUCCESS) + { + + (*operation_fn)(&clients[random_client_id], appCtx.allocator); + aws_mutex_unlock(&clients[random_client_id].client_lock); + } if (now > timeTestFinish) {