限流
Kitex 默认限流和自定义限流使用指南。
限流是一种保护 server 的措施,防止上游某个 client 流量突增导致 server 端过载。
目前 Kitex 支持用户自定义的 QPS 限流器和连接数限流器,同时提供了默认的实现。
注意事项
- 同时定义
server.WithLimit
和server.WithQPSLimiter
或同时定义server.WithLimit
和server.WithConnectionLimiter
时,只有后者会生效。 - 为节省请求反序列化开销提高性能,在非多路复用场景下,Kitex 默认的 QPS 限流器是在
OnRead
hook 处生效的,而在多路复用或使用自定义 QPS 限流器场景下,限流器在OnMessage
hook 处生效。这是为了保证自定义限流器能获取到请求的基本信息,避免在例如按 method 限流等场景下无法生效的问题。 - 目前的限流功能只对 Thrift、Kitexpb 协议生效,对 gRPC 协议暂不生效。gRPC 可采用流控在传输层面做流量限制,Kitex 提供了
WithGRPCInitialWindowSize
和WithGRPCInitialConnWindowSize
分别用于设置 stream 和连接的流控窗口大小,详见gRPC官方文档
使用默认的限流器
代码示例
import "github.com/cloudwego/kitex/pkg/limit"
func main() {
svr := xxxservice.NewServer(handler, server.WithLimit(&limit.Option{MaxConnections: 10000, MaxQPS: 1000}))
svr.Run()
}
参数说明:
-
MaxConnections
表示最大连接数 -
MaxQPS
表示最大 QPS -
UpdateControl
提供动态修改限流阈值的能力,举例:
import "github.com/cloudwego/kitex/pkg/limit"
// define your limiter updater to update limit threshold
type MyLimiterUpdater struct {
updater limit.Updater
}
func (lu *MyLimiterUpdater) YourChange() {
// your logic: set new option as needed
newOpt := &limit.Option{
MaxConnections: 20000,
MaxQPS: 2000,
}
// update limit config
isUpdated := lu.updater.UpdateLimit(newOpt)
// your logic
}
func (lu *MyLimiterUpdater) UpdateControl(u limit.Updater) {
lu.updater = u
}
//--- init server ---
var lu = MyLimiterUpdater{}
svr := xxxservice.NewServer(handler, server.WithLimit(&limit.Option{MaxConnections: 10000, MaxQPS: 1000, UpdateControl: lu.UpdateControl}))
实现
默认限流器分别使用 ConcurrencyLimiter 和 RateLimiter 对最大连接数和最大 QPS 进行限流。
- ConcurrencyLimiter:简单的计数器;
- RateLimiter:这里的限流算法采用了 " 令牌桶算法 “。
监控
默认限流器定义了 LimitReporter
接口,用于限流状态监控,例如当前连接数过多、QPS 过大等。
如有需求,用户需要自行实现该接口,并通过 WithLimitReporter
注入。
// LimitReporter is the interface define to report(metric or print log) when limit happen
type LimitReporter interface {
ConnOverloadReport()
QPSOverloadReport()
}
使用自定义的限流器
import (
"context"
"time"
"github.com/cloudwego/kitex/pkg/limiter"
"github.com/cloudwego/kitex/pkg/rpcinfo"
"github.com/cloudwego/kitex/server"
)
type qpsLimiter struct{}
func (l *qpsLimiter) Acquire(ctx context.Context) bool {
ri := rpcinfo.GetRPCInfo(ctx)
md := ri.From().Method()
return acquire(md) // return true to allow this request
}
func (l *qpsLimiter) Status(ctx context.Context) (max, current int, interval time.Duration) {
// max: the maximum number of requests allowed in the interval;
// current: the remaining number of requests allowed in the interval;
return
}
type connectionLimiter struct{}
func (l *connectionLimiter) Acquire(ctx context.Context) bool {
ri := rpcinfo.GetRPCInfo(ctx)
addr := ri.From().Address()
return acquire(addr) // return true to allow this connection
}
func (l *connectionLimiter) Release(ctx context.Context) {
ri := rpcinfo.GetRPCInfo(ctx)
addr := ri.From().Address()
return release(addr) // release occupied resource by the connection, only called after the release is successful.
}
func (l *connectionLimiter) Status(ctx context.Context) (limit, occupied int) {
// limit: the maximum number of connections allowed.
// occupied: the number of existing connections.
return
}
func main() {
myQPSLimiter := &qpsLimiter{}
myConnectionLimiter := &connectionLimiter{}
svr := xxxservice.NewServer(handler, server.WithQPSLimiter(myQPSLimiter), server.WithConnectionLimiter(myConnectionLimiter))
svr.Run()
}
最后修改
January 13, 2025
: docs: add description for streamx (#1202) (0337c81)