diff --git a/bin/mqtt5_canary/main.cpp b/bin/mqtt5_canary/main.cpp index da45f9dec..382728d56 100644 --- a/bin/mqtt5_canary/main.cpp +++ b/bin/mqtt5_canary/main.cpp @@ -673,304 +673,308 @@ int main(int argc, char **argv) { struct aws_allocator *allocator = aws_mem_tracer_new(aws_default_allocator(), NULL, AWS_MEMTRACE_STACKS, 15); - ApiHandle apiHandle(allocator); - struct AppCtx appCtx = {}; - appCtx.allocator = allocator; - appCtx.connect_timeout = 3000; - aws_mutex_init(&appCtx.lock); - appCtx.port = 1883; - - struct AwsMqtt5CanaryTesterOptions testerOptions; - AWS_ZERO_STRUCT(testerOptions); - s_AwsMqtt5CanaryInitTesterOptions(&testerOptions); - enum AwsMqtt5CanaryOperations operations[AWS_MQTT5_CANARY_OPERATION_ARRAY_SIZE]; - AWS_ZERO_STRUCT(operations); - testerOptions.operations = operations; - - s_ParseOptions(argc, argv, appCtx, &testerOptions); - if (appCtx.uri.GetPort()) { - appCtx.port = appCtx.uri.GetPort(); - } + ApiHandle apiHandle(allocator); + struct AppCtx appCtx = {}; + appCtx.allocator = allocator; + appCtx.connect_timeout = 3000; + aws_mutex_init(&appCtx.lock); + appCtx.port = 1883; + + struct AwsMqtt5CanaryTesterOptions testerOptions; + AWS_ZERO_STRUCT(testerOptions); + s_AwsMqtt5CanaryInitTesterOptions(&testerOptions); + enum AwsMqtt5CanaryOperations operations[AWS_MQTT5_CANARY_OPERATION_ARRAY_SIZE]; + AWS_ZERO_STRUCT(operations); + testerOptions.operations = operations; + + s_ParseOptions(argc, argv, appCtx, &testerOptions); + if (appCtx.uri.GetPort()) + { + appCtx.port = appCtx.uri.GetPort(); + } - s_Mqtt5CanaryUpdateTpsSleepTime(&testerOptions); - s_AwsMqtt5CanaryInitWeightedOperations(&testerOptions); + s_Mqtt5CanaryUpdateTpsSleepTime(&testerOptions); + s_AwsMqtt5CanaryInitWeightedOperations(&testerOptions); - /********************************************************** - * LOGGING - **********************************************************/ + /********************************************************** + * LOGGING + **********************************************************/ - if (appCtx.TraceFile) - { - apiHandle.InitializeLogging(appCtx.LogLevel, appCtx.TraceFile); - } - else - { - apiHandle.InitializeLogging(appCtx.LogLevel, stderr); - } + if (appCtx.TraceFile) + { + apiHandle.InitializeLogging(appCtx.LogLevel, appCtx.TraceFile); + } + else + { + apiHandle.InitializeLogging(appCtx.LogLevel, stderr); + } - /*************************************************** - * TLS - ***************************************************/ - auto hostName = appCtx.uri.GetHostName(); - Io::TlsContextOptions tlsCtxOptions; - Io::TlsContext tlsContext; - Io::TlsConnectionOptions tlsConnectionOptions; - if (appCtx.use_tls) - { - if (appCtx.cert && appCtx.key) + /*************************************************** + * TLS + ***************************************************/ + auto hostName = appCtx.uri.GetHostName(); + Io::TlsContextOptions tlsCtxOptions; + Io::TlsContext tlsContext; + Io::TlsConnectionOptions tlsConnectionOptions; + if (appCtx.use_tls) { - tlsCtxOptions = Io::TlsContextOptions::InitClientWithMtls(appCtx.cert, appCtx.key); - if (!tlsCtxOptions) + if (appCtx.cert && appCtx.key) + { + tlsCtxOptions = Io::TlsContextOptions::InitClientWithMtls(appCtx.cert, appCtx.key); + if (!tlsCtxOptions) + { + AWS_LOGF_ERROR( + AWS_LS_MQTT5_CANARY, + "Failed to load %s and %s with error %s.", + appCtx.cert, + appCtx.key, + aws_error_debug_str(tlsCtxOptions.LastError())); + exit(1); + } + } + else + { + tlsCtxOptions = Io::TlsContextOptions::InitDefaultClient(); + if (!tlsCtxOptions) + { + AWS_LOGF_ERROR( + AWS_LS_MQTT5_CANARY, + "Failed to create a default tlsCtxOptions with error %s", + aws_error_debug_str(tlsCtxOptions.LastError())); + exit(1); + } + } + + tlsContext = Io::TlsContext(tlsCtxOptions, Io::TlsMode::CLIENT, appCtx.allocator); + + tlsConnectionOptions = tlsContext.NewConnectionOptions(); + + if (!tlsConnectionOptions.SetServerName(hostName)) { AWS_LOGF_ERROR( AWS_LS_MQTT5_CANARY, - "Failed to load %s and %s with error %s.", - appCtx.cert, - appCtx.key, - aws_error_debug_str(tlsCtxOptions.LastError())); + "Failed to set servername with error %s", + aws_error_debug_str(tlsConnectionOptions.LastError())); exit(1); } - } - else - { - tlsCtxOptions = Io::TlsContextOptions::InitDefaultClient(); - if (!tlsCtxOptions) + if (!tlsConnectionOptions.SetAlpnList("x-amzn-mqtt-ca")) { AWS_LOGF_ERROR( AWS_LS_MQTT5_CANARY, - "Failed to create a default tlsCtxOptions with error %s", - aws_error_debug_str(tlsCtxOptions.LastError())); + "Failed to set alpn list with error %s", + aws_error_debug_str(tlsConnectionOptions.LastError())); exit(1); } } - tlsContext = Io::TlsContext(tlsCtxOptions, Io::TlsMode::CLIENT, appCtx.allocator); + /********************************************************** + * EVENT LOOP GROUP + **********************************************************/ - tlsConnectionOptions = tlsContext.NewConnectionOptions(); + Io::SocketOptions socketOptions; + socketOptions.SetConnectTimeoutMs(appCtx.connect_timeout); + socketOptions.SetKeepAliveIntervalSec(10000); - if (!tlsConnectionOptions.SetServerName(hostName)) + Io::EventLoopGroup eventLoopGroup(testerOptions.elgMaxThreads, appCtx.allocator); + if (!eventLoopGroup) { AWS_LOGF_ERROR( AWS_LS_MQTT5_CANARY, - "Failed to set servername with error %s", - aws_error_debug_str(tlsConnectionOptions.LastError())); + "Failed to create eventloop group with error %s", + aws_error_debug_str(eventLoopGroup.LastError())); exit(1); } - if (!tlsConnectionOptions.SetAlpnList("x-amzn-mqtt-ca")) + + Io::DefaultHostResolver defaultHostResolver(eventLoopGroup, 8, 30, appCtx.allocator); + if (!defaultHostResolver) { AWS_LOGF_ERROR( AWS_LS_MQTT5_CANARY, - "Failed to set alpn list with error %s", - aws_error_debug_str(tlsConnectionOptions.LastError())); + "Failed to create host resolver with error %s", + aws_error_debug_str(defaultHostResolver.LastError())); exit(1); } - } - /********************************************************** - * EVENT LOOP GROUP - **********************************************************/ + Aws::Crt::Io::ClientBootstrap clientBootstrap(eventLoopGroup, defaultHostResolver, allocator); - Io::SocketOptions socketOptions; - socketOptions.SetConnectTimeoutMs(appCtx.connect_timeout); - socketOptions.SetKeepAliveIntervalSec(10000); + if (!clientBootstrap) + { + AWS_LOGF_ERROR( + AWS_LS_MQTT5_CANARY, + "Failed to create client bootstrap with error %s", + aws_error_debug_str(clientBootstrap.LastError())); + exit(1); + } - Io::EventLoopGroup eventLoopGroup(testerOptions.elgMaxThreads, appCtx.allocator); - if (!eventLoopGroup) - { - AWS_LOGF_ERROR( - AWS_LS_MQTT5_CANARY, - "Failed to create eventloop group with error %s", - aws_error_debug_str(eventLoopGroup.LastError())); - exit(1); - } + /********************************************************** + * MQTT5 CLIENT CREATION + **********************************************************/ + + uint16_t receive_maximum = 9; + uint32_t maximum_packet_size = 128 * 1024; + + std::shared_ptr packetConnect = std::make_shared(allocator); + packetConnect->WithKeepAliveIntervalSec(30) + .WithMaximumPacketSizeBytes(maximum_packet_size) + .WithReceiveMaximum(receive_maximum); + + Aws::Crt::String namestring((const char *)hostName.ptr, hostName.len); + Aws::Crt::Mqtt5::Mqtt5ClientOptions mqtt5Options(appCtx.allocator); + mqtt5Options.WithHostName(namestring) + .WithPort(appCtx.port) + .WithConnectOptions(packetConnect) + .WithSocketOptions(socketOptions) + .WithBootstrap(&clientBootstrap) + .WithPingTimeoutMs(10000) + .WithReconnectOptions({AWS_EXPONENTIAL_BACKOFF_JITTER_NONE, 1000, 120000, 3000}); + + if (appCtx.use_tls) + { + mqtt5Options.WithTlsConnectionOptions(tlsConnectionOptions); + } - Io::DefaultHostResolver defaultHostResolver(eventLoopGroup, 8, 30, appCtx.allocator); - if (!defaultHostResolver) - { - AWS_LOGF_ERROR( - AWS_LS_MQTT5_CANARY, - "Failed to create host resolver with error %s", - aws_error_debug_str(defaultHostResolver.LastError())); - exit(1); - } + if (appCtx.use_websockets) + { + mqtt5Options.WithWebsocketHandshakeTransformCallback(s_AwsMqtt5TransformWebsocketHandshakeFn); + } - Aws::Crt::Io::ClientBootstrap clientBootstrap(eventLoopGroup, defaultHostResolver, allocator); + std::vector clients; - if (!clientBootstrap) - { - AWS_LOGF_ERROR( - AWS_LS_MQTT5_CANARY, - "Failed to create client bootstrap with error %s", - aws_error_debug_str(clientBootstrap.LastError())); - exit(1); - } + uint64_t startTime = 0; + aws_high_res_clock_get_ticks(&startTime); + char sharedTopicArray[AWS_MQTT5_CANARY_TOPIC_ARRAY_SIZE]; + AWS_ZERO_STRUCT(sharedTopicArray); + snprintf(sharedTopicArray, sizeof sharedTopicArray, "%" PRId64 "_shared_topic", startTime); - /********************************************************** - * MQTT5 CLIENT CREATION - **********************************************************/ - - uint16_t receive_maximum = 9; - uint32_t maximum_packet_size = 128 * 1024; - - std::shared_ptr packetConnect = std::make_shared(allocator); - packetConnect->WithKeepAliveIntervalSec(30) - .WithMaximumPacketSizeBytes(maximum_packet_size) - .WithReceiveMaximum(receive_maximum); - - Aws::Crt::String namestring((const char *)hostName.ptr, hostName.len); - Aws::Crt::Mqtt5::Mqtt5ClientOptions mqtt5Options(appCtx.allocator); - mqtt5Options.WithHostName(namestring) - .WithPort(appCtx.port) - .WithConnectOptions(packetConnect) - .WithSocketOptions(socketOptions) - .WithBootstrap(&clientBootstrap) - .WithPingTimeoutMs(10000) - .WithReconnectOptions({AWS_EXPONENTIAL_BACKOFF_JITTER_NONE, 1000, 120000, 3000}); - - if (appCtx.use_tls) - { - mqtt5Options.WithTlsConnectionOptions(tlsConnectionOptions); - } + for (size_t i = 0; i < testerOptions.clientCount; ++i) + { + struct AwsMqtt5CanaryTestClient client; + client = {}; + Aws::Crt::UUID uuid; + client.clientId = String("TestClient") + std::to_string(i).c_str() + "_" + uuid.ToString(); + client.sharedTopic = Aws::Crt::String(sharedTopicArray); + client.isConnected = false; + clients.push_back(client); + mqtt5Options.WithAckTimeoutSeconds(10); + mqtt5Options.WithPublishReceivedCallback([&clients, i](const Mqtt5::PublishReceivedEventData &publishData) { + AWS_LOGF_INFO( + AWS_LS_MQTT5_CANARY, + "Client:%s Publish Received on topic %s", + clients[i].clientId.c_str(), + publishData.publishPacket->getTopic().c_str()); + }); - if (appCtx.use_websockets) - { - mqtt5Options.WithWebsocketHandshakeTransformCallback(s_AwsMqtt5TransformWebsocketHandshakeFn); - } + mqtt5Options.WithClientConnectionSuccessCallback( + [&clients, i](const Mqtt5::OnConnectionSuccessEventData &eventData) { + clients[i].isConnected = true; + clients[i].clientId = Aws::Crt::String( + eventData.negotiatedSettings->getClientId().c_str(), + eventData.negotiatedSettings->getClientId().length()); + clients[i].settings = eventData.negotiatedSettings; - std::vector clients; + AWS_LOGF_INFO( + AWS_LS_MQTT5_CANARY, "ID:%s Lifecycle Event: Connection Success", clients[i].clientId.c_str()); + }); - uint64_t startTime = 0; - aws_high_res_clock_get_ticks(&startTime); - char sharedTopicArray[AWS_MQTT5_CANARY_TOPIC_ARRAY_SIZE]; - AWS_ZERO_STRUCT(sharedTopicArray); - snprintf(sharedTopicArray, sizeof sharedTopicArray, "%" PRId64 "_shared_topic", startTime); + mqtt5Options.WithClientConnectionFailureCallback([&clients, i](const OnConnectionFailureEventData &eventData) { + clients[i].isConnected = false; + AWS_LOGF_ERROR( + AWS_LS_MQTT5_CANARY, + "ID:%s Connection failed with Error Code: %d(%s)", + clients[i].clientId.c_str(), + eventData.errorCode, + aws_error_debug_str(eventData.errorCode)); + }); - for (size_t i = 0; i < testerOptions.clientCount; ++i) - { - struct AwsMqtt5CanaryTestClient client; - client = {}; - Aws::Crt::UUID uuid; - client.clientId = String("TestClient") + std::to_string(i).c_str() + "_" + uuid.ToString(); - client.sharedTopic = Aws::Crt::String(sharedTopicArray); - client.isConnected = false; - clients.push_back(client); - mqtt5Options.WithAckTimeoutSeconds(10); - mqtt5Options.WithPublishReceivedCallback([&clients, i](const Mqtt5::PublishReceivedEventData &publishData) { - AWS_LOGF_INFO( - AWS_LS_MQTT5_CANARY, - "Client:%s Publish Received on topic %s", - clients[i].clientId.c_str(), - publishData.publishPacket->getTopic().c_str()); - }); - - mqtt5Options.WithClientConnectionSuccessCallback( - [&clients, i](const Mqtt5::OnConnectionSuccessEventData &eventData) { - clients[i].isConnected = true; - clients[i].clientId = Aws::Crt::String( - eventData.negotiatedSettings->getClientId().c_str(), - eventData.negotiatedSettings->getClientId().length()); - clients[i].settings = eventData.negotiatedSettings; + mqtt5Options.WithClientDisconnectionCallback([&clients, i](const OnDisconnectionEventData &) { + clients[i].isConnected = false; + AWS_LOGF_INFO(AWS_LS_MQTT5_CANARY, "ID:%s Lifecycle Event: Disconnect", clients[i].clientId.c_str()); + }); - AWS_LOGF_INFO( - AWS_LS_MQTT5_CANARY, "ID:%s Lifecycle Event: Connection Success", clients[i].clientId.c_str()); + mqtt5Options.WithClientStoppedCallback([&clients, i](const OnStoppedEventData &) { + AWS_LOGF_INFO(AWS_LS_MQTT5_CANARY, "ID:%s Lifecycle Event: Stopped", clients[i].clientId.c_str()); }); - mqtt5Options.WithClientConnectionFailureCallback([&clients, i](const OnConnectionFailureEventData &eventData) { - clients[i].isConnected = false; - AWS_LOGF_ERROR( - AWS_LS_MQTT5_CANARY, - "ID:%s Connection failed with Error Code: %d(%s)", - clients[i].clientId.c_str(), - eventData.errorCode, - aws_error_debug_str(eventData.errorCode)); - }); - - mqtt5Options.WithClientDisconnectionCallback([&clients, i](const OnDisconnectionEventData &) { - clients[i].isConnected = false; - AWS_LOGF_INFO(AWS_LS_MQTT5_CANARY, "ID:%s Lifecycle Event: Disconnect", clients[i].clientId.c_str()); - }); - - mqtt5Options.WithClientStoppedCallback([&clients, i](const OnStoppedEventData &) { - AWS_LOGF_INFO(AWS_LS_MQTT5_CANARY, "ID:%s Lifecycle Event: Stopped", clients[i].clientId.c_str()); - }); - - clients[i].client = Mqtt5::Mqtt5Client::NewMqtt5Client(mqtt5Options, appCtx.allocator); - if (clients[i].client == nullptr) - { - AWS_LOGF_ERROR(AWS_LS_MQTT5_CANARY, "ID:%s Client Creation Failed.", client.clientId.c_str()); - continue; - } + clients[i].client = Mqtt5::Mqtt5Client::NewMqtt5Client(mqtt5Options, appCtx.allocator); + if (clients[i].client == nullptr) + { + AWS_LOGF_ERROR(AWS_LS_MQTT5_CANARY, "ID:%s Client Creation Failed.", client.clientId.c_str()); + continue; + } - awsMqtt5CanaryOperationFn *operation_fn = - s_AwsMqtt5CanaryOperationTable.operationByOperationType[AWS_MQTT5_CANARY_OPERATION_START]; - if ((*operation_fn)(&clients[i], appCtx.allocator) == AWS_OP_ERR) - { - AWS_LOGF_ERROR(AWS_LS_MQTT5_CANARY, "ID:%s Operation Failed.", client.clientId.c_str()); + awsMqtt5CanaryOperationFn *operation_fn = + s_AwsMqtt5CanaryOperationTable.operationByOperationType[AWS_MQTT5_CANARY_OPERATION_START]; + if ((*operation_fn)(&clients[i], appCtx.allocator) == AWS_OP_ERR) + { + AWS_LOGF_ERROR(AWS_LS_MQTT5_CANARY, "ID:%s Operation Failed.", client.clientId.c_str()); + } + + aws_thread_current_sleep(AWS_MQTT5_CANARY_CLIENT_CREATION_SLEEP_TIME); } - aws_thread_current_sleep(AWS_MQTT5_CANARY_CLIENT_CREATION_SLEEP_TIME); - } + fprintf(stderr, "Clients created\n"); - fprintf(stderr, "Clients created\n"); + /********************************************************** + * TESTING + **********************************************************/ + bool done = false; + size_t operationsExecuted = 0; + uint64_t timeTestFinish = 0; + aws_high_res_clock_get_ticks(&timeTestFinish); + timeTestFinish += + aws_timestamp_convert(testerOptions.testRunSeconds, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL); + uint64_t timeInterval = + aws_timestamp_convert(testerOptions.memoryCheckIntervalSec, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL); + uint64_t memoryCheckPoint = 0; - /********************************************************** - * TESTING - **********************************************************/ - bool done = false; - size_t operationsExecuted = 0; - uint64_t timeTestFinish = 0; - aws_high_res_clock_get_ticks(&timeTestFinish); - timeTestFinish += - aws_timestamp_convert(testerOptions.testRunSeconds, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL); - uint64_t timeInterval = - aws_timestamp_convert(testerOptions.memoryCheckIntervalSec, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL); - uint64_t memoryCheckPoint = 0; + printf("Running test for %zu seconds\n", testerOptions.testRunSeconds); - printf("Running test for %zu seconds\n", testerOptions.testRunSeconds); + while (!done) + { + uint64_t now = 0; + aws_high_res_clock_get_ticks(&now); + operationsExecuted++; - while (!done) - { - uint64_t now = 0; - aws_high_res_clock_get_ticks(&now); - operationsExecuted++; + AwsMqtt5CanaryOperations nextOperation = s_AwsMqtt5CanaryGetRandomOperation(&testerOptions); + awsMqtt5CanaryOperationFn *operation_fn = + s_AwsMqtt5CanaryOperationTable.operationByOperationType[nextOperation]; - AwsMqtt5CanaryOperations nextOperation = s_AwsMqtt5CanaryGetRandomOperation(&testerOptions); - awsMqtt5CanaryOperationFn *operation_fn = - s_AwsMqtt5CanaryOperationTable.operationByOperationType[nextOperation]; + (*operation_fn)(&clients[rand() % clients.size()], appCtx.allocator); + + if (now > timeTestFinish) + { + printf(" Operating TPS average over test: %zu\n\n", operationsExecuted / testerOptions.testRunSeconds); + done = true; + } - (*operation_fn)(&clients[rand() % clients.size()], appCtx.allocator); + if (now > memoryCheckPoint) + { + const size_t outstanding_bytes = aws_mem_tracer_bytes(allocator); + printf("Summary:\n"); + printf(" Outstanding bytes: %zu\n", outstanding_bytes); + printf(" Operations executed: %zu\n", operationsExecuted); + memoryCheckPoint = now + timeInterval; + } - if (now > timeTestFinish) - { - printf(" Operating TPS average over test: %zu\n\n", operationsExecuted / testerOptions.testRunSeconds); - done = true; + aws_thread_current_sleep(testerOptions.tpsSleepTime); } + /********************************************************** + * CLEAN UP + **********************************************************/ - if (now > memoryCheckPoint) + for (auto client : clients) { - const size_t outstanding_bytes = aws_mem_tracer_bytes(allocator); - printf("Summary:\n"); - printf(" Outstanding bytes: %zu\n", outstanding_bytes); - printf(" Operations executed: %zu\n", operationsExecuted); - memoryCheckPoint = now + timeInterval; + awsMqtt5CanaryOperationFn *operation_fn = + s_AwsMqtt5CanaryOperationTable.operationByOperationType[AWS_MQTT5_CANARY_OPERATION_STOP]; + if ((*operation_fn)(&client, appCtx.allocator) == AWS_OP_ERR) + { + AWS_LOGF_ERROR(AWS_LS_MQTT5_CANARY, "ID:%s STOP Operation Failed.", client.clientId.c_str()); + } } - - aws_thread_current_sleep(testerOptions.tpsSleepTime); } - /********************************************************** - * CLEAN UP - **********************************************************/ - for (auto client : clients) - { - awsMqtt5CanaryOperationFn *operation_fn = - s_AwsMqtt5CanaryOperationTable.operationByOperationType[AWS_MQTT5_CANARY_OPERATION_STOP]; - if ((*operation_fn)(&client, appCtx.allocator) == AWS_OP_ERR) - { - AWS_LOGF_ERROR(AWS_LS_MQTT5_CANARY, "ID:%s STOP Operation Failed.", client.clientId.c_str()); - } - } + aws_mem_tracer_destroy(allocator); return 0; }