diff --git a/pkg/p2p/client.go b/pkg/p2p/client.go index ba4f1a624a7..18a1257da31 100644 --- a/pkg/p2p/client.go +++ b/pkg/p2p/client.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/log" cerrors "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/p2p/internal" "github.com/pingcap/tiflow/pkg/security" "github.com/pingcap/tiflow/proto/p2p" "github.com/prometheus/client_golang/prometheus" @@ -58,7 +59,7 @@ type MessageClientConfig struct { // MessageClient is a client used to send peer messages. // `Run` must be running before sending any message. type MessageClient struct { - sendCh chan *p2p.MessageEntry + sendCh *internal.SendChan topicMu sync.RWMutex topics map[string]*topicEntry @@ -91,7 +92,7 @@ type topicEntry struct { // senderID is an identifier for the local node. func NewMessageClient(senderID NodeID, config *MessageClientConfig) *MessageClient { return &MessageClient{ - sendCh: make(chan *p2p.MessageEntry, config.SendChannelSize), + sendCh: internal.NewSendChan(int64(config.SendChannelSize)), topics: make(map[string]*topicEntry), senderID: senderID, closeCh: make(chan struct{}), @@ -227,48 +228,45 @@ func (c *MessageClient) runTx(ctx context.Context, stream clientStream) error { batchSender := c.newSenderFn(stream) for { - select { - case <-ctx.Done(): - return errors.Trace(ctx.Err()) - case msg, ok := <-c.sendCh: - if !ok { - // sendCh has been closed - return nil + msg, ok, err := c.sendCh.Receive(ctx, ticker.C) + if err != nil { + return errors.Trace(err) + } + if !ok { + // The implementation of batchSender guarantees that + // an empty flush does not send any message. + if err := batchSender.Flush(); err != nil { + return errors.Trace(err) } + continue + } - c.topicMu.RLock() - tpk, ok := c.topics[msg.Topic] - c.topicMu.RUnlock() - if !ok { - // This line should never be reachable unless there is a bug in this file. - log.Panic("topic not found. 
Report a bug", zap.String("topic", msg.Topic)) - } + c.topicMu.RLock() + tpk, ok := c.topics[msg.Topic] + c.topicMu.RUnlock() + if !ok { + // This line should never be reachable unless there is a bug in this file. + log.Panic("topic not found. Report a bug", zap.String("topic", msg.Topic)) + } - // We want to assert that `msg.Sequence` is continuous within a topic. - if old := tpk.lastSent.Swap(msg.Sequence); old != initAck && msg.Sequence != old+1 { - log.Panic("unexpected seq of message", - zap.String("topic", msg.Topic), - zap.Int64("seq", msg.Sequence)) - } + // We want to assert that `msg.Sequence` is continuous within a topic. + if old := tpk.lastSent.Swap(msg.Sequence); old != initAck && msg.Sequence != old+1 { + log.Panic("unexpected seq of message", + zap.String("topic", msg.Topic), + zap.Int64("seq", msg.Sequence)) + } - tpk.sentMessageMu.Lock() - tpk.sentMessages.PushBack(msg) - tpk.sentMessageMu.Unlock() + tpk.sentMessageMu.Lock() + tpk.sentMessages.PushBack(msg) + tpk.sentMessageMu.Unlock() - metricsClientMessageCount.Inc() + metricsClientMessageCount.Inc() - log.Debug("Sending Message", - zap.String("topic", msg.Topic), - zap.Int64("seq", msg.Sequence)) - if err := batchSender.Append(msg); err != nil { - return errors.Trace(err) - } - case <-ticker.C: - // The implementation of batchSender guarantees that - // an empty flush does not send any message. - if err := batchSender.Flush(); err != nil { - return errors.Trace(err) - } + log.Debug("Sending Message", + zap.String("topic", msg.Topic), + zap.Int64("seq", msg.Sequence)) + if err := batchSender.Append(msg); err != nil { + return errors.Trace(err) } } } @@ -381,6 +379,7 @@ func (c *MessageClient) runRx(ctx context.Context, stream clientStream) error { // SendMessage sends a message. It will block if the client is not ready to // accept the message for now. Once the function returns without an error, // the client will try its best to send the message, until `Run` is canceled. 
+// NOTE The function involves busy-waiting, so DO NOT use it when you expect contention or congestion! func (c *MessageClient) SendMessage(ctx context.Context, topic Topic, value interface{}) (seq Seq, ret error) { return c.sendMessage(ctx, topic, value, false) } @@ -415,47 +414,34 @@ func (c *MessageClient) sendMessage(ctx context.Context, topic Topic, value inte tpk = &topicEntry{ sentMessages: deque.NewDeque(), } - // the sequence = 0 is reserved. - tpk.nextSeq.Store(1) + tpk.nextSeq.Store(0) c.topicMu.Lock() - c.topics[topic] = tpk + if newTpk, ok := c.topics[topic]; !ok { + c.topics[topic] = tpk + } else { + tpk = newTpk + } c.topicMu.Unlock() } - nextSeq := tpk.nextSeq.Load() - data, err := marshalMessage(value) if err != nil { return 0, cerrors.WrapError(cerrors.ErrPeerMessageEncodeError, err) } - msg := &p2p.MessageEntry{ - Topic: topic, - Content: data, - Sequence: nextSeq, - } - if nonblocking { - select { - case <-ctx.Done(): - return 0, errors.Trace(ctx.Err()) - case c.sendCh <- msg: - default: + ok, seq := c.sendCh.SendAsync(topic, data, tpk.nextSeq.Inc) + if !ok { return 0, cerrors.ErrPeerMessageSendTryAgain.GenWithStackByArgs() } - } else { - // blocking - select { - case <-ctx.Done(): - return 0, errors.Trace(ctx.Err()) - case <-c.closeCh: - return 0, cerrors.ErrPeerMessageClientClosed.GenWithStackByArgs() - case c.sendCh <- msg: - } + return seq, nil } - - tpk.nextSeq.Add(1) - return nextSeq, nil + // blocking + seq, err = c.sendCh.SendSync(ctx, topic, data, c.closeCh, tpk.nextSeq.Inc) + if err != nil { + return 0, errors.Trace(err) + } + return seq, nil } // CurrentAck returns (s, true) if all messages with sequence less than or diff --git a/pkg/p2p/client_test.go b/pkg/p2p/client_test.go index 471cf1bf74d..49c4ccd1cd6 100644 --- a/pkg/p2p/client_test.go +++ b/pkg/p2p/client_test.go @@ -60,7 +60,7 @@ type testMessage struct { } var clientConfigForUnitTesting = &MessageClientConfig{ - SendChannelSize: 0, // unbuffered channel to make tests more 
reliable
+	SendChannelSize:   1,
 	BatchSendInterval: 128 * time.Hour, // essentially disables flushing
 	MaxBatchBytes:     math.MaxInt64,
 	MaxBatchCount:     math.MaxInt64,
