Putting it all together
Putting it all together
use service_async::{
layer::{layer_fn, FactoryLayer}, AsyncMakeService, MakeService, Param, ParamMaybeRef, ParamRef, Service
};
#[derive(Clone)]
pub struct RoutingHandler<H> {
inner: H,
router: Router<RouteConfig>,
}
impl<H, CX, B> Service<(Request<B>, CX)> for RoutingHandler<H>
where
CX: ParamRef<PeerAddr>,
H: HttpHandler<CX, B>,
H::Body: FixedBody,
{
type Response = ResponseWithContinue<H::Body>;
type Error = H::Error;
async fn call(
&self,
(mut request, ctx): (Request<B>, CX),
) -> Result<Self::Response, Self::Error> {
let req_path = request.uri().path();
tracing::info!("request path: {req_path}");
let peer_addr = ParamRef::<PeerAddr>::param_ref(&ctx);
tracing::info!("Peer Addr: {:?}", peer_addr);
match self.router.at(req_path) {
Ok(route) => {
let route = route.value;
tracing::info!("the route id: {}", route.id);
use rand::seq::SliceRandom;
let upstream = route
.upstreams
.choose(&mut rand::thread_rng())
.expect("empty upstream list");
rewrite_request(&mut request, upstream);
self.inner.handle(request, ctx).await
}
Err(e) => {
debug!("match request uri: {} with error: {e}", request.uri());
Ok((generate_response(StatusCode::NOT_FOUND, false), true))
}
}
}
}
pub struct RoutingHandlerFactory<F> {
inner: F,
routes: Vec<RouteConfig>,
}
#[derive(thiserror::Error, Debug)]
pub enum RoutingFactoryError<E> {
#[error("inner error: {0:?}")]
Inner(E),
#[error("empty upstream")]
EmptyUpstream,
#[error("router error: {0:?}")]
Router(#[from] matchit::InsertError),
}
impl<F: MakeService> MakeService for RoutingHandlerFactory<F> {
type Service = RoutingHandler<F::Service>;
type Error = RoutingFactoryError<F::Error>;
fn make_via_ref(&self, old: Option<&Self::Service>) -> Result<Self::Service, Self::Error> {
let mut router: Router<RouteConfig> = Router::new();
for route in self.routes.iter() {
router.insert(&route.path, route.clone())?;
if route.upstreams.is_empty() {
return Err(RoutingFactoryError::EmptyUpstream);
}
}
Ok(RoutingHandler {
inner: self
.inner
.make_via_ref(old.map(|o| &o.inner))
.map_err(RoutingFactoryError::Inner)?,
router,
})
}
}
impl<F> RoutingHandler<F> {
pub fn layer<C>() -> impl FactoryLayer<C, F, Factory = RoutingHandlerFactory<F>>
where
C: Param<Vec<RouteConfig>>,
{
layer_fn(|c: &C, inner| {
let routes = c.param();
RoutingHandlerFactory { inner, routes }
})
}
}
fn rewrite_request<B>(request: &mut Request<B>, upstream: &Upstream) {
// URI rewrite logic
}
Last modified
January 13, 2025
: docs: add description for streamx (#1202) (0337c81)