StreamX 基础流编程

选择协议

当前支持:

  • TTHeader Streaming

    • 传输协议:TTHeader
    • IDL 定义语言 与 序列化协议:Thrift
  • gRPC Streaming:~~~~ (计划实现)

    • 传输协议:gRPC
    • IDL 定义语言 与 序列化协议:Protobuf 编码

此处选定的协议只影响从 IDL 生成代码,无论哪种协议,以下用法均一致。

使用方法

生成代码

定义 IDL

Thrift

文件 echo.thrift:

namespace go echo

service TestService {
    Response PingPong(1: Request req) // PingPong 非流式接口
    
    Response Echo (1: Request req) (streaming.mode="bidirectional"),
    Response EchoClient (1: Request req) (streaming.mode="client"),
    Response EchoServer (1: Request req) (streaming.mode="server"),
}

生成代码

为保持与旧流式生成代码的兼容,命令行需加上 -streamx flag。

kitex -streamx -module <go module> -service P.S.M echo.thrift
初始化

创建 Client

// 生成代码目录,streamserver 为 IDL 定义的 service name
import ".../kitex_gen/echo/testservice"
import "github.com/cloudwego/kitex/client/streamxclient"

cli, err := testservice.NewClient(
    "a.b.c",
    streamxclient.WithStreamRecvMiddleware(...),
    streamxclient.WithStreamSendMiddleware(...),
)

创建 Server

import ".../kitex_gen/echo/streamserver"
import "github.com/cloudwego/kitex/server/streamxserver"

svr := streamserver.NewServer(
    new(serviceImpl),
    streamxserver.WithStreamRecvMiddleware(...),
    streamxserver.WithStreamSendMiddleware(...),
)

Client Streaming

使用场景

Client 需要发送多份数据给 Server 端,Server 端可以发送一条消息给 Client:

------------------- [Client Streaming] -------------------
--------------- (stream Req) returns (Res) ---------------
client.Send(req)         === req ==>       server.Recv(req)
                             ...
client.Send(req)         === req ==>       server.Recv(req)

client.CloseSend()       === EOF ==>       server.Recv(EOF)
client.Recv(res)         <== res ===       server.SendAndClose(res)
** OR
client.CloseAndRecv(res) === EOF ==>       server.Recv(EOF)
                         <== res ===       server.SendAndClose(res)

Client 用法

  • [必须]: client 必须调用 CloseAndRecv() 或者 (CloseSend + Recv)方法,告知 Server 不再有新数据发送。
ctx, cs, err := cli.ClientStream(ctx)
for i := 0; i < 3; i++ {
    err = cs.Send(ctx, req)
}
res, err = cs.CloseAndRecv(ctx)

Server 用法

  • [必须]: server 必须在 handler 结束时返回一个 Response,告知 Client 最终结果。

func (si *serviceImpl) ClientStream(
    ctx context.Context, stream streamx.ClientStreamingServer[Request, Response]
) (res *Response, err error) {
    for {
       req, err := stream.Recv(ctx)
       if err == io.EOF {
           res := new(Response)
           return res, nil
       }
       if err != nil {
          return nil, err
       }
    }
}

Server Streaming

使用场景

典型场景:ChatGPT 类型业务

Client 发送一个请求给 Server,Server 发送多个返回给 Client:

------------------- [Server Streaming] -------------------
---------- (Request) returns (stream Response) ----------
client.Send(req)   === req ==>   server.Recv(req)
client.Recv(res)   <== res ===   server.Send(req)
                       ...
client.Recv(res)   <== res ===   server.Send(req)
client.Recv(EOF)   <== EOF ===   server handler return

Client 用法

  • [必须]: client 必须判断 io.EOF 错误,并结束循环
ctx, ss, err := cli.ServerStream(ctx, req)
for {
    res, err := ss.Recv(ctx)
    if errors.Is(err, io.EOF) {
       break
    }
}

Server 用法

func (si *serviceImpl) ServerStream(ctx context.Context, req *Request, stream streamx.ServerStreamingServer[Response]) error {
    for i := 0; i < 3; i++ {
       err := stream.Send(ctx, resp)
       if err != nil {
          return err
       }
    }
    return nil
}

Bidirectional Streaming

使用场景

Client 与 Server 可能需要或者未来有可能需要发送多条消息:

----------- [Bidirectional Streaming] -----------
--- (stream Request) returns (stream Response) ---
* goroutine 1 *
client.Send(req)   === req ==>   server.Recv(req)
                       ...
client.Send(req)   === req ==>   server.Recv(req)
client.CloseSend() === EOF ==>   server.Recv(EOF)

* goroutine 2 *
client.Recv(res)   <== res ===   server.Send(req)
                       ...
client.Recv(res)   <== res ===   server.Send(req)
client.Recv(EOF)   <== EOF ===   server handler return

Client 用法

  • [必须]: client 必须在发送结束后调用 CloseSend
  • [必须]: client 必须在 Recv 时,判断 io.EOF 并结束循环
ctx, bs, err := cli.BidiStream(ctx)
var wg sync.WaitGroup
wg.Add(2)
go func() {
    defer wg.Done()
    for i := 0; i < round; i++ {
       err := bs.Send(ctx, req)
    }
    err = bs.CloseSend(ctx)
}()
go func() {
    defer wg.Done()
    for {
       res, err := bs.Recv(ctx)
       if errors.Is(err, io.EOF) {
          break
       }
    }
}()
wg.Wait()

Server 用法

  • [必须]: server 必须在 Recv 时,判断 io.EOF 并结束循环
func (si *serviceImpl) BidiStream(ctx context.Context, stream streamx.BidiStreamingServer[Request, Response]) error {
    for {
       req, err := stream.Recv(ctx)
       if err == io.EOF {
          return nil
       }
       if err != nil {
          return err
       }
       err = stream.Send(ctx, resp)
       if err != nil {
          return err
       }
    }
}

最后修改 January 13, 2025 : docs: upper title for streamx (#1198) (ba78f45)