🛈 Note: This is pre-release documentation for the upcoming tracing 0.2.0 ecosystem.

For the release documentation, please see docs.rs instead.

tracing_futures/executor/
futures_01.rs

1use crate::{Instrument, Instrumented, WithDispatch};
2use futures_01::{
3    future::{ExecuteError, Executor},
4    Future,
5};
6
7impl<T, F> Executor<F> for Instrumented<T>
8where
9    T: Executor<Instrumented<F>>,
10    F: Future<Item = (), Error = ()>,
11{
12    fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
13        let future = future.instrument(self.span.clone());
14        self.inner.execute(future).map_err(|e| {
15            let kind = e.kind();
16            let future = e.into_future().into_inner();
17            ExecuteError::new(kind, future)
18        })
19    }
20}
21
22impl<T, F> Executor<F> for WithDispatch<T>
23where
24    T: Executor<WithDispatch<F>>,
25    F: Future<Item = (), Error = ()>,
26{
27    fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
28        let future = self.with_dispatch(future);
29        self.inner.execute(future).map_err(|e| {
30            let kind = e.kind();
31            let future = e.into_future().inner;
32            ExecuteError::new(kind, future)
33        })
34    }
35}
36
37#[cfg(feature = "tokio")]
38#[allow(unreachable_pub, unused_imports)] // https://github.com/rust-lang/rust/issues/57411
39pub use self::tokio::*;
40
41#[cfg(feature = "tokio")]
42mod tokio {
43    use crate::{Instrument, Instrumented, WithDispatch};
44    use futures_01::Future;
45    use tokio_01::{
46        executor::{Executor, SpawnError, TypedExecutor},
47        runtime::{current_thread, Runtime, TaskExecutor},
48    };
49
50    impl<T> Executor for Instrumented<T>
51    where
52        T: Executor,
53    {
54        fn spawn(
55            &mut self,
56            future: Box<dyn Future<Error = (), Item = ()> + 'static + Send>,
57        ) -> Result<(), SpawnError> {
58            // TODO: get rid of double box somehow?
59            let future = Box::new(future.instrument(self.span.clone()));
60            self.inner.spawn(future)
61        }
62    }
63
64    impl<T, F> TypedExecutor<F> for Instrumented<T>
65    where
66        T: TypedExecutor<Instrumented<F>>,
67    {
68        fn spawn(&mut self, future: F) -> Result<(), SpawnError> {
69            self.inner.spawn(future.instrument(self.span.clone()))
70        }
71
72        fn status(&self) -> Result<(), SpawnError> {
73            self.inner.status()
74        }
75    }
76
77    impl Instrumented<Runtime> {
78        /// Spawn an instrumented future onto the Tokio runtime.
79        ///
80        /// This spawns the given future onto the runtime's executor, usually a
81        /// thread pool. The thread pool is then responsible for polling the
82        /// future until it completes.
83        ///
84        /// This method simply wraps a call to `tokio::runtime::Runtime::spawn`,
85        /// instrumenting the spawned future beforehand.
86        pub fn spawn<F>(&mut self, future: F) -> &mut Self
87        where
88            F: Future<Item = (), Error = ()> + Send + 'static,
89        {
90            let future = future.instrument(self.span.clone());
91            self.inner.spawn(future);
92            self
93        }
94
95        /// Run an instrumented future to completion on the Tokio runtime.
96        ///
97        /// This runs the given future on the runtime, blocking until it is
98        /// complete, and yielding its resolved result. Any tasks or timers which
99        /// the future spawns internally will be executed on the runtime.
100        ///
101        /// This method should not be called from an asynchronous context.
102        ///
103        /// This method simply wraps a call to `tokio::runtime::Runtime::block_on`,
104        /// instrumenting the spawned future beforehand.
105        ///
106        /// # Panics
107        ///
108        /// This function panics if the executor is at capacity, if the provided
109        /// future panics, or if called within an asynchronous execution context.
110        pub fn block_on<F, R, E>(&mut self, future: F) -> Result<R, E>
111        where
112            F: Send + 'static + Future<Item = R, Error = E>,
113            R: Send + 'static,
114            E: Send + 'static,
115        {
116            let future = future.instrument(self.span.clone());
117            self.inner.block_on(future)
118        }
119
120        /// Return an instrumented handle to the runtime's executor.
121        ///
122        /// The returned handle can be used to spawn tasks that run on this runtime.
123        ///
124        /// The instrumented handle functions identically to a
125        /// `tokio::runtime::TaskExecutor`, but instruments the spawned
126        /// futures prior to spawning them.
127        pub fn executor(&self) -> Instrumented<TaskExecutor> {
128            self.inner.executor().instrument(self.span.clone())
129        }
130    }
131
132    impl Instrumented<current_thread::Runtime> {
133        /// Spawn an instrumented future onto the single-threaded Tokio runtime.
134        ///
135        /// This method simply wraps a call to `current_thread::Runtime::spawn`,
136        /// instrumenting the spawned future beforehand.
137        pub fn spawn<F>(&mut self, future: F) -> &mut Self
138        where
139            F: Future<Item = (), Error = ()> + 'static,
140        {
141            let future = future.instrument(self.span.clone());
142            self.inner.spawn(future);
143            self
144        }
145
146        /// Instruments and runs the provided future, blocking the current thread
147        /// until the future completes.
148        ///
149        /// This function can be used to synchronously block the current thread
150        /// until the provided `future` has resolved either successfully or with an
151        /// error. The result of the future is then returned from this function
152        /// call.
153        ///
154        /// Note that this function will **also** execute any spawned futures on the
155        /// current thread, but will **not** block until these other spawned futures
156        /// have completed. Once the function returns, any uncompleted futures
157        /// remain pending in the `Runtime` instance. These futures will not run
158        /// until `block_on` or `run` is called again.
159        ///
160        /// The caller is responsible for ensuring that other spawned futures
161        /// complete execution by calling `block_on` or `run`.
162        ///
163        /// This method simply wraps a call to `current_thread::Runtime::block_on`,
164        /// instrumenting the spawned future beforehand.
165        ///
166        /// # Panics
167        ///
168        /// This function panics if the executor is at capacity, if the provided
169        /// future panics, or if called within an asynchronous execution context.
170        pub fn block_on<F, R, E>(&mut self, future: F) -> Result<R, E>
171        where
172            F: 'static + Future<Item = R, Error = E>,
173            R: 'static,
174            E: 'static,
175        {
176            let future = future.instrument(self.span.clone());
177            self.inner.block_on(future)
178        }
179
180        /// Get a new instrumented handle to spawn futures on the single-threaded
181        /// Tokio runtime
182        ///
183        /// Different to the runtime itself, the handle can be sent to different
184        /// threads.
185        ///
186        /// The instrumented handle functions identically to a
187        /// `tokio::runtime::current_thread::Handle`, but instruments the spawned
188        /// futures prior to spawning them.
189        pub fn handle(&self) -> Instrumented<current_thread::Handle> {
190            self.inner.handle().instrument(self.span.clone())
191        }
192    }
193
194    impl<T> Executor for WithDispatch<T>
195    where
196        T: Executor,
197    {
198        fn spawn(
199            &mut self,
200            future: Box<dyn Future<Error = (), Item = ()> + 'static + Send>,
201        ) -> Result<(), SpawnError> {
202            // TODO: get rid of double box?
203            let future = Box::new(self.with_dispatch(future));
204            self.inner.spawn(future)
205        }
206    }
207
208    impl<T, F> TypedExecutor<F> for WithDispatch<T>
209    where
210        T: TypedExecutor<WithDispatch<F>>,
211    {
212        fn spawn(&mut self, future: F) -> Result<(), SpawnError> {
213            self.inner.spawn(self.with_dispatch(future))
214        }
215
216        fn status(&self) -> Result<(), SpawnError> {
217            self.inner.status()
218        }
219    }
220
221    impl WithDispatch<Runtime> {
222        /// Spawn a future onto the Tokio runtime, in the context of this
223        /// `WithDispatch`'s trace dispatcher.
224        ///
225        /// This spawns the given future onto the runtime's executor, usually a
226        /// thread pool. The thread pool is then responsible for polling the
227        /// future until it completes.
228        ///
229        /// This method simply wraps a call to `tokio::runtime::Runtime::spawn`,
230        /// instrumenting the spawned future beforehand.
231        pub fn spawn<F>(&mut self, future: F) -> &mut Self
232        where
233            F: Future<Item = (), Error = ()> + Send + 'static,
234        {
235            let future = self.with_dispatch(future);
236            self.inner.spawn(future);
237            self
238        }
239
240        /// Run a future to completion on the Tokio runtime, in the context of this
241        /// `WithDispatch`'s trace dispatcher.
242        ///
243        /// This runs the given future on the runtime, blocking until it is
244        /// complete, and yielding its resolved result. Any tasks or timers which
245        /// the future spawns internally will be executed on the runtime.
246        ///
247        /// This method should not be called from an asynchronous context.
248        ///
249        /// This method simply wraps a call to `tokio::runtime::Runtime::block_on`,
250        /// instrumenting the spawned future beforehand.
251        ///
252        /// # Panics
253        ///
254        /// This function panics if the executor is at capacity, if the provided
255        /// future panics, or if called within an asynchronous execution context.
256        pub fn block_on<F, R, E>(&mut self, future: F) -> Result<R, E>
257        where
258            F: Send + 'static + Future<Item = R, Error = E>,
259            R: Send + 'static,
260            E: Send + 'static,
261        {
262            let future = self.with_dispatch(future);
263            self.inner.block_on(future)
264        }
265
266        /// Return a handle to the runtime's executor, in the context of this
267        /// `WithDispatch`'s trace dispatcher.
268        ///
269        /// The returned handle can be used to spawn tasks that run on this runtime.
270        ///
271        /// The instrumented handle functions identically to a
272        /// `tokio::runtime::TaskExecutor`, but instruments the spawned
273        /// futures prior to spawning them.
274        pub fn executor(&self) -> WithDispatch<TaskExecutor> {
275            self.with_dispatch(self.inner.executor())
276        }
277    }
278
279    impl WithDispatch<current_thread::Runtime> {
280        /// Spawn a future onto the single-threaded Tokio runtime, in the context
281        /// of this `WithDispatch`'s trace dispatcher.
282        ///
283        /// This method simply wraps a call to `current_thread::Runtime::spawn`,
284        /// instrumenting the spawned future beforehand.
285        pub fn spawn<F>(&mut self, future: F) -> &mut Self
286        where
287            F: Future<Item = (), Error = ()> + 'static,
288        {
289            let future = self.with_dispatch(future);
290            self.inner.spawn(future);
291            self
292        }
293
294        /// Runs the provided future in the context of this `WithDispatch`'s trace
295        /// dispatcher, blocking the current thread until the future completes.
296        ///
297        /// This function can be used to synchronously block the current thread
298        /// until the provided `future` has resolved either successfully or with an
299        /// error. The result of the future is then returned from this function
300        /// call.
301        ///
302        /// Note that this function will **also** execute any spawned futures on the
303        /// current thread, but will **not** block until these other spawned futures
304        /// have completed. Once the function returns, any uncompleted futures
305        /// remain pending in the `Runtime` instance. These futures will not run
306        /// until `block_on` or `run` is called again.
307        ///
308        /// The caller is responsible for ensuring that other spawned futures
309        /// complete execution by calling `block_on` or `run`.
310        ///
311        /// This method simply wraps a call to `current_thread::Runtime::block_on`,
312        /// instrumenting the spawned future beforehand.
313        ///
314        /// # Panics
315        ///
316        /// This function panics if the executor is at capacity, if the provided
317        /// future panics, or if called within an asynchronous execution context.
318        pub fn block_on<F, R, E>(&mut self, future: F) -> Result<R, E>
319        where
320            F: 'static + Future<Item = R, Error = E>,
321            R: 'static,
322            E: 'static,
323        {
324            let future = self.with_dispatch(future);
325            self.inner.block_on(future)
326        }
327
328        /// Get a new handle to spawn futures on the single-threaded Tokio runtime,
329        /// in the context of this `WithDispatch`'s trace dispatcher.\
330        ///
331        /// Different to the runtime itself, the handle can be sent to different
332        /// threads.
333        ///
334        /// The instrumented handle functions identically to a
335        /// `tokio::runtime::current_thread::Handle`, but the spawned
336        /// futures are run in the context of the trace dispatcher.
337        pub fn handle(&self) -> WithDispatch<current_thread::Handle> {
338            self.with_dispatch(self.inner.handle())
339        }
340    }
341}