Skip to content

Commit

Permalink
Removed unused receive methods from channel
Browse files Browse the repository at this point in the history
  • Loading branch information
andrebires committed Mar 30, 2022
1 parent fcbcd0b commit 9bf27b7
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 160 deletions.
70 changes: 0 additions & 70 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,6 @@ type Sender interface {
ResponseCommandSender
}

// ChannelModule defines a proxy interface for executing actions to the envelope channels.
type ChannelModule interface {
StateChanged(ctx context.Context, state SessionState) // StateChanged is called when the session state is changed.
Receiving(ctx context.Context, env envelope) envelope // Receiving is called when an envelope is being received by the channel.
Sending(ctx context.Context, env envelope) envelope // Sending is called when an envelope is being sent by the channel.
}

type channel struct {
transport Transport
sessionID string
Expand Down Expand Up @@ -317,81 +310,18 @@ func (c *channel) SendMessage(ctx context.Context, msg *Message) error {
return c.sendToTransport(ctx, msg, "send message")
}

func (c *channel) ReceiveMessage(ctx context.Context) (*Message, error) {
if err := c.ensureEstablished("receive message"); err != nil {
return nil, err
}

select {
case <-ctx.Done():
return nil, fmt.Errorf("receive message: %w", ctx.Err())
case msg, ok := <-c.inMsgChan:
if !ok {
return nil, errors.New("receive message: channel closed")
}
return msg, nil
}
}
func (c *channel) SendNotification(ctx context.Context, not *Notification) error {
return c.sendToTransport(ctx, not, "send notification")
}

func (c *channel) ReceiveNotification(ctx context.Context) (*Notification, error) {
if err := c.ensureEstablished("receive notification"); err != nil {
return nil, err
}

select {
case <-ctx.Done():
return nil, fmt.Errorf("receive notification: %w", ctx.Err())
case not, ok := <-c.inNotChan:
if !ok {
return nil, errors.New("receive notification: channel closed")
}
return not, nil
}
}

func (c *channel) SendRequestCommand(ctx context.Context, cmd *RequestCommand) error {
return c.sendToTransport(ctx, cmd, "send request command")
}

func (c *channel) ReceiveRequestCommand(ctx context.Context) (*RequestCommand, error) {
if err := c.ensureEstablished("receive request command"); err != nil {
return nil, err
}

select {
case <-ctx.Done():
return nil, fmt.Errorf("receive request command: %w", ctx.Err())
case cmd, ok := <-c.inReqCmdChan:
if !ok {
return nil, errors.New("receive request command: channel closed")
}
return cmd, nil
}
}

func (c *channel) SendResponseCommand(ctx context.Context, cmd *ResponseCommand) error {
return c.sendToTransport(ctx, cmd, "send response command")
}

func (c *channel) ReceiveResponseCommand(ctx context.Context) (*ResponseCommand, error) {
if err := c.ensureEstablished("receive response command"); err != nil {
return nil, err
}

select {
case <-ctx.Done():
return nil, fmt.Errorf("receive response command: %w", ctx.Err())
case cmd, ok := <-c.inRespCmdChan:
if !ok {
return nil, errors.New("receive response command: channel closed")
}
return cmd, nil
}
}

func (c *channel) ProcessCommand(ctx context.Context, reqCmd *RequestCommand) (*ResponseCommand, error) {
return c.processCommand(ctx, c, reqCmd)
}
Expand Down
150 changes: 60 additions & 90 deletions channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,30 +213,14 @@ func TestChannel_ReceiveMessage_WhenEstablished(t *testing.T) {
_ = server.Send(ctx, m)

// Act
actual, err := c.ReceiveMessage(ctx)

// Assert
assert.NoError(t, err)
assert.Equal(t, m, actual)
}

func TestChannel_ReceiveMessage_WhenContextDeadline(t *testing.T) {
// Arrange
defer goleak.VerifyNone(t)
client, _ := newInProcessTransportPair("localhost", 1)
c := newChannel(client, 1)
defer silentClose(c)
c.setState(SessionStateEstablished)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond)
defer cancel()

// Act
actual, err := c.ReceiveMessage(ctx)

// Assert
assert.Error(t, err)
assert.Equal(t, "receive message: context deadline exceeded", err.Error())
assert.Nil(t, actual)
select {
case <-ctx.Done():
t.Fatal(ctx.Err())
case actual, ok := <-c.MsgChan():
// Assert
assert.True(t, ok)
assert.Equal(t, m, actual)
}
}

func TestChannel_ReceiveMessage_WhenFinishedState(t *testing.T) {
Expand All @@ -262,12 +246,16 @@ func receiveMessageWithState(t *testing.T, state SessionState) {
time.Sleep(50 * time.Millisecond)
c.setState(state)
}()
actual, err := c.ReceiveMessage(ctx)

// Assert
assert.Error(t, err)
assert.Equal(t, "receive message: channel closed", err.Error())
assert.Nil(t, actual)
// Act
select {
case <-ctx.Done():
t.Fatal(ctx.Err())
case actual, ok := <-c.MsgChan():
// Assert
assert.False(t, ok)
assert.Nil(t, actual)
}
}

