gotham/pipeline/
mod.rs

1//! Defines types for a middleware pipeline
2
3mod chain;
4pub use chain::PipelineHandleChain;
5
6mod set;
7pub use set::{finalize_pipeline_set, new_pipeline_set, EditablePipelineSet, PipelineSet};
8
9mod single;
10pub use single::{single_pipeline, SinglePipelineChain, SinglePipelineHandle, SinglePipelineSet};
11
12use log::trace;
13use std::pin::Pin;
14
15use crate::handler::HandlerFuture;
16use crate::middleware::chain::{MiddlewareChain, NewMiddlewareChain};
17use crate::middleware::NewMiddleware;
18use crate::state::{request_id, State};
19
20/// When using middleware, one or more `Middleware` are combined to form a `Pipeline`.
21/// `Middleware` are invoked strictly in the order they're added to the `Pipeline`.
22///
23/// At `Request` dispatch time, the `Middleware` are created from the `NewMiddleware` values given
24/// to the `PipelineBuilder`, and combined with a `Handler` created from the `NewHandler` provided
25/// to `Pipeline::call`.  These `Middleware` and `Handler` values are used for a single `Request`.
26///
27/// # Examples
28///
29/// ```rust
30/// # #[macro_use]
31/// # extern crate gotham_derive;
32/// #
33/// # use std::pin::Pin;
34/// #
35/// # use gotham::helpers::http::response::create_response;
36/// # use gotham::state::State;
37/// # use gotham::handler::HandlerFuture;
38/// # use gotham::middleware::Middleware;
39/// # use gotham::pipeline::*;
40/// # use gotham::router::builder::*;
41/// # use gotham::test::TestServer;
42/// # use hyper::{Body, Response, StatusCode};
43/// #
44/// #[derive(StateData)]
45/// struct MiddlewareData {
46///     vec: Vec<i32>,
47/// }
48///
49/// #[derive(NewMiddleware, Copy, Clone)]
50/// struct MiddlewareOne;
51///
52/// impl Middleware for MiddlewareOne {
53///     // Implementation elided.
54///     // Appends `1` to `MiddlewareData.vec`
55/// #     fn call<Chain>(self, mut state: State, chain: Chain) -> Pin<Box<HandlerFuture>>
56/// #         where Chain: FnOnce(State) -> Pin<Box<HandlerFuture>> + Send + 'static
57/// #     {
58/// #         state.put(MiddlewareData { vec: vec![1] });
59/// #         chain(state)
60/// #     }
61/// }
62///
63/// #[derive(NewMiddleware, Copy, Clone)]
64/// struct MiddlewareTwo;
65///
66/// impl Middleware for MiddlewareTwo {
67///     // Implementation elided.
68///     // Appends `2` to `MiddlewareData.vec`
69/// #     fn call<Chain>(self, mut state: State, chain: Chain) -> Pin<Box<HandlerFuture>>
70/// #         where Chain: FnOnce(State) -> Pin<Box<HandlerFuture>> + Send + 'static
71/// #     {
72/// #         state.borrow_mut::<MiddlewareData>().vec.push(2);
73/// #         chain(state)
74/// #     }
75/// }
76///
77/// #[derive(NewMiddleware, Copy, Clone)]
78/// struct MiddlewareThree;
79///
80/// impl Middleware for MiddlewareThree {
81///     // Implementation elided.
82///     // Appends `3` to `MiddlewareData.vec`
83/// #     fn call<Chain>(self, mut state: State, chain: Chain) -> Pin<Box<HandlerFuture>>
84/// #         where Chain: FnOnce(State) -> Pin<Box<HandlerFuture>> + Send + 'static
85/// #     {
86/// #         state.borrow_mut::<MiddlewareData>().vec.push(3);
87/// #         chain(state)
88/// #     }
89/// }
90///
91/// fn handler(state: State) -> (State, Response<Body>) {
92///     let body = {
93///         let data = state.borrow::<MiddlewareData>();
94///         format!("{:?}", data.vec)
95///     };
96///
97///     let res = create_response(&state, StatusCode::OK, mime::TEXT_PLAIN, body);
98///
99///     (state, res)
100/// }
101///
102/// fn main() {
103///     let (chain, pipelines) = single_pipeline(
104///         new_pipeline()
105///             .add(MiddlewareOne)
106///             .add(MiddlewareTwo)
107///             .add(MiddlewareThree)
108///             .build(),
109///     );
110///
111///     let router = build_router(chain, pipelines, |route| {
112///         route.get("/").to(handler);
113///     });
114///
115///     let test_server = TestServer::new(router).unwrap();
116///     let response = test_server
117///         .client()
118///         .get("http://example.com/")
119///         .perform()
120///         .unwrap();
121///     assert_eq!(response.status(), StatusCode::OK);
122///     assert_eq!(response.read_utf8_body().unwrap(), "[1, 2, 3]");
123/// }
124/// ```
125pub struct Pipeline<T>
126where
127    T: NewMiddlewareChain,
128{
129    chain: T,
130}
131
132/// Represents an instance of a `Pipeline`. Returned from `Pipeline::construct()`.
133struct PipelineInstance<T>
134where
135    T: MiddlewareChain,
136{
137    chain: T,
138}
139
140impl<T> Pipeline<T>
141where
142    T: NewMiddlewareChain,
143{
144    /// Constructs an instance of this `Pipeline` by creating all `Middleware` instances required
145    /// to serve a request. If any middleware fails creation, its error will be returned.
146    fn construct(&self) -> anyhow::Result<PipelineInstance<T::Instance>> {
147        Ok(PipelineInstance {
148            chain: self.chain.construct()?,
149        })
150    }
151}
152
153impl<T> PipelineInstance<T>
154where
155    T: MiddlewareChain,
156{
157    /// Serves a request using this `PipelineInstance`. Requests that pass through all `Middleware`
158    /// will be served with the `f` function.
159    fn call<F>(self, state: State, f: F) -> Pin<Box<HandlerFuture>>
160    where
161        F: FnOnce(State) -> Pin<Box<HandlerFuture>> + Send + 'static,
162    {
163        trace!("[{}] calling middleware", request_id(&state));
164        self.chain.call(state, f)
165    }
166}
167
168/// Begins defining a new pipeline.
169///
170/// See `PipelineBuilder` for information on using `new_pipeline()`.
171pub fn new_pipeline() -> PipelineBuilder<()> {
172    trace!(" starting pipeline construction");
173    // See: `impl NewMiddlewareChain for ()`
174    PipelineBuilder { t: () }
175}
176
177/// Constructs a pipeline from a single middleware.
178pub fn single_middleware<M>(m: M) -> Pipeline<(M, ())>
179where
180    M: NewMiddleware,
181    M::Instance: Send + 'static,
182{
183    new_pipeline().add(m).build()
184}
185
186/// Allows a pipeline to be defined by adding `NewMiddleware` values, and building a `Pipeline`.
187///
188/// # Examples
189///
190/// ```rust
191/// # extern crate gotham;
192/// # #[macro_use]
193/// # extern crate gotham_derive;
194/// #
195/// # use std::pin::Pin;
196/// #
197/// # use gotham::state::State;
198/// # use gotham::handler::HandlerFuture;
199/// # use gotham::middleware::Middleware;
200/// # use gotham::pipeline::new_pipeline;
201/// #
202/// # #[derive(NewMiddleware, Copy, Clone)]
203/// # struct MiddlewareOne;
204/// #
205/// # #[derive(NewMiddleware, Copy, Clone)]
206/// # struct MiddlewareTwo;
207/// #
208/// # #[derive(NewMiddleware, Copy, Clone)]
209/// # struct MiddlewareThree;
210/// #
211/// # impl Middleware for MiddlewareOne {
212/// #   fn call<Chain>(self, state: State, chain: Chain) -> Pin<Box<HandlerFuture>>
213/// #       where Chain: FnOnce(State) -> Pin<Box<HandlerFuture>> + Send + 'static
214/// #   {
215/// #       chain(state)
216/// #   }
217/// # }
218/// #
219/// # impl Middleware for MiddlewareTwo {
220/// #   fn call<Chain>(self, state: State, chain: Chain) -> Pin<Box<HandlerFuture>>
221/// #       where Chain: FnOnce(State) -> Pin<Box<HandlerFuture>> + Send + 'static
222/// #   {
223/// #       chain(state)
224/// #   }
225/// # }
226/// #
227/// # impl Middleware for MiddlewareThree {
228/// #   fn call<Chain>(self, state: State, chain: Chain) -> Pin<Box<HandlerFuture>>
229/// #       where Chain: FnOnce(State) -> Pin<Box<HandlerFuture>> + Send + 'static
230/// #   {
231/// #       chain(state)
232/// #   }
233/// # }
234/// #
235/// # fn main() {
236/// new_pipeline()
237///     .add(MiddlewareOne)
238///     .add(MiddlewareTwo)
239///     .add(MiddlewareThree)
240///     .build();
241/// # }
242/// ```
243///
244/// The pipeline defined here is invoked in this order:
245///
246/// `(&mut state)` &rarr; `MiddlewareOne` &rarr; `MiddlewareTwo` &rarr; `MiddlewareThree` &rarr;
247/// `handler` (provided later, when building the router)
248pub struct PipelineBuilder<T>
249where
250    T: NewMiddlewareChain,
251{
252    t: T,
253}
254
255impl<T> PipelineBuilder<T>
256where
257    T: NewMiddlewareChain,
258{
259    /// Builds a `Pipeline`, which contains all middleware in the order provided via `add` and is
260    /// ready to process requests via a `NewHandler` provided to `Pipeline::call`.
261    pub fn build(self) -> Pipeline<T>
262    where
263        T: NewMiddlewareChain,
264    {
265        Pipeline { chain: self.t }
266    }
267
268    /// Adds a `NewMiddleware` which will create a `Middleware` during request dispatch.
269    pub fn add<M>(self, m: M) -> PipelineBuilder<(M, T)>
270    where
271        M: NewMiddleware,
272        M::Instance: Send + 'static,
273        Self: Sized,
274    {
275        // "cons" the most recently added `NewMiddleware` onto the front of the list. This is
276        // essentially building an HList-style tuple in reverse order. So for a call like:
277        //
278        //     new_pipeline().add(MiddlewareOne).add(MiddlewareTwo).add(MiddlewareThree)
279        //
280        // The resulting `PipelineBuilder` will be:
281        //
282        //     PipelineBuilder { t: (MiddlewareThree, (MiddlewareTwo, (MiddlewareOne, ()))) }
283        //
284        // An empty `PipelineBuilder` is represented as:
285        //
286        //     PipelineBuilder { t: () }
287        trace!(" adding middleware to pipeline");
288        PipelineBuilder { t: (m, self.t) }
289    }
290}
291
292#[cfg(test)]
293mod tests {
294    use super::*;
295
296    use futures_util::future::{self, FutureExt};
297    use hyper::{Body, Response, StatusCode};
298
299    use crate::handler::Handler;
300    use crate::middleware::Middleware;
301    use crate::state::StateData;
302    use crate::test::TestServer;
303
304    fn handler(state: State) -> (State, Response<Body>) {
305        let number = state.borrow::<Number>().value;
306        (
307            state,
308            Response::builder()
309                .status(StatusCode::OK)
310                .body(format!("{}", number).into())
311                .unwrap(),
312        )
313    }
314
315    #[derive(Clone)]
316    struct Number {
317        value: i32,
318    }
319
320    impl NewMiddleware for Number {
321        type Instance = Number;
322
323        fn new_middleware(&self) -> anyhow::Result<Number> {
324            Ok(self.clone())
325        }
326    }
327
328    impl Middleware for Number {
329        fn call<Chain>(self, mut state: State, chain: Chain) -> Pin<Box<HandlerFuture>>
330        where
331            Chain: FnOnce(State) -> Pin<Box<HandlerFuture>> + Send + 'static,
332            Self: Sized,
333        {
334            state.put(self);
335            chain(state)
336        }
337    }
338
339    impl StateData for Number {}
340
341    struct Addition {
342        value: i32,
343    }
344
345    impl NewMiddleware for Addition {
346        type Instance = Addition;
347
348        fn new_middleware(&self) -> anyhow::Result<Addition> {
349            Ok(Addition { ..*self })
350        }
351    }
352
353    impl Middleware for Addition {
354        fn call<Chain>(self, mut state: State, chain: Chain) -> Pin<Box<HandlerFuture>>
355        where
356            Chain: FnOnce(State) -> Pin<Box<HandlerFuture>> + Send + 'static,
357            Self: Sized,
358        {
359            state.borrow_mut::<Number>().value += self.value;
360            chain(state)
361        }
362    }
363
364    struct Multiplication {
365        value: i32,
366    }
367
368    impl NewMiddleware for Multiplication {
369        type Instance = Multiplication;
370
371        fn new_middleware(&self) -> anyhow::Result<Multiplication> {
372            Ok(Multiplication { ..*self })
373        }
374    }
375
376    impl Middleware for Multiplication {
377        fn call<Chain>(self, mut state: State, chain: Chain) -> Pin<Box<HandlerFuture>>
378        where
379            Chain: FnOnce(State) -> Pin<Box<HandlerFuture>> + 'static,
380            Self: Sized,
381        {
382            state.borrow_mut::<Number>().value *= self.value;
383            chain(state)
384        }
385    }
386
387    #[test]
388    fn pipeline_ordering_test() {
389        let test_server = TestServer::new(|| {
390            let pipeline = new_pipeline()
391                .add(Number { value: 0 }) // 0
392                .add(Addition { value: 1 }) // 1
393                .add(Multiplication { value: 2 }) // 2
394                .add(Addition { value: 1 }) // 3
395                .add(Multiplication { value: 2 }) // 6
396                .add(Addition { value: 2 }) // 8
397                .add(Multiplication { value: 3 }) // 24
398                .build();
399
400            Ok(move |state| match pipeline.construct() {
401                Ok(p) => p.call(state, |state| handler.handle(state)),
402                Err(e) => future::err((state, e.into())).boxed(),
403            })
404        })
405        .unwrap();
406
407        let response = test_server
408            .client()
409            .get("http://localhost/")
410            .perform()
411            .unwrap();
412
413        let buf = response.read_body().unwrap();
414        assert_eq!(buf.as_slice(), b"24");
415    }
416}