StreamX Middleware
Middleware Type
Stream Recv/Send Middleware
Trigger timing: Called each time a stream receives or sends a message
Type definition
type StreamRecvEndpoint func(ctx context.Context, stream Stream, res any) (err error)
type StreamSendEndpoint func(ctx context.Context, stream Stream, req any) (err error)
type StreamRecvMiddleware func(next StreamRecvEndpoint) StreamRecvEndpoint
type StreamSendMiddleware func(next StreamSendEndpoint) StreamSendEndpoint
Parameter description:
- stream: the current stream object, directly available to the middleware.
- res/req: the actual response and request messages.
- Behavior before and after calling the Next function:
Middleware type | Before calling Next | After calling Next |
---|---|---|
StreamRecvMiddleware | - No data has been received yet; only stream.Recv() has been called - The res parameter is empty | - Data has been received, or an error occurred - The res parameter holds a real value |
StreamSendMiddleware | - No data has been sent yet; only stream.Send() has been called - The req parameter is the real request | - Sending has completed, or an error occurred - The req parameter is the real request |
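The before/after behavior in the table can be observed with a small, self-contained sketch. It does not use the real streamx package: `Stream`, `Message`, `fakeRecv`, `observe`, and `run` are hypothetical stand-ins defined here, and only the two function types mirror the definitions above. `fakeRecv` plays the role of the innermost endpoint that actually fills `res`.

```go
package main

import (
	"context"
	"fmt"
)

// Stream is a stand-in for the real stream interface (assumption: the
// middleware in this sketch never calls methods on it).
type Stream interface{}

// Message is a hypothetical message type used only for this sketch.
type Message struct{ Text string }

// These two types mirror the definitions shown above.
type StreamRecvEndpoint func(ctx context.Context, stream Stream, res any) (err error)
type StreamRecvMiddleware func(next StreamRecvEndpoint) StreamRecvEndpoint

// fakeRecv imitates the innermost endpoint: it fills res as a real receive would.
func fakeRecv(ctx context.Context, stream Stream, res any) error {
	res.(*Message).Text = "hello"
	return nil
}

// observe returns a middleware that records what it sees in res
// before and after calling next.
func observe(events *[]string) StreamRecvMiddleware {
	return func(next StreamRecvEndpoint) StreamRecvEndpoint {
		return func(ctx context.Context, stream Stream, res any) error {
			msg := res.(*Message)
			*events = append(*events, fmt.Sprintf("before: %q", msg.Text))
			err := next(ctx, stream, res)
			*events = append(*events, fmt.Sprintf("after: %q err=%v", msg.Text, err))
			return err
		}
	}
}

// run wires the middleware around fakeRecv and performs one receive.
func run() []string {
	var events []string
	recv := observe(&events)(fakeRecv)
	_ = recv(context.Background(), nil, &Message{})
	return events
}

func main() {
	for _, e := range run() {
		fmt.Println(e)
	}
}
```

In this sketch the message is empty before `next` and populated afterwards, which is why logic that inspects received data belongs after the call to `next`.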
Examples
Usage scenario: Inject business logic at the point where the stream receives or sends a message.
svr, err := xxx.NewServer(
	//...
	streamxserver.WithStreamRecvMiddleware(func(next streamx.StreamRecvEndpoint) streamx.StreamRecvEndpoint {
		return func(ctx context.Context, stream streamx.Stream, res any) (err error) {
			// ctx carries the user's token
			token, ok := metainfo.GetPersistentValue(ctx, "user_token")
			if !ok {
				return fmt.Errorf("user_token not found in context")
			}
			// check the token's balance before receiving
			if !hasBalance(token) {
				return fmt.Errorf("user does not have enough balance: token=%s", token)
			}
			return next(ctx, stream, res)
		}
	}),
)
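The same pattern applies on the send side. The following is a minimal sketch under stated assumptions, not the library's implementation: `Stream`, `sendStats`, `countSends`, and `demo` are hypothetical names defined locally, and the innermost endpoint is a fake that fails on empty strings. Because the outcome of a send is only known after `next` returns, the counting happens after the call.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Stream is a stand-in for the real stream interface (assumption).
type Stream interface{}

// These two types mirror the send-side definitions shown earlier.
type StreamSendEndpoint func(ctx context.Context, stream Stream, req any) (err error)
type StreamSendMiddleware func(next StreamSendEndpoint) StreamSendEndpoint

// sendStats is a hypothetical counter; it is not safe for concurrent use.
type sendStats struct{ sent, failed int }

// countSends returns a middleware that counts successful and failed sends.
func countSends(stats *sendStats) StreamSendMiddleware {
	return func(next StreamSendEndpoint) StreamSendEndpoint {
		return func(ctx context.Context, stream Stream, req any) error {
			err := next(ctx, stream, req) // the send result is known only after next returns
			if err != nil {
				stats.failed++
			} else {
				stats.sent++
			}
			return err
		}
	}
}

// demo sends three messages through the middleware; the fake endpoint
// rejects the empty one.
func demo() sendStats {
	stats := sendStats{}
	send := countSends(&stats)(func(ctx context.Context, stream Stream, req any) error {
		if s, _ := req.(string); s == "" {
			return errors.New("empty message")
		}
		return nil
	})
	ctx := context.Background()
	for _, m := range []string{"a", "", "b"} {
		_ = send(ctx, nil, m)
	}
	return stats
}

func main() {
	s := demo()
	fmt.Printf("sent=%d failed=%d\n", s.sent, s.failed)
}
```

A real service would likely replace the plain counters with atomic values or a metrics client, since send middleware may run on several goroutines.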
Inject Middleware
Inject Client Middleware
cli, err := xxx.NewClient(
	"a.b.c",
	streamxclient.WithStreamRecvMiddleware(func(next streamx.StreamRecvEndpoint) streamx.StreamRecvEndpoint {
		return func(ctx context.Context, stream streamx.Stream, res any) (err error) {
			return next(ctx, stream, res)
		}
	}),
	streamxclient.WithStreamSendMiddleware(func(next streamx.StreamSendEndpoint) streamx.StreamSendEndpoint {
		return func(ctx context.Context, stream streamx.Stream, req any) (err error) {
			return next(ctx, stream, req)
		}
	}),
)
Inject Server Middleware
server, err := xxx.NewServer(
	// ....
	streamxserver.WithStreamRecvMiddleware(func(next streamx.StreamRecvEndpoint) streamx.StreamRecvEndpoint {
		return func(ctx context.Context, stream streamx.Stream, res any) (err error) {
			return next(ctx, stream, res)
		}
	}),
	streamxserver.WithStreamSendMiddleware(func(next streamx.StreamSendEndpoint) streamx.StreamSendEndpoint {
		return func(ctx context.Context, stream streamx.Stream, req any) (err error) {
			return next(ctx, stream, req)
		}
	}),
)
Last modified January 13, 2025: docs: upper title for streamx (#1198) (ba78f45)