tracing_futures/executor/futures_01.rs
1use crate::{Instrument, Instrumented, WithDispatch};
2use futures_01::{
3 future::{ExecuteError, Executor},
4 Future,
5};
6
7impl<T, F> Executor<F> for Instrumented<T>
8where
9 T: Executor<Instrumented<F>>,
10 F: Future<Item = (), Error = ()>,
11{
12 fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
13 let future = future.instrument(self.span.clone());
14 self.inner.execute(future).map_err(|e| {
15 let kind = e.kind();
16 let future = e.into_future().into_inner();
17 ExecuteError::new(kind, future)
18 })
19 }
20}
21
22impl<T, F> Executor<F> for WithDispatch<T>
23where
24 T: Executor<WithDispatch<F>>,
25 F: Future<Item = (), Error = ()>,
26{
27 fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
28 let future = self.with_dispatch(future);
29 self.inner.execute(future).map_err(|e| {
30 let kind = e.kind();
31 let future = e.into_future().inner;
32 ExecuteError::new(kind, future)
33 })
34 }
35}
36
37#[cfg(feature = "tokio")]
38#[allow(unreachable_pub, unused_imports)] // https://github.com/rust-lang/rust/issues/57411
39pub use self::tokio::*;
40
41#[cfg(feature = "tokio")]
42mod tokio {
43 use crate::{Instrument, Instrumented, WithDispatch};
44 use futures_01::Future;
45 use tokio_01::{
46 executor::{Executor, SpawnError, TypedExecutor},
47 runtime::{current_thread, Runtime, TaskExecutor},
48 };
49
50 impl<T> Executor for Instrumented<T>
51 where
52 T: Executor,
53 {
54 fn spawn(
55 &mut self,
56 future: Box<dyn Future<Error = (), Item = ()> + 'static + Send>,
57 ) -> Result<(), SpawnError> {
58 // TODO: get rid of double box somehow?
59 let future = Box::new(future.instrument(self.span.clone()));
60 self.inner.spawn(future)
61 }
62 }
63
64 impl<T, F> TypedExecutor<F> for Instrumented<T>
65 where
66 T: TypedExecutor<Instrumented<F>>,
67 {
68 fn spawn(&mut self, future: F) -> Result<(), SpawnError> {
69 self.inner.spawn(future.instrument(self.span.clone()))
70 }
71
72 fn status(&self) -> Result<(), SpawnError> {
73 self.inner.status()
74 }
75 }
76
77 impl Instrumented<Runtime> {
78 /// Spawn an instrumented future onto the Tokio runtime.
79 ///
80 /// This spawns the given future onto the runtime's executor, usually a
81 /// thread pool. The thread pool is then responsible for polling the
82 /// future until it completes.
83 ///
84 /// This method simply wraps a call to `tokio::runtime::Runtime::spawn`,
85 /// instrumenting the spawned future beforehand.
86 pub fn spawn<F>(&mut self, future: F) -> &mut Self
87 where
88 F: Future<Item = (), Error = ()> + Send + 'static,
89 {
90 let future = future.instrument(self.span.clone());
91 self.inner.spawn(future);
92 self
93 }
94
95 /// Run an instrumented future to completion on the Tokio runtime.
96 ///
97 /// This runs the given future on the runtime, blocking until it is
98 /// complete, and yielding its resolved result. Any tasks or timers which
99 /// the future spawns internally will be executed on the runtime.
100 ///
101 /// This method should not be called from an asynchronous context.
102 ///
103 /// This method simply wraps a call to `tokio::runtime::Runtime::block_on`,
104 /// instrumenting the spawned future beforehand.
105 ///
106 /// # Panics
107 ///
108 /// This function panics if the executor is at capacity, if the provided
109 /// future panics, or if called within an asynchronous execution context.
110 pub fn block_on<F, R, E>(&mut self, future: F) -> Result<R, E>
111 where
112 F: Send + 'static + Future<Item = R, Error = E>,
113 R: Send + 'static,
114 E: Send + 'static,
115 {
116 let future = future.instrument(self.span.clone());
117 self.inner.block_on(future)
118 }
119
120 /// Return an instrumented handle to the runtime's executor.
121 ///
122 /// The returned handle can be used to spawn tasks that run on this runtime.
123 ///
124 /// The instrumented handle functions identically to a
125 /// `tokio::runtime::TaskExecutor`, but instruments the spawned
126 /// futures prior to spawning them.
127 pub fn executor(&self) -> Instrumented<TaskExecutor> {
128 self.inner.executor().instrument(self.span.clone())
129 }
130 }
131
132 impl Instrumented<current_thread::Runtime> {
133 /// Spawn an instrumented future onto the single-threaded Tokio runtime.
134 ///
135 /// This method simply wraps a call to `current_thread::Runtime::spawn`,
136 /// instrumenting the spawned future beforehand.
137 pub fn spawn<F>(&mut self, future: F) -> &mut Self
138 where
139 F: Future<Item = (), Error = ()> + 'static,
140 {
141 let future = future.instrument(self.span.clone());
142 self.inner.spawn(future);
143 self
144 }
145
146 /// Instruments and runs the provided future, blocking the current thread
147 /// until the future completes.
148 ///
149 /// This function can be used to synchronously block the current thread
150 /// until the provided `future` has resolved either successfully or with an
151 /// error. The result of the future is then returned from this function
152 /// call.
153 ///
154 /// Note that this function will **also** execute any spawned futures on the
155 /// current thread, but will **not** block until these other spawned futures
156 /// have completed. Once the function returns, any uncompleted futures
157 /// remain pending in the `Runtime` instance. These futures will not run
158 /// until `block_on` or `run` is called again.
159 ///
160 /// The caller is responsible for ensuring that other spawned futures
161 /// complete execution by calling `block_on` or `run`.
162 ///
163 /// This method simply wraps a call to `current_thread::Runtime::block_on`,
164 /// instrumenting the spawned future beforehand.
165 ///
166 /// # Panics
167 ///
168 /// This function panics if the executor is at capacity, if the provided
169 /// future panics, or if called within an asynchronous execution context.
170 pub fn block_on<F, R, E>(&mut self, future: F) -> Result<R, E>
171 where
172 F: 'static + Future<Item = R, Error = E>,
173 R: 'static,
174 E: 'static,
175 {
176 let future = future.instrument(self.span.clone());
177 self.inner.block_on(future)
178 }
179
180 /// Get a new instrumented handle to spawn futures on the single-threaded
181 /// Tokio runtime
182 ///
183 /// Different to the runtime itself, the handle can be sent to different
184 /// threads.
185 ///
186 /// The instrumented handle functions identically to a
187 /// `tokio::runtime::current_thread::Handle`, but instruments the spawned
188 /// futures prior to spawning them.
189 pub fn handle(&self) -> Instrumented<current_thread::Handle> {
190 self.inner.handle().instrument(self.span.clone())
191 }
192 }
193
194 impl<T> Executor for WithDispatch<T>
195 where
196 T: Executor,
197 {
198 fn spawn(
199 &mut self,
200 future: Box<dyn Future<Error = (), Item = ()> + 'static + Send>,
201 ) -> Result<(), SpawnError> {
202 // TODO: get rid of double box?
203 let future = Box::new(self.with_dispatch(future));
204 self.inner.spawn(future)
205 }
206 }
207
208 impl<T, F> TypedExecutor<F> for WithDispatch<T>
209 where
210 T: TypedExecutor<WithDispatch<F>>,
211 {
212 fn spawn(&mut self, future: F) -> Result<(), SpawnError> {
213 self.inner.spawn(self.with_dispatch(future))
214 }
215
216 fn status(&self) -> Result<(), SpawnError> {
217 self.inner.status()
218 }
219 }
220
221 impl WithDispatch<Runtime> {
222 /// Spawn a future onto the Tokio runtime, in the context of this
223 /// `WithDispatch`'s trace dispatcher.
224 ///
225 /// This spawns the given future onto the runtime's executor, usually a
226 /// thread pool. The thread pool is then responsible for polling the
227 /// future until it completes.
228 ///
229 /// This method simply wraps a call to `tokio::runtime::Runtime::spawn`,
230 /// instrumenting the spawned future beforehand.
231 pub fn spawn<F>(&mut self, future: F) -> &mut Self
232 where
233 F: Future<Item = (), Error = ()> + Send + 'static,
234 {
235 let future = self.with_dispatch(future);
236 self.inner.spawn(future);
237 self
238 }
239
240 /// Run a future to completion on the Tokio runtime, in the context of this
241 /// `WithDispatch`'s trace dispatcher.
242 ///
243 /// This runs the given future on the runtime, blocking until it is
244 /// complete, and yielding its resolved result. Any tasks or timers which
245 /// the future spawns internally will be executed on the runtime.
246 ///
247 /// This method should not be called from an asynchronous context.
248 ///
249 /// This method simply wraps a call to `tokio::runtime::Runtime::block_on`,
250 /// instrumenting the spawned future beforehand.
251 ///
252 /// # Panics
253 ///
254 /// This function panics if the executor is at capacity, if the provided
255 /// future panics, or if called within an asynchronous execution context.
256 pub fn block_on<F, R, E>(&mut self, future: F) -> Result<R, E>
257 where
258 F: Send + 'static + Future<Item = R, Error = E>,
259 R: Send + 'static,
260 E: Send + 'static,
261 {
262 let future = self.with_dispatch(future);
263 self.inner.block_on(future)
264 }
265
266 /// Return a handle to the runtime's executor, in the context of this
267 /// `WithDispatch`'s trace dispatcher.
268 ///
269 /// The returned handle can be used to spawn tasks that run on this runtime.
270 ///
271 /// The instrumented handle functions identically to a
272 /// `tokio::runtime::TaskExecutor`, but instruments the spawned
273 /// futures prior to spawning them.
274 pub fn executor(&self) -> WithDispatch<TaskExecutor> {
275 self.with_dispatch(self.inner.executor())
276 }
277 }
278
279 impl WithDispatch<current_thread::Runtime> {
280 /// Spawn a future onto the single-threaded Tokio runtime, in the context
281 /// of this `WithDispatch`'s trace dispatcher.
282 ///
283 /// This method simply wraps a call to `current_thread::Runtime::spawn`,
284 /// instrumenting the spawned future beforehand.
285 pub fn spawn<F>(&mut self, future: F) -> &mut Self
286 where
287 F: Future<Item = (), Error = ()> + 'static,
288 {
289 let future = self.with_dispatch(future);
290 self.inner.spawn(future);
291 self
292 }
293
294 /// Runs the provided future in the context of this `WithDispatch`'s trace
295 /// dispatcher, blocking the current thread until the future completes.
296 ///
297 /// This function can be used to synchronously block the current thread
298 /// until the provided `future` has resolved either successfully or with an
299 /// error. The result of the future is then returned from this function
300 /// call.
301 ///
302 /// Note that this function will **also** execute any spawned futures on the
303 /// current thread, but will **not** block until these other spawned futures
304 /// have completed. Once the function returns, any uncompleted futures
305 /// remain pending in the `Runtime` instance. These futures will not run
306 /// until `block_on` or `run` is called again.
307 ///
308 /// The caller is responsible for ensuring that other spawned futures
309 /// complete execution by calling `block_on` or `run`.
310 ///
311 /// This method simply wraps a call to `current_thread::Runtime::block_on`,
312 /// instrumenting the spawned future beforehand.
313 ///
314 /// # Panics
315 ///
316 /// This function panics if the executor is at capacity, if the provided
317 /// future panics, or if called within an asynchronous execution context.
318 pub fn block_on<F, R, E>(&mut self, future: F) -> Result<R, E>
319 where
320 F: 'static + Future<Item = R, Error = E>,
321 R: 'static,
322 E: 'static,
323 {
324 let future = self.with_dispatch(future);
325 self.inner.block_on(future)
326 }
327
328 /// Get a new handle to spawn futures on the single-threaded Tokio runtime,
329 /// in the context of this `WithDispatch`'s trace dispatcher.\
330 ///
331 /// Different to the runtime itself, the handle can be sent to different
332 /// threads.
333 ///
334 /// The instrumented handle functions identically to a
335 /// `tokio::runtime::current_thread::Handle`, but the spawned
336 /// futures are run in the context of the trace dispatcher.
337 pub fn handle(&self) -> WithDispatch<current_thread::Handle> {
338 self.with_dispatch(self.inner.handle())
339 }
340 }
341}