Skip to content

Commit

Permalink
Unify subscription id with client message id
Browse files Browse the repository at this point in the history
From now on, we use only 1 id in request/response
messages. This id is called `subscription_id`.

A client may provide `subscription_id` in `subscribe` request.
If client does not provide it, we generate it ourselves.

Clients that use browsers or other async environemnts
may use `subscription_id` to correlate response messages
with the request ones.

`subscription_id` is used in all messages related to subscription.

I also remove `success` field from response. We include `subscription_id`
field in a resposne in case of OK response.
In case of error response, we include `error` field.
  • Loading branch information
illia-malachyn committed Jan 3, 2025
1 parent c8847ef commit 8d1e427
Show file tree
Hide file tree
Showing 15 changed files with 122 additions and 167 deletions.
76 changes: 50 additions & 26 deletions engine/access/rest/websockets/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,8 @@ func (c *Controller) readMessages(ctx context.Context) error {
c.writeErrorResponse(
ctx,
err,
wrapErrorMessage(InvalidMessage, "error reading message", "", "", ""))
wrapErrorMessage(InvalidMessage, "error reading message", ""),
)
continue
}

Expand All @@ -299,7 +300,8 @@ func (c *Controller) readMessages(ctx context.Context) error {
c.writeErrorResponse(
ctx,
err,
wrapErrorMessage(InvalidMessage, "error parsing message", "", "", ""))
wrapErrorMessage(InvalidMessage, "error parsing message", ""),
)
continue
}
}
Expand Down Expand Up @@ -342,24 +344,32 @@ func (c *Controller) handleMessage(ctx context.Context, message json.RawMessage)
}

func (c *Controller) handleSubscribe(ctx context.Context, msg models.SubscribeMessageRequest) {
subscriptionID, err := c.parseOrCreateSubscriptionID(msg.SubscriptionID)
if err != nil {
c.writeErrorResponse(
ctx,
err,
wrapErrorMessage(InvalidArgument, "error parsing subscription id", msg.SubscriptionID),
)
return
}

// register new provider
provider, err := c.dataProviderFactory.NewDataProvider(ctx, msg.Topic, msg.Arguments, c.multiplexedStream)
if err != nil {
c.writeErrorResponse(
ctx,
err,
wrapErrorMessage(InvalidArgument, "error creating data provider", msg.ClientMessageID, models.SubscribeAction, ""),
wrapErrorMessage(InvalidArgument, "error creating data provider", subscriptionID.String()),
)
return
}
c.dataProviders.Add(provider.ID(), provider)
c.dataProviders.Add(subscriptionID, provider)

// write OK response to client
responseOk := models.SubscribeMessageResponse{
BaseMessageResponse: models.BaseMessageResponse{
ClientMessageID: msg.ClientMessageID,
Success: true,
SubscriptionID: provider.ID().String(),
SubscriptionID: subscriptionID.String(),
},
}
c.writeResponse(ctx, responseOk)
Expand All @@ -372,44 +382,42 @@ func (c *Controller) handleSubscribe(ctx context.Context, msg models.SubscribeMe
c.writeErrorResponse(
ctx,
err,
wrapErrorMessage(SubscriptionError, "subscription finished with error", "", "", ""),
wrapErrorMessage(InternalError, "internal error", subscriptionID.String()),
)
}

c.dataProvidersGroup.Done()
c.dataProviders.Remove(provider.ID())
c.dataProviders.Remove(subscriptionID)
}()
}

func (c *Controller) handleUnsubscribe(ctx context.Context, msg models.UnsubscribeMessageRequest) {
id, err := uuid.Parse(msg.SubscriptionID)
subscriptionID, err := uuid.Parse(msg.SubscriptionID)
if err != nil {
c.writeErrorResponse(
ctx,
err,
wrapErrorMessage(InvalidArgument, "error parsing subscription ID", msg.ClientMessageID, models.UnsubscribeAction, msg.SubscriptionID),
wrapErrorMessage(InvalidArgument, "error parsing subscription id", msg.SubscriptionID),
)
return
}

provider, ok := c.dataProviders.Get(id)
provider, ok := c.dataProviders.Get(subscriptionID)
if !ok {
c.writeErrorResponse(
ctx,
err,
wrapErrorMessage(NotFound, "subscription not found", msg.ClientMessageID, models.UnsubscribeAction, msg.SubscriptionID),
wrapErrorMessage(NotFound, "subscription not found", subscriptionID.String()),
)
return
}

provider.Close()
c.dataProviders.Remove(id)
c.dataProviders.Remove(subscriptionID)

responseOk := models.UnsubscribeMessageResponse{
BaseMessageResponse: models.BaseMessageResponse{
ClientMessageID: msg.ClientMessageID,
Success: true,
SubscriptionID: msg.SubscriptionID,
SubscriptionID: subscriptionID.String(),
},
}
c.writeResponse(ctx, responseOk)
Expand All @@ -429,15 +437,16 @@ func (c *Controller) handleListSubscriptions(ctx context.Context, msg models.Lis
c.writeErrorResponse(
ctx,
err,
wrapErrorMessage(NotFound, "error listing subscriptions", msg.ClientMessageID, models.ListSubscriptionsAction, ""),
wrapErrorMessage(NotFound, "error listing subscriptions", ""),
)
return
}

responseOk := models.ListSubscriptionsMessageResponse{
Success: true,
ClientMessageID: msg.ClientMessageID,
Subscriptions: subs,
BaseMessageResponse: models.BaseMessageResponse{
SubscriptionID: msg.SubscriptionID,
},
Subscriptions: subs,
}
c.writeResponse(ctx, responseOk)
}
Expand Down Expand Up @@ -474,15 +483,30 @@ func (c *Controller) writeResponse(ctx context.Context, response interface{}) {
}
}

func wrapErrorMessage(code Code, message string, msgId string, action string, subscriptionID string) models.BaseMessageResponse {
func wrapErrorMessage(code Code, message string, subscriptionID string) models.BaseMessageResponse {
return models.BaseMessageResponse{
ClientMessageID: msgId,
Success: false,
SubscriptionID: subscriptionID,
SubscriptionID: subscriptionID,
Error: models.ErrorMessage{
Code: int(code),
Message: message,
Action: action,
},
}
}

func (c *Controller) parseOrCreateSubscriptionID(id string) (uuid.UUID, error) {
// if client didn't provide subscription id, we create one for him
if id == "" {
return uuid.New(), nil
}

newID, err := uuid.Parse(id)
if err != nil {
return uuid.Nil, err
}

if c.dataProviders.Has(newID) {
return uuid.Nil, fmt.Errorf("subscription id is already in use")
}

return newID, nil
}
Loading

0 comments on commit 8d1e427

Please sign in to comment.