@@ -331,6 +331,9 @@ func TestClientSendAnomalies(t *testing.T) {
 
 	// Test point 1: ErrPeerMessageSendTryAgain
 	_, err := client.TrySendMessage(ctx, "test-topic", &testMessage{Value: 1})
+	require.NoError(t, err)
+
+	_, err = client.TrySendMessage(ctx, "test-topic", &testMessage{Value: 1})
 	require.Error(t, err)
 	require.Regexp(t, ".*ErrPeerMessageSendTryAgain.*", err.Error())
 
diff --git a/pkg/p2p/internal/send_chan.go b/pkg/p2p/internal/send_chan.go
new file mode 100644
index 00000000000..a25dc2096a2
--- /dev/null
+++ b/pkg/p2p/internal/send_chan.go
@@ -0,0 +1,153 @@
+// Copyright 2021 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	"github.com/pingcap/errors"
+	"github.com/pingcap/log"
+	cerrors "github.com/pingcap/tiflow/pkg/errors"
+	proto "github.com/pingcap/tiflow/proto/p2p"
+	"go.uber.org/zap"
+)
+
+// SendChan is a specialized channel used to implement
+// the asynchronous interface of the MessageClient.
+// SendChan is an MPSC (multi-producer, single-consumer) channel.
+type SendChan struct {
+	// mu guards buf, sendIdx and recvIdx.
+	mu sync.Mutex
+	// buf is the ring buffer holding the pending messages;
+	// the slot of a message is its index modulo cap.
+	buf []*proto.MessageEntry
+	// sendIdx is the number of messages accepted by SendAsync so far.
+	sendIdx int64
+	// recvIdx is the number of messages taken out by the receiver so far.
+	recvIdx int64
+
+	// ready is signaled (without blocking) after a message is enqueued,
+	// so that a Receive call waiting for data wakes up.
+	ready chan struct{}
+
+	// cap is the number of slots in buf.
+	cap int64
+}
+
+// NewSendChan returns a new SendChan.
+// A cap smaller than 1 is treated as 1, because a SendChan without any slot
+// could never accept a message.
+func NewSendChan(cap int64) *SendChan {
+	// With cap == 0, SendAsync would always report the channel as full and
+	// SendSync would poll until canceled; a negative cap would make `make` panic.
+	if cap < 1 {
+		log.Warn("invalid SendChan capacity, falling back to 1",
+			zap.Int64("cap", cap))
+		cap = 1
+	}
+	return &SendChan{
+		buf:   make([]*proto.MessageEntry, cap),
+		ready: make(chan struct{}, 1),
+		cap:   cap,
+	}
+}
+
+// SendSync sends a message synchronously: it keeps retrying SendAsync until
+// the message is accepted, ctx is done, or closeCh is closed.
+// It returns the Sequence assigned to the message on success, the context
+// error if ctx is done, or ErrPeerMessageClientClosed if closeCh is closed.
+// NOTE SendSync polls the channel at a fixed interval instead of being woken
+// up when a slot becomes free, so DO NOT use it when you expect contention
+// or congestion.
+func (c *SendChan) SendSync(
+	ctx context.Context,
+	topic string,
+	value []byte,
+	closeCh <-chan struct{},
+	nextSeq func() int64,
+) (int64, error) {
+	// retryInterval is how long to wait before retrying when the channel is full.
+	// Waiting (instead of spinning) reduces contention, especially when the
+	// race-detector is enabled.
+	const retryInterval = 1 * time.Millisecond
+
+	// The timer is created lazily so that the fast path (the first attempt
+	// succeeds) does not allocate one.
+	var retryTimer *time.Timer
+	defer func() {
+		if retryTimer != nil {
+			retryTimer.Stop()
+		}
+	}()
+
+	for {
+		// Cancellation and closing take precedence over sending.
+		select {
+		case <-ctx.Done():
+			return 0, errors.Trace(ctx.Err())
+		case <-closeCh:
+			return 0, cerrors.ErrPeerMessageClientClosed.GenWithStackByArgs()
+		default:
+		}
+
+		if ok, seq := c.SendAsync(topic, value, nextSeq); ok {
+			return seq, nil
+		}
+
+		// The channel is full. Wait for the retry interval, but stay
+		// responsive to cancellation and closing while waiting.
+		if retryTimer == nil {
+			retryTimer = time.NewTimer(retryInterval)
+		} else {
+			// Safe to reset: the timer has fired and its channel was drained
+			// by the select below in the previous iteration.
+			retryTimer.Reset(retryInterval)
+		}
+		select {
+		case <-ctx.Done():
+			return 0, errors.Trace(ctx.Err())
+		case <-closeCh:
+			return 0, cerrors.ErrPeerMessageClientClosed.GenWithStackByArgs()
+		case <-retryTimer.C:
+		}
+	}
+}
+
+// SendAsync tries to send a message. If the message is accepted, nextSeq will be called
+// once, and the returned value will be used as the Sequence number of the message.
+// If the channel is full, it returns (false, 0) without calling nextSeq.
+func (c *SendChan) SendAsync(topic string, value []byte, nextSeq func() int64) (ok bool, seq int64) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	// More pending messages than slots would mean that an entry has been overwritten.
+	if c.sendIdx-c.recvIdx > c.cap {
+		log.Panic("unreachable",
+			zap.Int64("sendIdx", c.sendIdx),
+			zap.Int64("recvIndex", c.recvIdx))
+	}
+
+	// The buffer is full.
+	if c.sendIdx-c.recvIdx == c.cap {
+		return false, 0
+	}
+
+	// nextSeq is called while holding the lock, so the order of the sequence
+	// numbers matches the order of the entries in the buffer.
+	seq = nextSeq()
+	c.buf[c.sendIdx%c.cap] = &proto.MessageEntry{
+		Topic:    topic,
+		Content:  value,
+		Sequence: seq,
+	}
+	c.sendIdx++
+
+	// Wake up a waiting Receive. The send never blocks: if a signal is
+	// already pending, there is nothing more to do.
+	select {
+	case c.ready <- struct{}{}:
+	default:
+	}
+
+	return true, seq
+}
+
+// Receive receives one message from the channel. If there is a tick, the function will return
+// (nil, false, nil).
+// When a message is available it returns (msg, true, nil); once ctx is done
+// it returns a non-nil error.
+func (c *SendChan) Receive(ctx context.Context, tick <-chan time.Time) (*proto.MessageEntry, bool, error) {
+	// A context that is already done wins over any pending message.
+	if err := ctx.Err(); err != nil {
+		return nil, false, errors.Trace(err)
+	}
+
+	for {
+		if msg := c.doReceive(); msg != nil {
+			return msg, true, nil
+		}
+
+		// Nothing is buffered: wait for a sender, a tick or cancellation.
+		select {
+		case <-c.ready:
+			// A sender has enqueued something, try again.
+		case <-tick:
+			return nil, false, nil
+		case <-ctx.Done():
+			return nil, false, errors.Trace(ctx.Err())
+		}
+	}
+}
+
+// doReceive pops the oldest pending message from the ring buffer,
+// or returns nil if the buffer is empty.
+func (c *SendChan) doReceive() *proto.MessageEntry {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	if c.recvIdx > c.sendIdx {
+		log.Panic("unreachable",
+			zap.Int64("sendIdx", c.sendIdx),
+			zap.Int64("recvIndex", c.recvIdx))
+	}
+
+	if c.recvIdx == c.sendIdx {
+		return nil
+	}
+
+	slot := c.recvIdx % c.cap
+	msg := c.buf[slot]
+	// Clear the slot so that the buffer does not keep the message alive.
+	c.buf[slot] = nil
+	c.recvIdx++
+	return msg
+}
diff --git a/pkg/p2p/internal/send_chan_test.go b/pkg/p2p/internal/send_chan_test.go
new file mode 100644
index 00000000000..9e3fb4751f3
--- /dev/null
+++ b/pkg/p2p/internal/send_chan_test.go
@@ -0,0 +1,86 @@
+// Copyright 2021 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import (
+	"context"
+	"runtime"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+	"go.uber.org/atomic"
+)
+
+const (
+	defaultSendChanCap = 8
+	numProducers       = 16
+	numMsgPerProducer  = 10000
+)
+
+// TestSendChanBasics runs numProducers concurrent producers against a single
+// consumer and checks that every message is received exactly once and in the
+// order of the sequence numbers.
+func TestSendChanBasics(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+
+	const totalMsgs = numProducers * numMsgPerProducer
+
+	seqGen := atomic.NewInt64(0)
+	c := NewSendChan(defaultSendChanCap)
+
+	var wg sync.WaitGroup
+
+	// Runs the producers.
+	// NOTE The goroutines report failures with t.Errorf, because FailNow
+	// (and hence the `require` helpers) must only be called from the
+	// goroutine running the test function.
+	for i := 0; i < numProducers; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			lastSeq := int64(0)
+			for j := 0; j < numMsgPerProducer; {
+				ok, seq := c.SendAsync("test-topic", []byte("test-value"), seqGen.Inc)
+				if !ok {
+					// The channel is full. Stop if the test is over (timeout
+					// or failure), so that wg.Wait() cannot hang; otherwise
+					// let the consumer run before retrying.
+					if ctx.Err() != nil {
+						return
+					}
+					runtime.Gosched()
+					continue
+				}
+				j++
+				if seq <= lastSeq {
+					t.Errorf("sequence is not increasing: got %d after %d", seq, lastSeq)
+					cancel()
+					return
+				}
+				lastSeq = seq
+			}
+		}()
+	}
+
+	// Runs the consumer.
+	// recvCount is written only by the consumer and read after wg.Wait().
+	recvCount := 0
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		ticker := time.NewTicker(time.Millisecond * 10)
+		defer ticker.Stop()
+
+		lastSeq := int64(0)
+		for recvCount < totalMsgs {
+			msg, ok, err := c.Receive(ctx, ticker.C)
+			if err != nil {
+				t.Errorf("failed to receive: %v", err)
+				return
+			}
+			if !ok {
+				// It was only a tick.
+				continue
+			}
+			if msg.Sequence != lastSeq+1 {
+				t.Errorf("unexpected sequence: expected %d, got %d", lastSeq+1, msg.Sequence)
+				// Unblock the producers.
+				cancel()
+				return
+			}
+			lastSeq = msg.Sequence
+			recvCount++
+		}
+	}()
+
+	wg.Wait()
+	require.Equal(t, totalMsgs, recvCount)
+}
diff --git a/pkg/p2p/server_client_integration_test.go b/pkg/p2p/server_client_integration_test.go
index 4406e715b26..57f768c4c48 100644
--- a/pkg/p2p/server_client_integration_test.go
+++ b/pkg/p2p/server_client_integration_test.go
@@ -35,7 +35,7 @@ import (
 
 // read only
 var clientConfig4Testing = &MessageClientConfig{
-	SendChannelSize:   128,
+	SendChannelSize:   1024,
 	BatchSendInterval: time.Millisecond * 1, // to accelerate testing
 	MaxBatchCount:     128,
 	MaxBatchBytes:     8192,
@@ -76,7 +76,7 @@ func newServerForIntegrationTesting(t *testing.T, serverID string, configOpts ..
return } -func runP2PIntegrationTest(ctx context.Context, t *testing.T, size int, numTopics int) { +func runP2PIntegrationTest(ctx context.Context, t *testing.T, size int, numTopics int, clientConcurrency int) { ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -107,8 +107,10 @@ func runP2PIntegrationTest(ctx context.Context, t *testing.T, size int, numTopic require.Equal(t, "test-client-1", senderID) require.IsType(t, &testTopicContent{}, i) content := i.(*testTopicContent) - require.Equal(t, content.Index-1, lastIndex) - lastIndex = content.Index + if clientConcurrency == 1 { + require.Equal(t, content.Index-1, lastIndex) + lastIndex = content.Index + } return nil }) @@ -135,9 +137,9 @@ func runP2PIntegrationTest(ctx context.Context, t *testing.T, size int, numTopic }() var wg1 sync.WaitGroup - wg1.Add(numTopics) - for j := 0; j < numTopics; j++ { - topicName := fmt.Sprintf("test-topic-%d", j) + wg1.Add(numTopics * clientConcurrency) + for j := 0; j < numTopics*clientConcurrency; j++ { + topicName := fmt.Sprintf("test-topic-%d", j%numTopics) go func() { defer wg1.Done() var oldSeq Seq @@ -145,8 +147,11 @@ func runP2PIntegrationTest(ctx context.Context, t *testing.T, size int, numTopic content := &testTopicContent{Index: int64(i + 1)} seq, err := client.SendMessage(ctx, topicName, content) require.NoError(t, err) - require.Equal(t, oldSeq+1, seq) - oldSeq = seq + + if clientConcurrency == 1 { + require.Equal(t, oldSeq+1, seq) + oldSeq = seq + } } require.Eventuallyf(t, func() bool { @@ -154,7 +159,7 @@ func runP2PIntegrationTest(ctx context.Context, t *testing.T, size int, numTopic if !ok { return false } - return seq >= Seq(size) + return seq >= Seq(size*clientConcurrency) }, time.Second*40, time.Millisecond*20, "failed to wait for ack") }() } @@ -168,14 +173,14 @@ func TestMessageClientBasic(t *testing.T) { ctx, cancel := context.WithTimeout(context.TODO(), defaultTimeout) defer cancel() - runP2PIntegrationTest(ctx, t, defaultMessageBatchSizeLarge, 1) + 
runP2PIntegrationTest(ctx, t, defaultMessageBatchSizeLarge, 1, 4) } func TestMessageClientBasicMultiTopics(t *testing.T) { ctx, cancel := context.WithTimeout(context.TODO(), defaultTimeout) defer cancel() - runP2PIntegrationTest(ctx, t, defaultMessageBatchSizeLarge, 4) + runP2PIntegrationTest(ctx, t, defaultMessageBatchSizeLarge, 4, 16) } func TestMessageClientServerRestart(t *testing.T) { @@ -187,7 +192,7 @@ func TestMessageClientServerRestart(t *testing.T) { ctx, cancel := context.WithTimeout(context.TODO(), defaultTimeout) defer cancel() - runP2PIntegrationTest(ctx, t, defaultMessageBatchSizeSmall, 1) + runP2PIntegrationTest(ctx, t, defaultMessageBatchSizeSmall, 1, 1) } func TestMessageClientServerRestartMultiTopics(t *testing.T) { @@ -199,7 +204,7 @@ func TestMessageClientServerRestartMultiTopics(t *testing.T) { ctx, cancel := context.WithTimeout(context.TODO(), defaultTimeout*4) defer cancel() - runP2PIntegrationTest(ctx, t, defaultMessageBatchSizeSmall, 4) + runP2PIntegrationTest(ctx, t, defaultMessageBatchSizeSmall, 4, 1) } func TestMessageClientRestart(t *testing.T) { @@ -211,7 +216,7 @@ func TestMessageClientRestart(t *testing.T) { ctx, cancel := context.WithTimeout(context.TODO(), defaultTimeout) defer cancel() - runP2PIntegrationTest(ctx, t, defaultMessageBatchSizeLarge, 1) + runP2PIntegrationTest(ctx, t, defaultMessageBatchSizeLarge, 1, 1) } func TestMessageClientRestartMultiTopics(t *testing.T) { @@ -223,7 +228,7 @@ func TestMessageClientRestartMultiTopics(t *testing.T) { ctx, cancel := context.WithTimeout(context.TODO(), defaultTimeout) defer cancel() - runP2PIntegrationTest(ctx, t, defaultMessageBatchSizeSmall, 4) + runP2PIntegrationTest(ctx, t, defaultMessageBatchSizeSmall, 4, 1) } func TestMessageClientSenderErrorsMultiTopics(t *testing.T) { @@ -235,7 +240,7 @@ func TestMessageClientSenderErrorsMultiTopics(t *testing.T) { ctx, cancel := context.WithTimeout(context.TODO(), defaultTimeout) defer cancel() - runP2PIntegrationTest(ctx, t, 
defaultMessageBatchSizeSmall, 4) + runP2PIntegrationTest(ctx, t, defaultMessageBatchSizeSmall, 4, 1) } func TestMessageClientBasicNonblocking(t *testing.T) {