From 5f2268cd7c0d5b08b169a60094ff4cc66214d4eb Mon Sep 17 00:00:00 2001 From: Bret Ambrose Date: Mon, 16 Sep 2024 12:26:32 -0700 Subject: [PATCH] Checkpoint --- include/aws/crt/mqtt/MqttConnection.h | 2 +- .../aws/crt/mqtt/private/MqttConnectionCore.h | 2 +- include/aws/iot/MqttRequestResponseClient.h | 320 ++++---- source/iot/MqttRequestResponseClient.cpp | 742 ++++++++++++------ source/mqtt/Mqtt5Client.cpp | 3 +- source/mqtt/MqttConnection.cpp | 2 +- source/mqtt/MqttConnectionCore.cpp | 2 +- 7 files changed, 652 insertions(+), 421 deletions(-) diff --git a/include/aws/crt/mqtt/MqttConnection.h b/include/aws/crt/mqtt/MqttConnection.h index 670dad35d..cd8049425 100644 --- a/include/aws/crt/mqtt/MqttConnection.h +++ b/include/aws/crt/mqtt/MqttConnection.h @@ -263,7 +263,7 @@ namespace Aws bool Disconnect() noexcept; /// @private - aws_mqtt_client_connection *GetUnderlyingConnection() noexcept; + aws_mqtt_client_connection *GetUnderlyingConnection() const noexcept; /** * Subscribes to topicFilter. OnMessageReceivedHandler will be invoked from an event-loop diff --git a/include/aws/crt/mqtt/private/MqttConnectionCore.h b/include/aws/crt/mqtt/private/MqttConnectionCore.h index 8b0d503f8..827cc9db4 100644 --- a/include/aws/crt/mqtt/private/MqttConnectionCore.h +++ b/include/aws/crt/mqtt/private/MqttConnectionCore.h @@ -151,7 +151,7 @@ namespace Aws bool Disconnect() noexcept; /// @private - aws_mqtt_client_connection *GetUnderlyingConnection() noexcept; + aws_mqtt_client_connection *GetUnderlyingConnection() const noexcept; /** * @internal diff --git a/include/aws/iot/MqttRequestResponseClient.h b/include/aws/iot/MqttRequestResponseClient.h index ecd10f850..b994300b5 100644 --- a/include/aws/iot/MqttRequestResponseClient.h +++ b/include/aws/iot/MqttRequestResponseClient.h @@ -15,217 +15,217 @@ #include -namespace Aws { - -namespace Crt { -namespace Mqtt { - class MqttConnection; -} - -namespace Mqtt5 { - class Mqtt5Client; -} -} - -namespace Iot { -namespace RequestResponse +namespace Aws { - class MqttRequestResponseClientImpl; - - /** - * The type of change to the state of a streaming operation subscription - */ - enum class SubscriptionStatusEventType - { - - /** - * The streaming operation is successfully subscribed to its topic (filter) - */ - SubscriptionEstablished = ARRSSET_SUBSCRIPTION_ESTABLISHED, - - /** - * The streaming operation has temporarily lost its subscription to its topic (filter) - */ - SubscriptionLost = ARRSSET_SUBSCRIPTION_LOST, - - /** - * The streaming operation has entered a terminal state where it has given up trying to subscribe - * to its topic (filter). This is always due to user error (bad topic filter or IoT Core permission policy). - */ - SubscriptionHalted = ARRSSET_SUBSCRIPTION_HALTED, - }; - - /** - * An event that describes a change in subscription status for a streaming operation. - */ - struct AWS_CRT_CPP_API SubscriptionStatusEvent + namespace Crt { - SubscriptionStatusEventType type; - int errorCode; - }; - - using SubscriptionStatusEventHandler = std::function; - - struct AWS_CRT_CPP_API IncomingPublishEvent { - Aws::Crt::ByteCursor payload; - }; - - using IncomingPublishEventHandler = std::function; + namespace Mqtt + { + class MqttConnection; + } - /** - * Encapsulates a response to an AWS IoT Core MQTT-based service request - */ - struct AWS_CRT_CPP_API Response { + namespace Mqtt5 + { + class Mqtt5Client; + } + } // namespace Crt - /** - * MQTT Topic that the response was received on. Different topics map to different types within the - * service model, so we need this value in order to know what to deserialize the payload into. - */ - Aws::Crt::ByteCursor topic; + namespace Iot + { + namespace RequestResponse + { - /** - * Payload of the response that correlates to a submitted request. - */ - Aws::Crt::ByteCursor payload; - }; + class MqttRequestResponseClientImpl; - template struct Result { - public: + /** + * The type of change to the state of a streaming operation subscription + */ + enum class SubscriptionStatusEventType + { - Result() = delete; - Result(const Result &result) = default; - Result(Result &&result) = default; + /** + * The streaming operation is successfully subscribed to its topic (filter) + */ + SubscriptionEstablished = ARRSSET_SUBSCRIPTION_ESTABLISHED, - Result(const R &response) : - rawResult(response) - {} + /** + * The streaming operation has temporarily lost its subscription to its topic (filter) + */ + SubscriptionLost = ARRSSET_SUBSCRIPTION_LOST, - Result(R &&response) : - rawResult(std::move(response)) - {} + /** + * The streaming operation has entered a terminal state where it has given up trying to subscribe + * to its topic (filter). This is always due to user error (bad topic filter or IoT Core permission + * policy). + */ + SubscriptionHalted = ARRSSET_SUBSCRIPTION_HALTED, + }; - Result(const E &error) : - rawResult(error) - {} + /** + * An event that describes a change in subscription status for a streaming operation. + */ + struct AWS_CRT_CPP_API SubscriptionStatusEvent + { + SubscriptionStatusEventType type; + int errorCode; + }; - Result(E &&error) : - rawResult(std::move(error)) - {} + using SubscriptionStatusEventHandler = std::function; - ~Result() = default; + struct AWS_CRT_CPP_API IncomingPublishEvent + { + Aws::Crt::ByteCursor payload; + }; - Result &operator=(const Result &result) = default; - Result &operator=(Result &&result) = default; + using IncomingPublishEventHandler = std::function; - Result &operator=(const R &response) { - this->rawResult = response; + /** + * Encapsulates a response to an AWS IoT Core MQTT-based service request + */ + struct AWS_CRT_CPP_API UnmodeledResponse + { - return *this; - } + /** + * MQTT Topic that the response was received on. Different topics map to different types within the + * service model, so we need this value in order to know what to deserialize the payload into. + */ + Aws::Crt::ByteCursor topic; - Result &operator=(R &&response) { - this->rawResult = std::move(response); + /** + * Payload of the response that correlates to a submitted request. + */ + Aws::Crt::ByteCursor payload; + }; - return *this; - } + template struct Result + { + public: + Result() = delete; + Result(const Result &result) = default; + Result(Result &&result) = default; - Result &operator=(const E &error) { - this->rawResult = error; - } + Result(const R &response) : rawResult(response) {} - Result &operator=(E &&error) { - this->rawResult = std::move(error); + Result(R &&response) : rawResult(std::move(response)) {} - return *this; - } + Result(const E &error) : rawResult(error) {} - bool isSuccess() const { - return rawResult.holds_alternative(); - } + Result(E &&error) : rawResult(std::move(error)) {} - const R &getResponse() const { - AWS_FATAL_ASSERT(isSuccess()); + ~Result() = default; - return rawResult.get(); - } + Result &operator=(const Result &result) = default; + Result &operator=(Result &&result) = default; - const E &getError() const { - AWS_FATAL_ASSERT(!isSuccess()); + Result &operator=(const R &response) + { + this->rawResult = response; - return rawResult.get(); - } + return *this; + } - private: + Result &operator=(R &&response) + { + this->rawResult = std::move(response); - Aws::Crt::Variant rawResult; - }; + return *this; + } - using UnmodeledResult = Result; + Result &operator=(const E &error) { this->rawResult = error; } - using UnmodeledResultHandler = std::function; + Result &operator=(E &&error) + { + this->rawResult = std::move(error); - struct AWS_CRT_CPP_API StreamingOperationOptions { - Aws::Crt::ByteCursor subscriptionTopicFilter; + return *this; + } - SubscriptionStatusEventHandler subscriptionStatusEventHandler; + bool isSuccess() const { return rawResult.template holds_alternative(); } - IncomingPublishEventHandler incomingPublishEventHandler; - }; + const R &getResponse() const + { + AWS_FATAL_ASSERT(isSuccess()); - class AWS_CRT_CPP_API IStreamingOperation { - public: + return rawResult.template get(); + } - virtual ~IStreamingOperation() = 0; + const E &getError() const + { + AWS_FATAL_ASSERT(!isSuccess()); - }; + return rawResult.template get(); + } - /** - * MQTT-based request-response client configuration options - */ - struct AWS_CRT_CPP_API RequestResponseClientOptions { + private: + Aws::Crt::Variant rawResult; + }; - /** - * Maximum number of subscriptions that the client will concurrently use for request-response operations - */ - uint32_t maxRequestResponseSubscriptions; + using UnmodeledResult = Result; - /** - * Maximum number of subscriptions that the client will concurrently use for streaming operations - */ - uint32_t maxStreamingSubscriptions; + using UnmodeledResultHandler = std::function; - /** - * Duration, in seconds, that a request-response operation will wait for completion before giving up - */ - uint32_t operationTimeoutInSeconds; - }; + struct AWS_CRT_CPP_API StreamingOperationOptions + { + Aws::Crt::ByteCursor subscriptionTopicFilter; + SubscriptionStatusEventHandler subscriptionStatusEventHandler; + IncomingPublishEventHandler incomingPublishEventHandler; + }; - class AWS_CRT_CPP_API MqttRequestResponseClient { - public: + class AWS_CRT_CPP_API IStreamingOperation + { + public: + virtual ~IStreamingOperation() = 0; - virtual ~MqttRequestResponseClient(); + virtual void activate() = 0; + }; - static MqttRequestResponseClient *newFrom5(const Aws::Crt::Mqtt5::Mqtt5Client &protocolClient, RequestResponseClientOptions &&options, Aws::Crt::Allocator *allocator = Aws::Crt::ApiAllocator()); + /** + * MQTT-based request-response client configuration options + */ + struct AWS_CRT_CPP_API RequestResponseClientOptions + { - static MqttRequestResponseClient *newFrom311(const Aws::Crt::Mqtt::MqttConnection &protocolClient, RequestResponseClientOptions &&options, Aws::Crt::Allocator *allocator = Aws::Crt::ApiAllocator()); + /** + * Maximum number of subscriptions that the client will concurrently use for request-response operations + */ + uint32_t maxRequestResponseSubscriptions; - int submitRequest(const aws_mqtt_request_operation_options &requestOptions, UnmodeledResultHandler &&resultHandler); + /** + * Maximum number of subscriptions that the client will concurrently use for streaming operations + */ + uint32_t maxStreamingSubscriptions; - std::shared_ptr createStream(StreamingOperationOptions &&options); + /** + * Duration, in seconds, that a request-response operation will wait for completion before giving up + */ + uint32_t operationTimeoutInSeconds; + }; - private: + class AWS_CRT_CPP_API IMqttRequestResponseClient + { + public: + virtual ~IMqttRequestResponseClient() = 0; - MqttRequestResponseClient(Aws::Crt::Allocator *allocator, MqttRequestResponseClientImpl *impl); + virtual int submitRequest( + const aws_mqtt_request_operation_options &requestOptions, + UnmodeledResultHandler &&resultHandler) = 0; - Aws::Crt::Allocator *m_allocator; + virtual std::shared_ptr createStream(StreamingOperationOptions &&options) = 0; - MqttRequestResponseClientImpl *m_impl; - }; + static IMqttRequestResponseClient *newFrom5( + const Aws::Crt::Mqtt5::Mqtt5Client &protocolClient, + RequestResponseClientOptions &&options, + Aws::Crt::Allocator *allocator = Aws::Crt::ApiAllocator()); + static IMqttRequestResponseClient *newFrom311( + const Aws::Crt::Mqtt::MqttConnection &protocolClient, + RequestResponseClientOptions &&options, + Aws::Crt::Allocator *allocator = Aws::Crt::ApiAllocator()); + }; -} // RequestResponse -} // Iot -} // Aws \ No newline at end of file + } // namespace RequestResponse + } // namespace Iot +} // namespace Aws \ No newline at end of file diff --git a/source/iot/MqttRequestResponseClient.cpp b/source/iot/MqttRequestResponseClient.cpp index dadd54f54..865fb3acb 100644 --- a/source/iot/MqttRequestResponseClient.cpp +++ b/source/iot/MqttRequestResponseClient.cpp @@ -1,7 +1,7 @@ /** -* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -* SPDX-License-Identifier: Apache-2.0. -*/ + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ #include @@ -13,257 +13,487 @@ namespace Aws { -namespace Iot -{ -namespace RequestResponse -{ - - class AWS_CRT_CPP_API StreamingOperationImpl { - public: - StreamingOperationImpl(StreamingOperationOptions &&options, struct aws_event_loop *protocolLoop); - virtual ~StreamingOperationImpl(); - - void seatStream(struct aws_mqtt_rr_client_operation *stream); - - void close(); - - static void onIncomingPublish(StreamingOperationImpl *impl); - static void onSubscriptionStatusEvent(StreamingOperationImpl *impl); - - private: - - Aws::Crt::Allocator *m_allocator; - - StreamingOperationOptions m_Config; - - struct aws_mqtt_rr_client_operation *m_stream; - - struct aws_event_loop *m_protocolLoop; - - struct aws_rw_lock m_lock; - - bool m_closed; - }; - - StreamingOperationImpl::StreamingOperationImpl(StreamingOperationOptions &&options, struct aws_event_loop *protocolLoop) { - - } - - StreamingOperationImpl::~StreamingOperationImpl() { - - } - - void StreamingOperationImpl::seatStream(struct aws_mqtt_rr_client_operation *stream) { - m_stream = stream; - } - - void StreamingOperationImpl::close() { - aws_mqtt_rr_client_operation_release(m_stream); - } - - void StreamingOperationImpl::onIncomingPublish(StreamingOperationImpl *impl) { - - } - - void StreamingOperationImpl::onSubscriptionStatusEvent(StreamingOperationImpl *impl) { - - } - - ////////////////////////////////////////////////////////// - - class StreamingOperation : public IStreamingOperation { - public: - - StreamingOperation(StreamingOperationOptions &&options, struct aws_event_loop *protocolLoop); - virtual ~StreamingOperation(); - - private: - - std::shared_ptr m_impl; - }; - - - StreamingOperation::StreamingOperation(StreamingOperationOptions &&options, struct aws_event_loop *protocolLoop) { - - } - - StreamingOperation::~StreamingOperation() { - m_impl->close(); - } - - ////////////////////////////////////////////////////////// - - struct IncompleteRequest - { - struct aws_allocator *m_allocator; - - UnmodeledResultHandler m_handler; - }; - - static void s_completeRequestWithError(struct IncompleteRequest *incompleteRequest, int errorCode) { - UnmodeledResult result(errorCode); - incompleteRequest->m_handler(std::move(result)); - } - - static void s_completeRequestWithSuccess(struct IncompleteRequest *incompleteRequest, const struct aws_byte_cursor *response_topic, - const struct aws_byte_cursor *payload) { - Response response; - response.topic = *response_topic; - response.payload = *payload; - - UnmodeledResult result(response); - incompleteRequest->m_handler(std::move(result)); - } - - static void s_onRequestComplete(const struct aws_byte_cursor *response_topic, - const struct aws_byte_cursor *payload, - int error_code, - void *user_data) { - struct IncompleteRequest *incompleteRequest = static_cast(user_data); - - if (error_code != AWS_ERROR_SUCCESS) { - s_completeRequestWithError(incompleteRequest, error_code); - } else { - s_completeRequestWithSuccess(incompleteRequest, response_topic, payload); - } - - Aws::Crt::Delete(incompleteRequest, incompleteRequest->m_allocator); - } - - class AWS_CRT_CPP_API MqttRequestResponseClientImpl - { - public: - MqttRequestResponseClientImpl(Aws::Crt::Allocator *allocator) noexcept; - ~MqttRequestResponseClientImpl(); - - void seatClient(struct aws_mqtt_request_response_client *client); - - void close() noexcept; - - int submitRequest( - const aws_mqtt_request_operation_options &requestOptions, - UnmodeledResultHandler &&resultHandler) noexcept; - - std::shared_ptr createStream(StreamingOperationOptions &&options); - - Aws::Crt::Allocator *getAllocator() const { return m_allocator; } - - private: - - Aws::Crt::Allocator *m_allocator; - - struct aws_mqtt_request_response_client *m_client; - }; - - MqttRequestResponseClientImpl::MqttRequestResponseClientImpl(Aws::Crt::Allocator *allocator - ) noexcept : - m_allocator(allocator), - m_client(nullptr) + namespace Iot { - } - - MqttRequestResponseClientImpl::~MqttRequestResponseClientImpl() { - AWS_FATAL_ASSERT(m_client == nullptr); - } - - void MqttRequestResponseClientImpl::seatClient(struct aws_mqtt_request_response_client *client) { - m_client = client; - } - - void MqttRequestResponseClientImpl::close() noexcept { - aws_mqtt_request_response_client_release(m_client); - m_client = nullptr; - } - - int MqttRequestResponseClientImpl::submitRequest(const aws_mqtt_request_operation_options &requestOptions, UnmodeledResultHandler &&resultHandler) noexcept - { - IncompleteRequest *incompleteRequest = Aws::Crt::New(m_allocator); - incompleteRequest->m_allocator = m_allocator; - incompleteRequest->m_handler = std::move(resultHandler); - - struct aws_mqtt_request_operation_options rawOptions = requestOptions; - rawOptions.completion_callback = s_onRequestComplete; - rawOptions.user_data = incompleteRequest; - - int result = aws_mqtt_request_response_client_submit_request(m_client, &rawOptions); - if (result) { - Aws::Crt::Delete(incompleteRequest, incompleteRequest->m_allocator); - } - - return result; - } - - std::shared_ptr MqttRequestResponseClientImpl::createStream(StreamingOperationOptions &&options) { - return nullptr; - } - - ////////////////////////////////////////////////////////// - - static void s_onClientTermination(void *user_data) { - auto *impl = static_cast(user_data); - - Aws::Crt::Delete(impl, impl->getAllocator()); - } - - MqttRequestResponseClient *MqttRequestResponseClient::newFrom5(const Aws::Crt::Mqtt5::Mqtt5Client &protocolClient, RequestResponseClientOptions &&options, Aws::Crt::Allocator *allocator) { - auto clientImpl = Aws::Crt::New(allocator, allocator); - - struct aws_mqtt_request_response_client_options rrClientOptions; - AWS_ZERO_STRUCT(rrClientOptions); - rrClientOptions.max_request_response_subscriptions = options.maxRequestResponseSubscriptions; - rrClientOptions.max_streaming_subscriptions = options.maxStreamingSubscriptions; - rrClientOptions.operation_timeout_seconds = options.operationTimeoutInSeconds; - rrClientOptions.terminated_callback = s_onClientTermination; - rrClientOptions.user_data = clientImpl; - - struct aws_mqtt_request_response_client *rrClient = aws_mqtt_request_response_client_new_from_mqtt5_client(allocator, protocolClient.GetUnderlyingHandle(), &rrClientOptions); - clientImpl->seatClient(rrClient); - - // Can't use Aws::Crt::New because constructor is private and I don't want to change that - MqttRequestResponseClient *client = reinterpret_cast(aws_mem_acquire(allocator, sizeof(MqttRequestResponseClient))); - - return new (client) MqttRequestResponseClient(allocator, clientImpl); - } - - - MqttRequestResponseClient *MqttRequestResponseClient::newFrom311(const Aws::Crt::Mqtt::MqttConnection &protocolClient, RequestResponseClientOptions &&options, Aws::Crt::Allocator *allocator) { - auto clientImpl = Aws::Crt::New(allocator, allocator); - - struct aws_mqtt_request_response_client_options rrClientOptions; - AWS_ZERO_STRUCT(rrClientOptions); - rrClientOptions.max_request_response_subscriptions = options.maxRequestResponseSubscriptions; - rrClientOptions.max_streaming_subscriptions = options.maxStreamingSubscriptions; - rrClientOptions.operation_timeout_seconds = options.operationTimeoutInSeconds; - rrClientOptions.terminated_callback = s_onClientTermination; - rrClientOptions.user_data = clientImpl; - - struct aws_mqtt_request_response_client *rrClient = aws_mqtt_request_response_client_new_from_mqtt311_client(allocator, protocolClient.??(), &rrClientOptions); - clientImpl->seatClient(rrClient); - - // Can't use Aws::Crt::New because constructor is private and I don't want to change that - MqttRequestResponseClient *client = reinterpret_cast(aws_mem_acquire(allocator, sizeof(MqttRequestResponseClient))); - - return new (client) MqttRequestResponseClient(allocator, clientImpl); - } - - int MqttRequestResponseClient::submitRequest(const aws_mqtt_request_operation_options &requestOptions, UnmodeledResultHandler &&resultHandler) { - return m_impl->submitRequest(requestOptions, std::move(resultHandler)); - } - - std::shared_ptr MqttRequestResponseClient::createStream(StreamingOperationOptions &&options) { - return m_impl->createStream(std::move(options)); - } - - MqttRequestResponseClient::MqttRequestResponseClient(Aws::Crt::Allocator *allocator, MqttRequestResponseClientImpl *impl) : - m_allocator(allocator), - m_impl(impl) - { - } - - MqttRequestResponseClient::~MqttRequestResponseClient() { - m_impl->close(); - } -} -} -} - + namespace RequestResponse + { + + class StreamReadLock + { + public: + StreamReadLock(struct aws_rw_lock *lock, struct aws_event_loop *protocolLoop) + : m_lock(lock), m_taken(false) + { + if (!aws_event_loop_thread_is_callers_thread(protocolLoop)) + { + m_taken = true; + aws_rw_lock_rlock(lock); + } + } + + ~StreamReadLock() + { + if (m_taken) + { + aws_rw_lock_runlock(m_lock); + } + } + + private: + struct aws_rw_lock *m_lock; + + bool m_taken; + }; + + class StreamWriteLock + { + public: + StreamWriteLock(struct aws_rw_lock *lock, struct aws_event_loop *protocolLoop) + : m_lock(lock), m_taken(false) + { + if (!aws_event_loop_thread_is_callers_thread(protocolLoop)) + { + m_taken = true; + aws_rw_lock_wlock(lock); + } + } + + ~StreamWriteLock() + { + if (m_taken) + { + aws_rw_lock_wunlock(m_lock); + } + } + + private: + struct aws_rw_lock *m_lock; + + bool m_taken; + }; + + class AWS_CRT_CPP_API StreamingOperationImpl + { + public: + StreamingOperationImpl( + struct aws_mqtt_rr_client_operation *stream, + StreamingOperationOptions &&options, + struct aws_event_loop *protocolLoop); + virtual ~StreamingOperationImpl(); + + void activate(); + + void close(); + + static void onSubscriptionStatusCallback( + enum aws_rr_streaming_subscription_event_type status, + int error_code, + void *user_data); + static void onIncomingPublishCallback(struct aws_byte_cursor payload, void *user_data); + static void onTerminatedCallback(void *user_data); + + private: + StreamingOperationOptions m_config; + + struct aws_mqtt_rr_client_operation *m_stream; + + struct aws_event_loop *m_protocolLoop; + + struct aws_rw_lock m_lock; + + bool m_closed; + }; + + struct StreamingOperationImplHandle + { + Aws::Crt::Allocator *m_allocator; + + std::shared_ptr m_impl; + }; + + StreamingOperationImpl::StreamingOperationImpl( + struct aws_mqtt_rr_client_operation *stream, + StreamingOperationOptions &&options, + struct aws_event_loop *protocolLoop) + : m_config(std::move(options)), m_stream(stream), m_protocolLoop(protocolLoop), m_lock(), + m_closed(false) + { + aws_rw_lock_init(&m_lock); + } + + StreamingOperationImpl::~StreamingOperationImpl() + { + AWS_FATAL_ASSERT(m_stream == nullptr); + AWS_FATAL_ASSERT(m_closed); + + aws_rw_lock_clean_up(&m_lock); + } + + void StreamingOperationImpl::activate() + { + { + StreamReadLock rlock(&m_lock, m_protocolLoop); + + if (!m_closed) + { + aws_mqtt_rr_client_operation_activate(m_stream); + } + } + } + + void StreamingOperationImpl::close() + { + struct aws_mqtt_rr_client_operation *toRelease = nullptr; + + { + StreamWriteLock wlock(&m_lock, m_protocolLoop); + + if (!m_closed) + { + m_closed = true; + toRelease = m_stream; + m_stream = NULL; + } + } + + if (toRelease) + { + aws_mqtt_rr_client_operation_release(toRelease); + } + } + + void StreamingOperationImpl::onSubscriptionStatusCallback( + enum aws_rr_streaming_subscription_event_type status, + int error_code, + void *user_data) + { + + StreamingOperationImplHandle *handle = static_cast(user_data); + StreamingOperationImpl *impl = handle->m_impl.get(); + + { + StreamReadLock readLock(&impl->m_lock, impl->m_protocolLoop); + + if (!impl->m_closed) + { + SubscriptionStatusEvent event; + event.type = SubscriptionStatusEventType(status); + event.errorCode = error_code; + + impl->m_config.subscriptionStatusEventHandler(std::move(event)); + } + } + } + + void StreamingOperationImpl::onIncomingPublishCallback(struct aws_byte_cursor payload, void *user_data) + { + StreamingOperationImplHandle *handle = static_cast(user_data); + StreamingOperationImpl *impl = handle->m_impl.get(); + + { + StreamReadLock readLock(&impl->m_lock, impl->m_protocolLoop); + + if (!impl->m_closed) + { + IncomingPublishEvent event; + event.payload = payload; + + impl->m_config.incomingPublishEventHandler(std::move(event)); + } + } + } + + void StreamingOperationImpl::onTerminatedCallback(void *user_data) + { + StreamingOperationImplHandle *handle = static_cast(user_data); + + Aws::Crt::Delete(handle, handle->m_allocator); + } + + ////////////////////////////////////////////////////////// + + class StreamingOperation : public IStreamingOperation + { + public: + static std::shared_ptr create( + Aws::Crt::Allocator *allocator, + StreamingOperationOptions &&options, + struct aws_mqtt_request_response_client *client); + + StreamingOperation(const std::shared_ptr &impl); + virtual ~StreamingOperation(); + + virtual void activate(); + + private: + std::shared_ptr m_impl; + }; + + StreamingOperation::StreamingOperation(const std::shared_ptr &impl) : m_impl(impl) + { + } + + std::shared_ptr StreamingOperation::create( + Aws::Crt::Allocator *allocator, + StreamingOperationOptions &&options, + struct aws_mqtt_request_response_client *client) + { + StreamingOperationImplHandle *implHandle = Aws::Crt::New(allocator); + + struct aws_mqtt_streaming_operation_options streamingOptions; + AWS_ZERO_STRUCT(streamingOptions); + streamingOptions.topic_filter = options.subscriptionTopicFilter; + streamingOptions.subscription_status_callback = StreamingOperationImpl::onSubscriptionStatusCallback; + streamingOptions.incoming_publish_callback = StreamingOperationImpl::onIncomingPublishCallback; + streamingOptions.terminated_callback = StreamingOperationImpl::onTerminatedCallback; + streamingOptions.user_data = implHandle; + + struct aws_mqtt_rr_client_operation *stream = + aws_mqtt_request_response_client_create_streaming_operation(client, &streamingOptions); + if (!stream) + { + return nullptr; + } + + auto impl = Aws::Crt::MakeShared( + allocator, stream, std::move(options), aws_mqtt_request_response_client_get_event_loop(client)); + auto streamingOperation = Aws::Crt::MakeShared(allocator, impl); + + return streamingOperation; + } + + StreamingOperation::~StreamingOperation() + { + m_impl->close(); + } + + void StreamingOperation::activate() + { + m_impl->activate(); + } + + ////////////////////////////////////////////////////////// + + struct IncompleteRequest + { + struct aws_allocator *m_allocator; + + UnmodeledResultHandler m_handler; + }; + + static void s_completeRequestWithError(struct IncompleteRequest *incompleteRequest, int errorCode) + { + UnmodeledResult result(errorCode); + incompleteRequest->m_handler(std::move(result)); + } + + static void s_completeRequestWithSuccess( + struct IncompleteRequest *incompleteRequest, + const struct aws_byte_cursor *response_topic, + const struct aws_byte_cursor *payload) + { + UnmodeledResponse response; + response.topic = *response_topic; + response.payload = *payload; + + UnmodeledResult result(response); + incompleteRequest->m_handler(std::move(result)); + } + + static void s_onRequestComplete( + const struct aws_byte_cursor *response_topic, + const struct aws_byte_cursor *payload, + int error_code, + void *user_data) + { + struct IncompleteRequest *incompleteRequest = static_cast(user_data); + + if (error_code != AWS_ERROR_SUCCESS) + { + s_completeRequestWithError(incompleteRequest, error_code); + } + else + { + s_completeRequestWithSuccess(incompleteRequest, response_topic, payload); + } + + Aws::Crt::Delete(incompleteRequest, incompleteRequest->m_allocator); + } + + class AWS_CRT_CPP_API MqttRequestResponseClientImpl + { + public: + MqttRequestResponseClientImpl(Aws::Crt::Allocator *allocator) noexcept; + ~MqttRequestResponseClientImpl(); + + void seatClient(struct aws_mqtt_request_response_client *client); + + void close() noexcept; + + int submitRequest( + const aws_mqtt_request_operation_options &requestOptions, + UnmodeledResultHandler &&resultHandler) noexcept; + + std::shared_ptr createStream(StreamingOperationOptions &&options); + + Aws::Crt::Allocator *getAllocator() const { return m_allocator; } + + private: + Aws::Crt::Allocator *m_allocator; + + struct aws_mqtt_request_response_client *m_client; + }; + + MqttRequestResponseClientImpl::MqttRequestResponseClientImpl(Aws::Crt::Allocator *allocator) noexcept + : m_allocator(allocator), m_client(nullptr) + { + } + + MqttRequestResponseClientImpl::~MqttRequestResponseClientImpl() + { + AWS_FATAL_ASSERT(m_client == nullptr); + } + + void MqttRequestResponseClientImpl::seatClient(struct aws_mqtt_request_response_client *client) + { + m_client = client; + } + + void MqttRequestResponseClientImpl::close() noexcept + { + aws_mqtt_request_response_client_release(m_client); + m_client = nullptr; + } + + int MqttRequestResponseClientImpl::submitRequest( + const aws_mqtt_request_operation_options &requestOptions, + UnmodeledResultHandler &&resultHandler) noexcept + { + IncompleteRequest *incompleteRequest = Aws::Crt::New(m_allocator); + incompleteRequest->m_allocator = m_allocator; + incompleteRequest->m_handler = std::move(resultHandler); + + struct aws_mqtt_request_operation_options rawOptions = requestOptions; + rawOptions.completion_callback = s_onRequestComplete; + rawOptions.user_data = incompleteRequest; + + int result = aws_mqtt_request_response_client_submit_request(m_client, &rawOptions); + if (result) + { + Aws::Crt::Delete(incompleteRequest, incompleteRequest->m_allocator); + } + + return result; + } + + std::shared_ptr MqttRequestResponseClientImpl::createStream( + StreamingOperationOptions &&options) + { + return StreamingOperation::create(m_allocator, std::move(options), m_client); + } + + ////////////////////////////////////////////////////////// + + static void s_onClientTermination(void *user_data) + { + auto *impl = static_cast(user_data); + + Aws::Crt::Delete(impl, impl->getAllocator()); + } + + class MqttRequestResponseClient : public IMqttRequestResponseClient + { + public: + MqttRequestResponseClient(MqttRequestResponseClientImpl *impl); + virtual ~MqttRequestResponseClient(); + + int submitRequest( + const aws_mqtt_request_operation_options &requestOptions, + UnmodeledResultHandler &&resultHandler); + + std::shared_ptr createStream(StreamingOperationOptions &&options); + + private: + MqttRequestResponseClientImpl *m_impl; + }; + + IMqttRequestResponseClient *IMqttRequestResponseClient::newFrom5( + const Aws::Crt::Mqtt5::Mqtt5Client &protocolClient, + RequestResponseClientOptions &&options, + Aws::Crt::Allocator *allocator) + { + auto clientImpl = Aws::Crt::New(allocator, allocator); + + struct aws_mqtt_request_response_client_options rrClientOptions; + AWS_ZERO_STRUCT(rrClientOptions); + rrClientOptions.max_request_response_subscriptions = options.maxRequestResponseSubscriptions; + rrClientOptions.max_streaming_subscriptions = options.maxStreamingSubscriptions; + rrClientOptions.operation_timeout_seconds = options.operationTimeoutInSeconds; + rrClientOptions.terminated_callback = s_onClientTermination; + rrClientOptions.user_data = clientImpl; + + struct aws_mqtt_request_response_client *rrClient = + aws_mqtt_request_response_client_new_from_mqtt5_client( + allocator, protocolClient.GetUnderlyingHandle(), &rrClientOptions); + if (!rrClient) + { + Aws::Crt::Delete(clientImpl, clientImpl->getAllocator()); + return nullptr; + } + + clientImpl->seatClient(rrClient); + + return Aws::Crt::New(allocator, clientImpl); + } + + IMqttRequestResponseClient *IMqttRequestResponseClient::newFrom311( + const Aws::Crt::Mqtt::MqttConnection &protocolClient, + RequestResponseClientOptions &&options, + Aws::Crt::Allocator *allocator) + { + auto clientImpl = Aws::Crt::New(allocator, allocator); + + struct aws_mqtt_request_response_client_options rrClientOptions; + AWS_ZERO_STRUCT(rrClientOptions); + rrClientOptions.max_request_response_subscriptions = options.maxRequestResponseSubscriptions; + rrClientOptions.max_streaming_subscriptions = options.maxStreamingSubscriptions; + rrClientOptions.operation_timeout_seconds = options.operationTimeoutInSeconds; + rrClientOptions.terminated_callback = s_onClientTermination; + rrClientOptions.user_data = clientImpl; + + struct aws_mqtt_request_response_client *rrClient = + aws_mqtt_request_response_client_new_from_mqtt311_client( + allocator, protocolClient.GetUnderlyingConnection(), &rrClientOptions); + if (!rrClient) + { + Aws::Crt::Delete(clientImpl, clientImpl->getAllocator()); + return nullptr; + } + + clientImpl->seatClient(rrClient); + + return Aws::Crt::New(allocator, clientImpl); + } + + int MqttRequestResponseClient::submitRequest( + const aws_mqtt_request_operation_options &requestOptions, + UnmodeledResultHandler &&resultHandler) + { + return m_impl->submitRequest(requestOptions, std::move(resultHandler)); + } + + std::shared_ptr MqttRequestResponseClient::createStream( + StreamingOperationOptions &&options) + { + return m_impl->createStream(std::move(options)); + } + + MqttRequestResponseClient::MqttRequestResponseClient(MqttRequestResponseClientImpl *impl) : m_impl(impl) {} + + MqttRequestResponseClient::~MqttRequestResponseClient() + { + m_impl->close(); + } + } // namespace RequestResponse + } // namespace Iot +} // namespace Aws diff --git a/source/mqtt/Mqtt5Client.cpp b/source/mqtt/Mqtt5Client.cpp index 62bdefddb..4d40eb7f1 100644 --- a/source/mqtt/Mqtt5Client.cpp +++ b/source/mqtt/Mqtt5Client.cpp @@ -174,7 +174,8 @@ namespace Aws return m_operationStatistics; } - struct aws_mqtt5_client *Mqtt5Client::GetUnderlyingHandle() const noexcept { + struct aws_mqtt5_client *Mqtt5Client::GetUnderlyingHandle() const noexcept + { return m_client_core->GetUnderlyingHandle(); } diff --git a/source/mqtt/MqttConnection.cpp b/source/mqtt/MqttConnection.cpp index c65729289..043c3123a 100644 --- a/source/mqtt/MqttConnection.cpp +++ b/source/mqtt/MqttConnection.cpp @@ -194,7 +194,7 @@ namespace Aws return m_connectionCore->Disconnect(); } - aws_mqtt_client_connection *MqttConnection::GetUnderlyingConnection() noexcept + aws_mqtt_client_connection *MqttConnection::GetUnderlyingConnection() const noexcept { AWS_ASSERT(m_connectionCore != nullptr); return m_connectionCore->GetUnderlyingConnection(); diff --git a/source/mqtt/MqttConnectionCore.cpp b/source/mqtt/MqttConnectionCore.cpp index 96b0f46dd..a360dfd04 100644 --- a/source/mqtt/MqttConnectionCore.cpp +++ b/source/mqtt/MqttConnectionCore.cpp @@ -669,7 +669,7 @@ namespace Aws m_underlyingConnection, MqttConnectionCore::s_onDisconnect, this) == AWS_OP_SUCCESS; } - aws_mqtt_client_connection *MqttConnectionCore::GetUnderlyingConnection() noexcept + aws_mqtt_client_connection *MqttConnectionCore::GetUnderlyingConnection() const noexcept { return m_underlyingConnection; }