func TestChannel_SendNotification_WhenEstablished(t *testing.T) {
Expand Down Expand Up @@ -429,30 +417,14 @@ func TestChannel_ReceiveNotification_WhenEstablished(t *testing.T) {
_ = server.Send(ctx, n)

// Act
actual, err := c.ReceiveNotification(ctx)

// Assert
assert.NoError(t, err)
assert.Equal(t, n, actual)
}

func TestChannel_ReceiveNotification_WhenContextCanceled(t *testing.T) {
// Arrange
defer goleak.VerifyNone(t)
client, _ := newInProcessTransportPair("localhost", 1)
c := newChannel(client, 1)
defer silentClose(c)
c.setState(SessionStateEstablished)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond)
defer cancel()

// Act
actual, err := c.ReceiveNotification(ctx)

// Assert
assert.Error(t, err)
assert.Equal(t, "receive notification: context deadline exceeded", err.Error())
assert.Nil(t, actual)
select {
case <-ctx.Done():
t.Fatal(ctx.Err())
case actual, ok := <-c.NotChan():
// Assert
assert.True(t, ok)
assert.Equal(t, n, actual)
}
}

func TestChannel_ReceiveNotification_WhenFinishedState(t *testing.T) {
Expand All @@ -479,12 +451,16 @@ func receiveNotificationWithState(t *testing.T, state SessionState) {
time.Sleep(50 * time.Millisecond)
c.setState(state)
}()
actual, err := c.ReceiveNotification(ctx)

// Assert
assert.Error(t, err)
assert.Equal(t, "receive notification: channel closed", err.Error())
assert.Nil(t, actual)
// Act
select {
case <-ctx.Done():
t.Fatal(ctx.Err())
case actual, ok := <-c.NotChan():
// Assert
assert.False(t, ok)
assert.Nil(t, actual)
}
}

func TestChannel_SendRequestCommand_WhenEstablished(t *testing.T) {
Expand Down Expand Up @@ -648,30 +624,14 @@ func TestChannel_ReceiveCommand_WhenEstablished(t *testing.T) {
_ = server.Send(ctx, cmd)

// Act
actual, err := c.ReceiveRequestCommand(ctx)

// Assert
assert.NoError(t, err)
assert.Equal(t, cmd, actual)
}

func TestChannel_ReceiveCommand_WhenContextCanceled(t *testing.T) {
// Arrange
defer goleak.VerifyNone(t)
client, _ := newInProcessTransportPair("localhost", 1)
c := newChannel(client, 1)
defer silentClose(c)
c.setState(SessionStateEstablished)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond)
defer cancel()

// Act
actual, err := c.ReceiveRequestCommand(ctx)

// Assert
assert.Error(t, err)
assert.Equal(t, "receive request command: context deadline exceeded", err.Error())
assert.Nil(t, actual)
select {
case <-ctx.Done():
t.Fatal(ctx.Err())
case actual, ok := <-c.ReqCmdChan():
// Assert
assert.True(t, ok)
assert.Equal(t, cmd, actual)
}
}

func TestChannel_ReceiveCommand_WhenFinishedState(t *testing.T) {
Expand All @@ -698,12 +658,16 @@ func receiveCommandWithState(t *testing.T, state SessionState) {
time.Sleep(50 * time.Millisecond)
c.setState(state)
}()
actual, err := c.ReceiveRequestCommand(ctx)

// Assert
assert.Error(t, err)
assert.Equal(t, "receive request command: channel closed", err.Error())
assert.Nil(t, actual)
// Act
select {
case <-ctx.Done():
t.Fatal(ctx.Err())
case actual, ok := <-c.ReqCmdChan():
// Assert
assert.False(t, ok)
assert.Nil(t, actual)
}
}

func TestChannel_ProcessCommand(t *testing.T) {
Expand Down Expand Up @@ -787,7 +751,13 @@ func TestChannel_ProcessCommand_ResponseWithAnotherId(t *testing.T) {
assert.Nil(t, actual)
ctx, cancel = context.WithTimeout(context.Background(), 250*time.Millisecond)
defer cancel()
actualRespCmd, err := c.ReceiveResponseCommand(ctx)
assert.NoError(t, err)
assert.Equal(t, respCmd, actualRespCmd)

// Act
select {
case <-ctx.Done():
t.Fatal(ctx.Err())
case actualRespCmd, ok := <-c.RespCmdChan():
assert.True(t, ok)
assert.Equal(t, respCmd, actualRespCmd)
}
}

0 comments on commit 9bf27b7

Please sign in to comment.