StreamX Basic Programming
Select Protocol
Current support:
TTHeader Streaming
- Transport protocol: TTHeader
- IDL and serialization protocol: Thrift
gRPC Streaming
- Transport protocol: gRPC
- IDL and serialization protocol: Thrift / Protobuf
The protocol selected here affects only the code generated from the IDL. The usage described below is the same for every protocol.
Usage
Generate code
Define IDL
Thrift
File echo.thrift:
namespace go echo
// The Request and Response struct definitions are omitted here.
service TestService {
    Response PingPong(1: Request req), // PingPong is a normal (non-streaming) method
    Response EchoBidi (1: Request req) (streaming.mode="bidirectional"),
    Response EchoClient (1: Request req) (streaming.mode="client"),
    Response EchoServer (1: Request req) (streaming.mode="server"),
}
Generate code
Please make sure that Kitex Tool has been upgraded to v0.13.0+:
go install github.com/cloudwego/kitex/tool/cmd/kitex@latest
To remain compatible with code generated for the legacy streaming API, add the -streamx flag on the command line:
kitex -streamx -module <go module> -service service echo.thrift
Initialization
Create Client
import (
	"github.com/cloudwego/kitex/client"

	".../kitex_gen/echo/testservice"
)

cli, err := testservice.NewClient(
	"a.b.c",
	client.WithStreamRecvMiddleware(...),
	client.WithStreamSendMiddleware(...),
)
Create Server
import (
	"github.com/cloudwego/kitex/server"

	".../kitex_gen/echo/testservice"
)

svr := testservice.NewServer(
	new(serviceImpl),
	server.WithStreamRecvMiddleware(...),
	server.WithStreamSendMiddleware(...),
)
Client Streaming
Usage scenarios
The client sends multiple messages to the server, and the server replies with a single message.
------------------- [Client Streaming] -------------------
--------------- (stream Req) returns (Res) ---------------
client.Send(req) === req ==> server.Recv(req)
...
client.Send(req) === req ==> server.Recv(req)
client.CloseSend() === EOF ==> server.Recv(EOF)
client.Recv(res) <== res === server.SendAndClose(res)
** OR
client.CloseAndRecv(res) === EOF ==> server.Recv(EOF)
<== res === server.SendAndClose(res)
Client Usage
- [Must]: The client must call CloseAndRecv(), or CloseSend() followed by Recv(), to tell the server that no more data will be sent.
stream, err := cli.EchoClient(ctx)
for i := 0; i < 3; i++ {
	err = stream.Send(stream.Context(), req)
	// handle err before sending the next message
}
// Close the sending side, then wait for the single response.
res, err := stream.CloseAndRecv(stream.Context())
Server usage
- [Must]: The server must call SendAndClose with a Response before the handler returns, so that the client receives the final result.
func (si *serviceImpl) EchoClient(
	ctx context.Context, stream echo.TestService_EchoClientServer,
) (err error) {
	for {
		req, err := stream.Recv(ctx)
		if err == io.EOF {
			// The client has finished sending: reply once and close.
			res := new(echo.Response)
			return stream.SendAndClose(ctx, res)
		}
		if err != nil {
			return err
		}
		_ = req // process req here
	}
}
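The client-streaming exchange described above can be tried without a network by using an in-memory stand-in. This is a minimal sketch, not the Kitex API: `memStream`, `sumHandler`, and `sumOverStream` are hypothetical names, messages are plain ints, and the context argument is omitted for brevity.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// memStream is a hypothetical in-memory stand-in for a generated
// client-streaming stream. It is not part of Kitex.
type memStream struct {
	reqs chan int // client -> server messages
	res  chan int // the single server -> client response
}

// Send delivers one request to the server side.
func (s *memStream) Send(v int) error { s.reqs <- v; return nil }

// CloseAndRecv signals end-of-stream, then waits for the single response.
func (s *memStream) CloseAndRecv() (int, error) {
	close(s.reqs)
	return <-s.res, nil
}

// Recv returns io.EOF once the client has closed its sending side.
func (s *memStream) Recv() (int, error) {
	v, ok := <-s.reqs
	if !ok {
		return 0, io.EOF
	}
	return v, nil
}

// SendAndClose delivers the final response.
func (s *memStream) SendAndClose(v int) error { s.res <- v; return nil }

// sumHandler plays the server role: read until io.EOF, then reply once.
func sumHandler(s *memStream) error {
	total := 0
	for {
		v, err := s.Recv()
		if errors.Is(err, io.EOF) {
			return s.SendAndClose(total)
		}
		if err != nil {
			return err
		}
		total += v
	}
}

// sumOverStream plays the client role: send everything, then CloseAndRecv.
func sumOverStream(values []int) (int, error) {
	s := &memStream{reqs: make(chan int), res: make(chan int, 1)}
	go sumHandler(s)
	for _, v := range values {
		if err := s.Send(v); err != nil {
			return 0, err
		}
	}
	return s.CloseAndRecv()
}

func main() {
	total, err := sumOverStream([]int{1, 2, 3})
	fmt.Println(total, err) // prints "6 <nil>"
}
```

If the client never closed its sending side, `sumHandler` would block in `Recv` forever, which is why closing is mandatory on the client.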
Server Streaming
Usage scenarios
Typical scenario: ChatGPT-style applications, where the answer is streamed back incrementally.
The client sends one request to the server, and the server sends multiple responses back to the client.
------------------- [Server Streaming] -------------------
---------- (Request) returns (stream Response) ----------
client.Send(req) === req ==> server.Recv(req)
client.Recv(res) <== res === server.Send(res)
...
client.Recv(res) <== res === server.Send(res)
client.Recv(EOF) <== EOF === server handler return
Client Usage
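- [Must]: The client must check for the io.EOF error and end the loop when it is received.
stream, err := cli.EchoServer(ctx, req)
for {
	res, err := stream.Recv(stream.Context())
	if errors.Is(err, io.EOF) {
		break // the server handler has returned
	}
	if err != nil {
		break // any other error also ends the stream; handle it here
	}
	_ = res // process res here
}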
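The snippets on this page check for end-of-stream both with `err == io.EOF` and with `errors.Is(err, io.EOF)`. The sketch below uses only the Go standard library to show how the two differ when an error is wrapped. Whether a given stream implementation ever wraps io.EOF is not stated here, so treat this as background on the standard library rather than a statement about Kitex. The helper name `compare` is hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// compare reports how each style of check classifies a bare io.EOF
// and an io.EOF wrapped with %w.
func compare() (strictBare, isBare, strictWrapped, isWrapped bool) {
	wrapped := fmt.Errorf("recv failed: %w", io.EOF)
	strictBare = io.EOF == io.EOF          // direct comparison, bare sentinel
	isBare = errors.Is(io.EOF, io.EOF)     // errors.Is, bare sentinel
	strictWrapped = wrapped == io.EOF      // direct comparison misses the wrap
	isWrapped = errors.Is(wrapped, io.EOF) // errors.Is unwraps the chain
	return
}

func main() {
	fmt.Println(compare()) // prints "true true false true"
}
```

`errors.Is` is the more defensive choice because it still matches if an intermediate layer adds context to the error.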
Server usage
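func (si *serviceImpl) EchoServer(ctx context.Context, req *echo.Request, stream echo.TestService_EchoServerServer) error {
	for i := 0; i < 3; i++ {
		resp := new(echo.Response) // build each response from req here
		err := stream.Send(ctx, resp)
		if err != nil {
			return err
		}
	}
	// Returning from the handler ends the stream; the client then receives io.EOF.
	return nil
}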
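The server-streaming flow, in which the handler's return value decides how the client's receive loop ends, can be sketched with an in-memory stand-in. This is an illustration under stated assumptions, not the Kitex API: `recvStream`, `serve`, `collect`, `tokens`, and `failAfter` are hypothetical names, messages are plain strings, and the context argument is omitted.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// recvStream is a hypothetical stand-in for the client side of a
// server-streaming call. It is not part of Kitex.
type recvStream struct {
	msgs chan string
	err  error // handler result, written before msgs is closed
}

// Recv returns the next message, io.EOF after a clean handler return,
// or the handler's error.
func (s *recvStream) Recv() (string, error) {
	m, ok := <-s.msgs
	if !ok {
		if s.err != nil {
			return "", s.err
		}
		return "", io.EOF
	}
	return m, nil
}

// serve runs handler in a goroutine; the handler's return ends the stream.
func serve(handler func(send func(string) error) error) *recvStream {
	s := &recvStream{msgs: make(chan string)}
	go func() {
		s.err = handler(func(m string) error { s.msgs <- m; return nil })
		close(s.msgs)
	}()
	return s
}

// collect is the client loop: stop cleanly on io.EOF, fail on anything else.
func collect(s *recvStream) ([]string, error) {
	var out []string
	for {
		m, err := s.Recv()
		if errors.Is(err, io.EOF) {
			return out, nil
		}
		if err != nil {
			return out, err
		}
		out = append(out, m)
	}
}

// tokens streams n chunks and then returns nil, which ends the stream.
func tokens(n int) func(send func(string) error) error {
	return func(send func(string) error) error {
		for i := 0; i < n; i++ {
			if err := send(fmt.Sprintf("chunk-%d", i)); err != nil {
				return err
			}
		}
		return nil
	}
}

// failAfter streams n chunks and then returns a non-EOF error.
func failAfter(n int) func(send func(string) error) error {
	return func(send func(string) error) error {
		if err := tokens(n)(send); err != nil {
			return err
		}
		return errors.New("handler failed")
	}
}

func main() {
	out, err := collect(serve(tokens(3)))
	fmt.Println(out, err) // prints "[chunk-0 chunk-1 chunk-2] <nil>"
}
```

Calling `collect(serve(failAfter(2)))` returns two chunks together with a non-nil error, which is the case a loop that checks only for io.EOF would never leave.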
Bidirectional Streaming
Usage scenarios
Both the client and the server need, or may later need, to send multiple messages.
----------- [Bidirectional Streaming] -----------
--- (stream Request) returns (stream Response) ---
* goroutine 1 *
client.Send(req) === req ==> server.Recv(req)
...
client.Send(req) === req ==> server.Recv(req)
client.CloseSend() === EOF ==> server.Recv(EOF)
* goroutine 2 *
client.Recv(res) <== res === server.Send(res)
...
client.Recv(res) <== res === server.Send(res)
client.Recv(EOF) <== EOF === server handler return
Client Usage
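- [Must]: The client must call CloseSend once it has finished sending.
- [Must]: The client must check for io.EOF when calling Recv and end the loop when it is received.
stream, err := cli.EchoBidi(ctx)
var wg sync.WaitGroup
wg.Add(2)
go func() { // sender
	defer wg.Done()
	for i := 0; i < round; i++ {
		if err := stream.Send(stream.Context(), req); err != nil {
			break // handle the send error here
		}
	}
	// Tell the server that no more requests will follow.
	if err := stream.CloseSend(stream.Context()); err != nil {
		// handle the close error here
	}
}()
go func() { // receiver
	defer wg.Done()
	for {
		res, err := stream.Recv(stream.Context())
		if errors.Is(err, io.EOF) {
			break // the server handler has returned
		}
		if err != nil {
			break // handle other errors here
		}
		_ = res // process res here
	}
}()
wg.Wait()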
Server usage
- [Must]: The server must check for io.EOF when calling Recv and end the loop when it is received.
func (si *serviceImpl) EchoBidi(ctx context.Context, stream echo.TestService_EchoBidiServer) error {
	for {
		req, err := stream.Recv(ctx)
		if err == io.EOF {
			return nil // the client has called CloseSend
		}
		if err != nil {
			return err
		}
		_ = req // build the response from req here
		resp := new(echo.Response)
		err = stream.Send(ctx, resp)
		if err != nil {
			return err
		}
	}
}
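The bidirectional pattern, with one goroutine sending and another receiving, can be exercised with a pair of in-memory pipes. This is a minimal sketch rather than the Kitex API: `half`, `echoHandler`, and `exchange` are hypothetical names, messages are plain strings, and the context argument is omitted.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"sync"
)

// half is a hypothetical one-directional message pipe. It is not part of Kitex.
type half struct{ ch chan string }

func (h half) Send(m string) error { h.ch <- m; return nil }
func (h half) Close()              { close(h.ch) }

// Recv returns io.EOF once the sending side has closed the pipe.
func (h half) Recv() (string, error) {
	m, ok := <-h.ch
	if !ok {
		return "", io.EOF
	}
	return m, nil
}

// echoHandler plays the server: reply to each request until io.EOF.
func echoHandler(in, out half) error {
	defer out.Close() // returning from the handler ends the response stream
	for {
		req, err := in.Recv()
		if errors.Is(err, io.EOF) {
			return nil
		}
		if err != nil {
			return err
		}
		if err := out.Send("echo:" + req); err != nil {
			return err
		}
	}
}

// exchange plays the client: one goroutine sends, another receives.
func exchange(reqs []string) []string {
	up, down := half{make(chan string)}, half{make(chan string)}
	go echoHandler(up, down)

	var (
		wg  sync.WaitGroup
		got []string
	)
	wg.Add(2)
	go func() { // sender
		defer wg.Done()
		defer up.Close() // plays the role of CloseSend
		for _, r := range reqs {
			if err := up.Send(r); err != nil {
				return
			}
		}
	}()
	go func() { // receiver
		defer wg.Done()
		for {
			res, err := down.Recv()
			if err != nil { // io.EOF or any other error ends the loop
				return
			}
			got = append(got, res)
		}
	}()
	wg.Wait()
	return got
}

func main() {
	fmt.Println(exchange([]string{"a", "b"})) // prints "[echo:a echo:b]"
}
```

Because the pipes in this sketch are unbuffered, a client that tried to send every request before receiving anything would block as soon as the server attempted its first reply. Running the two directions in separate goroutines avoids that.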
Last modified April 14, 2025: docs: update hertz graceful shutdown (#1302) (e89fd8c)