StreamX Middleware

Middleware Type

Stream Recv/Send Middleware

Trigger timing : Called when streaming messages

Type definition

type StreamRecvEndpoint func(ctx context.Context, stream Stream, res any) (err error)
type StreamSendEndpoint func(ctx context.Context, stream Stream, req any) (err error)

type StreamRecvMiddleware func(next StreamRecvEndpoint) StreamRecvEndpoint
type StreamSendMiddleware func(next StreamSendEndpoint) StreamSendEndpoint

Parameter description :

  • Directly obtain the current stream object
  • Res/req both represent real requests and responses.
  • Behavior before and after calling the Next function:
Middleware type Before calling Next After calling Next
StreamRecvMiddleware - The data is not really collected, just called the stream. Recv () function.

- Res parameter is empty
- Data received or encountered an error

- The res parameter has a real value
StreamSendMiddleware - The data was not actually sent, just called the stream.Send () function

- The req parameter is a real request
- Data transmission completed or encountered an error

- The req parameter is a real request

Examples

Usage scenario : Inject relevant business logic when the stream receives/sends messages.

svr, err := xxx.NewServer(
    //...
    streamxserver.WithStreamRecvMiddleware(func(next streamx.StreamRecvEndpoint) streamx.StreamRecvEndpoint {
        return func(ctx context.Context, stream streamx.Stream, res any) (err error) {
            // ctx has user's token
            token, ok := metainfo.GetPersistentValue(ctx, "user_token")
            // check token balance
            if !hasBalance(token) {
                return fmt.Errorf("user dont have enough balance: token=%s", token)
            }
            return next(ctx, stream, res)
        }
    }),
)

Inject Middleware

Inject Client Middleware

cli, err := xxx.NewClient(
    "a.b.c",
    streamxclient.WithStreamRecvMiddleware(func(next streamx.StreamRecvEndpoint) streamx.StreamRecvEndpoint {
        return func(ctx context.Context, stream streamx.Stream, res any) (err error) {
           return next(ctx, stream, res)
        }
    }),
    streamxclient.WithStreamSendMiddleware(func(next streamx.StreamSendEndpoint) streamx.StreamSendEndpoint {
        return func(ctx context.Context, stream streamx.Stream, req any) (err error) {
           return next(ctx, stream, req)
        }
    }),
)

Inject Server Middleware

server, err := xxx.NewServer(
    // ....
    streamxserver.WithStreamRecvMiddleware(func(next streamx.StreamRecvEndpoint) streamx.StreamRecvEndpoint {
        return func(ctx context.Context, stream streamx.Stream, res any) (err error) {
           return next(ctx, stream, res)
        }
    }),
    streamxserver.WithStreamSendMiddleware(func(next streamx.StreamSendEndpoint) streamx.StreamSendEndpoint {
        return func(ctx context.Context, stream streamx.Stream, req any) (err error) {
           return next(ctx, stream, req)
        }
    }),
)

Last modified January 13, 2025 : docs: upper title for streamx (#1198) (ba78f45)