Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

客户端和服务端添加接收方限流 #348

Open
someview opened this issue Jul 18, 2024 · 11 comments
Open

客户端和服务端添加接收方限流 #348

someview opened this issue Jul 18, 2024 · 11 comments

Comments

@someview
Copy link

someview commented Jul 18, 2024

tcp框架层次的限流 ->协程池->应用层次协议实现的限流. netpoll 等同于tcp框架 + 协程池 + other feature.
我们在基于netpoll来开发rpc框架的过程中, 压测的时候经常发现内存炸了. 原因在于,服务端或者客户端应用层处理不过来,然而这里有本身又缺乏一种背压机制.netpoll 一旦接收到新的数据,用空的协程或者放在tasklist里面等待执行,这相当于一种无穷大的机制.

然而, 底层的变更不能依赖应用层的变化,所以背压机制在这里显得不恰当.
更好的描述是, 层级式的需求,

  1. 应用, 应用特征
  2. 需要多高的并发,最多多高的并发,
  3. 对tcp框架,需要pps, 带宽是多少,最多允许每个连接上的任务占用多大内存.

按照这种思路,我认为netpoll应该添加限流功能, 以便能够控制,观察netpoll自身级别的资源使用状况.

@joway
Copy link
Member

joway commented Jul 18, 2024

@someview #298 我猜测你应该要的是这样的解法?

@joway
Copy link
Member

joway commented Jul 18, 2024

根本上,限流是有损的。但是你真正想要的只是内存别占用太猛,并不需要真的拒绝那些连接本身。

@someview
Copy link
Author

someview commented Jul 18, 2024

根本上,限流是有损的。但是你真正想要的只是内存别占用太猛,并不需要真的拒绝那些连接本身。

宏观上的资源控制, 当然,连接本身也可以作为一种资源。就像http2maxStreamNum一样,为了保证资源不被耗尽.
在单向无反馈机制的设计中,对每一层自己的管理的资源应该是可以控制和观察的.

@joway
Copy link
Member

joway commented Jul 19, 2024

@someview 现在就是可以做的呀,你 onConnect onDicConnect 接口可以控制连接数

@someview
Copy link
Author

@someview 现在就是可以做的呀,你 onConnect onDicConnect 接口可以控制连接数

不仅仅是这方面,目前我们切了一个分支,做了三个改动,来做这件事情:

  1. 服务端new的时候,添加了一个参数, 来控制节点参数分配的节点大小:
// WithNoCopyPageSize sets the connection read buffer nocopy page size.
//
//		It means when read trigger,the size of data that can be read at once and the read buffer size of malloc each connection.
//	 This is typically a common power of two.
//
// Default is 8k and must between LinkBufferCap and 8k.
func WithNoCopyPageSize(size int) Option {
	return Option{func(op *options) {
		if size <= LinkBufferCap {
			size = LinkBufferCap
		}
		if size >= pagesize {
			size = pagesize
		}
		op.pagesize = size
	}}
}

// Option .
type Option struct {
	f func(*options)
}
  1. connection的接口添加
    ···
    type FlushCounter interface {
    // FlushAndCount Flush的同时返回Flush长度
    FlushAndCount() (int, error)
    }
    ···
    以便能够实现连接级别的内存和流量控制.

  2. 采用read-throttle分支的限流,来实现服务端限流

@joway
Copy link
Member

joway commented Jul 29, 2024

@someview 第二点我还是建议通过 malloc 的方式来维护长度。因为flush返回的长度一定等于malloc mallocack后真正用户写入的长度。这个因为是用户主动写入的,用户肯定知道。

第一点,现在都开放了内部参数调优了https://github.com/cloudwego/netpoll/pull/349/files

@someview
Copy link
Author

@someview 第二点我还是建议通过 malloc 的方式来维护长度。因为flush返回的长度一定等于malloc mallocack后真正用户写入的长度。这个因为是用户主动写入的,用户肯定知道。

第一点,现在都开放了内部参数调优了https://github.com/cloudwego/netpoll/pull/349/files

