Skip to content

Commit

Permalink
Attempt to make non-compliant broker behavior that is causing the wil…
Browse files Browse the repository at this point in the history
…l test to be flaky
  • Loading branch information
Bret Ambrose committed Dec 19, 2024
1 parent 81fe46e commit b067b7f
Showing 1 changed file with 23 additions and 6 deletions.
29 changes: 23 additions & 6 deletions tests/IotServiceTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -599,13 +599,20 @@ static int s_TestIotWillTest(Aws::Crt::Allocator *allocator, void *ctx)
subscriberCv.wait(lock, [&]() { return subscriberSubscribed; });
}

// there appears to be a race condition broker-side in IoT Core such that the interrupter is occasionally
// rejected rather than the existing connection. We try and work around this in two ways:
// (1) Wait a couple seconds to let things settle in terms of eventual consistency.
// (2) Have the interrupter make connection attempts in a loop until a successful interruption occurs.
std::this_thread::sleep_for(std::chrono::seconds(2));

// Disconnect the client by interrupting it with another client with the same ID
// which will cause the will to be sent
auto interruptConnection = mqttClient.NewConnection(envVars.inputHost.c_str(), 8883, socketOptions, tlsContext);
interruptConnection->SetWill(topicStr.c_str(), QOS::AWS_MQTT_QOS_AT_LEAST_ONCE, false, payload);
// interruptConnection->SetWill(topicStr.c_str(), QOS::AWS_MQTT_QOS_AT_LEAST_ONCE, false, payload);
std::mutex interruptMutex;
std::condition_variable interruptCv;
bool interruptConnected = false;
bool interruptConnectionAttemptComplete = false;
auto interruptOnConnectionCompleted =
[&](MqttConnection &, int errorCode, ReturnCode returnCode, bool sessionPresent)
{
Expand All @@ -614,7 +621,11 @@ static int s_TestIotWillTest(Aws::Crt::Allocator *allocator, void *ctx)
(void)sessionPresent;
{
std::lock_guard<std::mutex> lock(interruptMutex);
interruptConnected = true;
interruptConnectionAttemptComplete = true;
if (errorCode == AWS_ERROR_SUCCESS && returnCode == AWS_MQTT_CONNECT_ACCEPTED)
{
interruptConnected = true;
}
}
interruptCv.notify_one();
};
Expand All @@ -628,11 +639,17 @@ static int s_TestIotWillTest(Aws::Crt::Allocator *allocator, void *ctx)
};
interruptConnection->OnConnectionCompleted = interruptOnConnectionCompleted;
interruptConnection->OnDisconnect = interruptOnDisconnect;
interruptConnection->Connect((Aws::Crt::String("test-01-") + uuidStr).c_str(), true);

bool continueConnecting;
do
{
std::unique_lock<std::mutex> lock(interruptMutex);
interruptCv.wait(lock, [&]() { return interruptConnected; });
}
interruptConnection->Connect((Aws::Crt::String("test-01-") + uuidStr).c_str(), true);
{
std::unique_lock<std::mutex> lock(interruptMutex);
interruptCv.wait(lock, [&]() { return interruptConnectionAttemptComplete; });
continueConnecting = !interruptConnected;
}
} while (continueConnecting);

// wait for message received callback - meaning the will was sent
{
Expand Down

0 comments on commit b067b7f

Please sign in to comment.