Skip to content

Commit

Permalink
Merge branch 'main' of https://github.com/awslabs/aws-crt-cpp into mq…
Browse files Browse the repository at this point in the history
…tt5_ga_review
  • Loading branch information
xiazhvera committed Dec 4, 2023
2 parents 8208305 + 6f2cede commit 40b5f8b
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 14 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.24.8
0.24.9
115 changes: 103 additions & 12 deletions bin/mqtt5_canary/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,23 @@ static void s_AwsMqtt5CanaryInitTesterOptions(struct AwsMqtt5CanaryTesterOptions
testerOptions->memoryCheckIntervalSec = 600;
}

struct AwsMqtt5CanaryStatistic
{
uint64_t totalOperations;

uint64_t subscribe_attempt;
uint64_t subscribe_succeed;
uint64_t subscribe_failed;

uint64_t publish_attempt;
uint64_t publish_succeed;
uint64_t publish_failed;

uint64_t unsub_attempt;
uint64_t unsub_succeed;
uint64_t unsub_failed;
} g_statistic;

struct AwsMqtt5CanaryTestClient
{
std::shared_ptr<Mqtt5::Mqtt5Client> client;
Expand Down Expand Up @@ -382,6 +399,8 @@ static int s_AwsMqtt5CanaryOperationStop(struct AwsMqtt5CanaryTestClient *testCl
{
return AWS_OP_SUCCESS;
}

g_statistic.totalOperations++;
if (testClient->client->Stop())
{
testClient->subscriptionCount = 0;
Expand Down Expand Up @@ -422,12 +441,27 @@ static int s_AwsMqtt5CanaryOperationSubscribe(struct AwsMqtt5CanaryTestClient *t

testClient->subscriptionCount++;

++g_statistic.totalOperations;
++g_statistic.subscribe_attempt;
AWS_LOGF_INFO(AWS_LS_MQTT5_CANARY, "ID:%s Subscribe to topic: %s", testClient->clientId.c_str(), topicArray);

if (testClient->client->Subscribe(packet))
if (testClient->client->Subscribe(packet, [](int errorcode, std::shared_ptr<SubAckPacket>) {
if (errorcode != 0)
{
++g_statistic.subscribe_failed;
AWS_LOGF_ERROR(
AWS_LS_MQTT5_CANARY,
"Subscribe failed with errorcode: %d, %s\n",
errorcode,
aws_error_str(errorcode));
return;
}
++g_statistic.subscribe_succeed;
}))
{
return AWS_OP_SUCCESS;
}
++g_statistic.subscribe_failed;
AWS_LOGF_ERROR(AWS_LS_MQTT5_CANARY, "ID:%s Subscribe Failed", testClient->clientId.c_str());
return AWS_OP_ERR;
}
Expand All @@ -448,15 +482,18 @@ static int s_AwsMqtt5CanaryOperationUnsubscribeBad(struct AwsMqtt5CanaryTestClie
std::shared_ptr<Mqtt5::UnsubscribePacket> unsubscription = std::make_shared<Mqtt5::UnsubscribePacket>(allocator);
unsubscription->WithTopicFilters(topics);

++g_statistic.totalOperations;
++g_statistic.unsub_attempt;
if (testClient->client->Unsubscribe(
unsubscription, [testClient](int, std::shared_ptr<Mqtt5::UnSubAckPacket> packet) {
if (packet == nullptr)
return;
if (packet->getReasonCodes()[0] == AWS_MQTT5_UARC_SUCCESS)
{
++g_statistic.unsub_succeed;
AWS_LOGF_ERROR(
AWS_LS_MQTT5_CANARY,
"ID:%s Unsubscribe Bad Server Failed with errorcode : %s",
"ID:%s Unsubscribe Bad Server Failed with errorcode : %s\n",
testClient->clientId.c_str(),
packet->getReasonString()->c_str());
}
Expand All @@ -465,6 +502,7 @@ static int s_AwsMqtt5CanaryOperationUnsubscribeBad(struct AwsMqtt5CanaryTestClie
AWS_LOGF_INFO(AWS_LS_MQTT5_CANARY, "ID:%s Unsubscribe Bad", testClient->clientId.c_str());
return AWS_OP_SUCCESS;
}
++g_statistic.unsub_failed;
AWS_LOGF_ERROR(AWS_LS_MQTT5_CANARY, "ID:%s Unsubscribe Bad Operation Failed", testClient->clientId.c_str());
return AWS_OP_ERR;
}
Expand Down Expand Up @@ -492,12 +530,16 @@ static int s_AwsMqtt5CanaryOperationUnsubscribe(struct AwsMqtt5CanaryTestClient
std::shared_ptr<Mqtt5::UnsubscribePacket> unsubscription = std::make_shared<Mqtt5::UnsubscribePacket>(allocator);
unsubscription->WithTopicFilters(topics);

++g_statistic.totalOperations;
++g_statistic.unsub_attempt;
if (testClient->client->Unsubscribe(unsubscription))
{
++g_statistic.unsub_succeed;
AWS_LOGF_INFO(
AWS_LS_MQTT5_CANARY, "ID:%s Unsubscribe from topic: %s", testClient->clientId.c_str(), topicArray);
return AWS_OP_SUCCESS;
}
++g_statistic.unsub_failed;
AWS_LOGF_ERROR(AWS_LS_MQTT5_CANARY, "ID:%s Unsubscribe Failed", testClient->clientId.c_str());
return AWS_OP_ERR;
}
Expand Down Expand Up @@ -541,13 +583,30 @@ static int s_AwsMqtt5CanaryOperationPublish(
.WithUserProperty(std::move(up2))
.WithUserProperty(std::move(up3));

if (testClient->client->Publish(packetPublish))
++g_statistic.totalOperations;
++g_statistic.publish_attempt;

if (testClient->client->Publish(packetPublish, [testClient](int errorcode, std::shared_ptr<PublishResult> packet) {
if (errorcode != 0)
{
++g_statistic.publish_failed;
AWS_LOGF_ERROR(
AWS_LS_MQTT5_CANARY,
"ID: %s Publish failed with error code: %d, %s\n",
testClient->clientId.c_str(),
errorcode,
aws_error_str(errorcode));
return;
}
++g_statistic.publish_succeed;
}))
{
AWS_LOGF_INFO(
AWS_LS_MQTT5_CANARY, "ID:%s Publish to topic %s", testClient->clientId.c_str(), topicFilter.c_str());
return AWS_OP_SUCCESS;
}
AWS_LOGF_ERROR(AWS_LS_MQTT5_CANARY, "ID:%s Publish Failed", testClient->clientId.c_str());
++g_statistic.publish_failed;
AWS_LOGF_INFO(AWS_LS_MQTT5_CANARY, "ID:%s Publish Failed\n", testClient->clientId.c_str());
return AWS_OP_ERR;
}

Expand Down Expand Up @@ -691,6 +750,8 @@ int main(int argc, char **argv)
AWS_ZERO_STRUCT(operations);
testerOptions.operations = operations;

AWS_ZERO_STRUCT(g_statistic);

s_ParseOptions(argc, argv, appCtx, &testerOptions);
if (appCtx.uri.GetPort())
{
Expand Down Expand Up @@ -830,7 +891,8 @@ int main(int argc, char **argv)
.WithSocketOptions(socketOptions)
.WithBootstrap(&clientBootstrap)
.WithPingTimeoutMs(10000)
.WithReconnectOptions({AWS_EXPONENTIAL_BACKOFF_JITTER_NONE, 1000, 120000, 3000});
.WithReconnectOptions({AWS_EXPONENTIAL_BACKOFF_JITTER_NONE, 1000, 120000, 3000})
.WithConnackTimeoutMs(3000);

if (appCtx.use_tls)
{
Expand Down Expand Up @@ -893,11 +955,12 @@ int main(int argc, char **argv)

mqtt5Options.WithClientDisconnectionCallback([&clients, i](const OnDisconnectionEventData &) {
clients[i].isConnected = false;
AWS_LOGF_INFO(AWS_LS_MQTT5_CANARY, "ID:%s Lifecycle Event: Disconnect", clients[i].clientId.c_str());
AWS_LOGF_ERROR(AWS_LS_MQTT5_CANARY, "ID:%s Lifecycle Event: Disconnect", clients[i].clientId.c_str());
});

mqtt5Options.WithClientStoppedCallback([&clients, i](const OnStoppedEventData &) {
AWS_LOGF_INFO(AWS_LS_MQTT5_CANARY, "ID:%s Lifecycle Event: Stopped", clients[i].clientId.c_str());
clients[i].isConnected = false;
AWS_LOGF_ERROR(AWS_LS_MQTT5_CANARY, "ID:%s Lifecycle Event: Stopped", clients[i].clientId.c_str());
});

clients[i].client = Mqtt5::Mqtt5Client::NewMqtt5Client(mqtt5Options, appCtx.allocator);
Expand Down Expand Up @@ -948,17 +1011,19 @@ int main(int argc, char **argv)

if (now > timeTestFinish)
{
printf(
" Operating TPS average over test: %zu\n\n", operationsExecuted / testerOptions.testRunSeconds);
fprintf(
stderr,
" Operating TPS average over test: %zu\n\n",
operationsExecuted / testerOptions.testRunSeconds);
done = true;
}

if (now > memoryCheckPoint)
{
const size_t outstanding_bytes = aws_mem_tracer_bytes(allocator);
printf("Summary:\n");
printf(" Outstanding bytes: %zu\n", outstanding_bytes);
printf(" Operations executed: %zu\n", operationsExecuted);
fprintf(stderr, "Summary:\n");
fprintf(stderr, " Outstanding bytes: %zu\n", outstanding_bytes);
fprintf(stderr, " Operations executed: %zu\n", operationsExecuted);
memoryCheckPoint = now + timeInterval;
}

Expand All @@ -977,6 +1042,32 @@ int main(int argc, char **argv)
AWS_LOGF_ERROR(AWS_LS_MQTT5_CANARY, "ID:%s STOP Operation Failed.", client.clientId.c_str());
}
}

fprintf(
stderr,
"Final Statistic: \n"
"total operations: %" PRId64 "\n"
"tps: %" PRId64 "\n"
"subscribe attempt: %" PRId64 "\n"
"subscribe succeed: %" PRId64 "\n"
"subscribe failed: %" PRId64 "\n"
"publish attempt: %" PRId64 "\n"
"publish succeed: %" PRId64 "\n"
"publish failed: %" PRId64 "\n"
"unsub attempt: %" PRId64 "\n"
"unsub succeed: %" PRId64 "\n"
"unsub failed: %" PRId64 "\n",
g_statistic.totalOperations,
g_statistic.totalOperations / testerOptions.testRunSeconds,
g_statistic.subscribe_attempt,
g_statistic.subscribe_succeed,
g_statistic.subscribe_failed,
g_statistic.publish_attempt,
g_statistic.publish_succeed,
g_statistic.publish_failed,
g_statistic.unsub_attempt,
g_statistic.unsub_succeed,
g_statistic.unsub_failed);
}

aws_mem_tracer_destroy(allocator);
Expand Down

0 comments on commit 40b5f8b

Please sign in to comment.