如果做写入的流控的话,就需要在shardqueue flush之前,获取到to flush的长度. flush完毕以后,恢复缓冲区大小

@joway
Copy link
Member

joway commented Aug 12, 2024

@someview 写入和读不同,读是被动行为;写入是主动行为,写入的流控应该在应用层调用 Write/Malloc 的时候控制,否则即便netpoll不写入,它也还是在内存里,还是有问题的。

@someview
Copy link
Author

我们对netpoll有几个改动的需求,是否可以考虑添加这个,让linkedbuffer变得可以复用:


// SliceIntoReader implements Connection.
func (c *connection) SliceIntoReader(n int, r Reader) (err error) {
	if err = c.waitRead(n); err != nil {
		return err
	}
	return c.inputBuffer.SliceIntoReader(n, r)
}

func (b *LinkBuffer) SliceIntoReader(n int, r Reader) (err error) {
	var p, ok = r.(*LinkBuffer)
	if !ok {
		return errors.New("unsupported writer which is not LinkBuffer")
	}

	if n <= 0 {
		return nil
	}
	// check whether enough or not.
	if b.Len() < n {
		return fmt.Errorf("link buffer readv[%d] not enough", n)
	}
	b.recalLen(-n) // re-cal length

	if !p.IsEmpty() {
		return fmt.Errorf("dst buffer is not empty")
	}
	p.length = int64(n)

	defer func() {
		// set to read-only
		p.flush = p.flush.next
		p.write = p.flush
	}()

	// single node
	if b.isSingleNode(n) {
		node := b.read.Refer(n)
		p.head, p.read, p.flush = node, node, node
		return nil
	}
	// multiple nodes
	var l = b.read.Len()
	node := b.read.Refer(l)
	b.read = b.read.next

	p.head, p.read, p.flush = node, node, node
	for ack := n - l; ack > 0; ack = ack - l {
		l = b.read.Len()
		if l >= ack {
			p.flush.next = b.read.Refer(ack)
			p.flush = p.flush.next
			break
		} else if l > 0 {
			p.flush.next = b.read.Refer(l)
			p.flush = p.flush.next
		}
		b.read = b.read.next
	}
	return b.Release()
}

// Reuse 将已关闭或者是Append到其他LinkBuffer后的LinkBuffer重新启用
func (b *LinkBuffer) Reuse(size ...int) {
	if b.enable {
		return
	}
	b.Initialize(size...)
	b.enable = true
}

// Initialize 初始化LinkBuffer
func (b *LinkBuffer) Initialize(size ...int) {
	var l int
	if len(size) > 0 {
		l = size[0]
	}
	var node = newLinkBufferNode(l)
	b.head, b.read, b.flush, b.write = node, node, node, node
}

var linkBufferPool = pool.NewSyncPool[*LinkBuffer](func() any {
	return NewLinkBuffer()
})

func NewSizedLinkBuffer(size int) *LinkBuffer {
	lb := linkBufferPool.Get()
	lb.Reuse(size)
	return lb
}

// Recycle 回收AsyncLinkBuffer
func (b *LinkBuffer) Recycle() {
	b.Release()
	linkBufferPool.Put(b)
}

由于存在大量的频繁小包,无论是写入端还是读端,都会产生大量的linkedbuffer,所以需要框架层面linedbuffer本身是可以recycle的. 但是,目前linkedbuffer 对外暴露的方法不足以做到这件事情.

@someview
Copy link
Author

@someview 写入和读不同,读是被动行为;写入是主动行为,写入的流控应该在应用层调用 Write/Malloc 的时候控制,否则即便netpoll不写入,它也还是在内存里,还是有问题的。

写入的行为确实是写入方自己限流的

@joway
Copy link
Member

joway commented Aug 21, 2024

@someview 你如果是长连接但是有很多小包,是不会有大量linkbuffer的,因为一个连接readerbuffer是一个linkbuffer对象。不需要每个包来搞一个linkbuffer。

如果你是短连接,并且每个连接都是小包,那不只这个对象多,connection 对象也会很多。你是哪种情况?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Development

No branches or pull requests

2 participants