Skip to content

Commit

Permalink
fix data race with canary clients
Browse files Browse the repository at this point in the history
  • Loading branch information
xiazhvera committed Dec 11, 2023
1 parent dd818f6 commit a28d82f
Showing 1 changed file with 31 additions and 9 deletions.
40 changes: 31 additions & 9 deletions bin/mqtt5_canary/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ struct AwsMqtt5CanaryStatistic

struct AwsMqtt5CanaryTestClient
{
struct aws_mutex client_lock;
std::shared_ptr<Mqtt5::Mqtt5Client> client;
std::shared_ptr<Mqtt5::NegotiatedSettings> settings;
Aws::Crt::String sharedTopic;
Expand Down Expand Up @@ -497,6 +498,10 @@ static int s_AwsMqtt5CanaryOperationUnsubscribeBad(struct AwsMqtt5CanaryTestClie
testClient->clientId.c_str(),
packet->getReasonString()->c_str());
}
else
{
++g_statistic.unsub_failed;
}
}))
{
AWS_LOGF_INFO(AWS_LS_MQTT5_CANARY, "ID:%s Unsubscribe Bad", testClient->clientId.c_str());
Expand Down Expand Up @@ -916,51 +921,62 @@ int main(int argc, char **argv)
{
struct AwsMqtt5CanaryTestClient client;
client = {};
aws_mutex_init(&client.client_lock);
Aws::Crt::UUID uuid;
client.clientId = String("TestClient") + std::to_string(i).c_str() + "_" + uuid.ToString();
client.sharedTopic = Aws::Crt::String(sharedTopicArray);
client.isConnected = false;
clients.push_back(client);
mqtt5Options.WithAckTimeoutSeconds(10);
mqtt5Options.WithPublishReceivedCallback([&clients, i](const Mqtt5::PublishReceivedEventData &publishData) {
AWS_LOGF_INFO(
AWS_LS_MQTT5_CANARY,
"Client:%s Publish Received on topic %s",
clients[i].clientId.c_str(),
publishData.publishPacket->getTopic().c_str());
if (aws_mutex_try_lock(&clients[i].client_lock) == AWS_OP_SUCCESS)
{
AWS_LOGF_INFO(
AWS_LS_MQTT5_CANARY,
"Client:%s Publish Received on topic %s",
clients[i].clientId.c_str(),
publishData.publishPacket->getTopic().c_str());
aws_mutex_unlock(&clients[i].client_lock);
}
});

mqtt5Options.WithClientConnectionSuccessCallback(
[&clients, i](const Mqtt5::OnConnectionSuccessEventData &eventData) {
aws_mutex_lock(&clients[i].client_lock);
clients[i].isConnected = true;
clients[i].clientId = Aws::Crt::String(
eventData.negotiatedSettings->getClientId().c_str(),
eventData.negotiatedSettings->getClientId().length());
clients[i].clientId = eventData.negotiatedSettings->getClientId();
clients[i].settings = eventData.negotiatedSettings;

AWS_LOGF_INFO(
AWS_LS_MQTT5_CANARY, "ID:%s Lifecycle Event: Connection Success", clients[i].clientId.c_str());
aws_mutex_unlock(&clients[i].client_lock);
});

mqtt5Options.WithClientConnectionFailureCallback(
[&clients, i](const OnConnectionFailureEventData &eventData) {
aws_mutex_lock(&clients[i].client_lock);
clients[i].isConnected = false;
AWS_LOGF_ERROR(
AWS_LS_MQTT5_CANARY,
"ID:%s Connection failed with Error Code: %d(%s)",
clients[i].clientId.c_str(),
eventData.errorCode,
aws_error_debug_str(eventData.errorCode));
aws_mutex_unlock(&clients[i].client_lock);
});

mqtt5Options.WithClientDisconnectionCallback([&clients, i](const OnDisconnectionEventData &) {
aws_mutex_lock(&clients[i].client_lock);
clients[i].isConnected = false;
AWS_LOGF_ERROR(AWS_LS_MQTT5_CANARY, "ID:%s Lifecycle Event: Disconnect", clients[i].clientId.c_str());
aws_mutex_unlock(&clients[i].client_lock);
});

mqtt5Options.WithClientStoppedCallback([&clients, i](const OnStoppedEventData &) {
aws_mutex_lock(&clients[i].client_lock);
clients[i].isConnected = false;
AWS_LOGF_ERROR(AWS_LS_MQTT5_CANARY, "ID:%s Lifecycle Event: Stopped", clients[i].clientId.c_str());
aws_mutex_unlock(&clients[i].client_lock);
});

clients[i].client = Mqtt5::Mqtt5Client::NewMqtt5Client(mqtt5Options, appCtx.allocator);
Expand Down Expand Up @@ -1007,7 +1023,13 @@ int main(int argc, char **argv)
awsMqtt5CanaryOperationFn *operation_fn =
s_AwsMqtt5CanaryOperationTable.operationByOperationType[nextOperation];

(*operation_fn)(&clients[rand() % clients.size()], appCtx.allocator);
size_t random_client_id = rand() % clients.size();
if(aws_mutex_try_lock(&clients[random_client_id].client_lock) == AWS_OP_SUCCESS)
{

(*operation_fn)(&clients[random_client_id], appCtx.allocator);
aws_mutex_unlock(&clients[random_client_id].client_lock);
}

if (now > timeTestFinish)
{
Expand Down

0 comments on commit a28d82f

Please sign in to comment.