tokio/runtime/runtime.rs
1use super::BOX_FUTURE_THRESHOLD;
2use crate::runtime::blocking::BlockingPool;
3use crate::runtime::scheduler::CurrentThread;
4use crate::runtime::{context, EnterGuard, Handle};
5use crate::task::JoinHandle;
6use crate::util::trace::SpawnMeta;
7
8use std::future::Future;
9use std::mem;
10use std::time::Duration;
11
12cfg_rt_multi_thread! {
13 use crate::runtime::Builder;
14 use crate::runtime::scheduler::MultiThread;
15}
16
17/// The Tokio runtime.
18///
19/// The runtime provides an I/O driver, task scheduler, [timer], and
20/// blocking pool, necessary for running asynchronous tasks.
21///
22/// Instances of `Runtime` can be created using [`new`], or [`Builder`].
23/// However, most users will use the [`#[tokio::main]`][main] annotation on
24/// their entry point instead.
25///
26/// See [module level][mod] documentation for more details.
27///
28/// # Shutdown
29///
30/// Shutting down the runtime is done by dropping the value, or calling
31/// [`shutdown_background`] or [`shutdown_timeout`].
32///
33/// Tasks spawned through [`Runtime::spawn`] keep running until they yield.
34/// Then they are dropped. They are not *guaranteed* to run to completion, but
35/// *might* do so if they do not yield until completion.
36///
37/// Blocking functions spawned through [`Runtime::spawn_blocking`] keep running
38/// until they return.
39///
40/// The thread initiating the shutdown blocks until all spawned work has been
41/// stopped. This can take an indefinite amount of time. The `Drop`
42/// implementation waits forever for this.
43///
44/// The [`shutdown_background`] and [`shutdown_timeout`] methods can be used if
45/// waiting forever is undesired. When the timeout is reached, spawned work that
46/// did not stop in time and threads running it are leaked. The work continues
47/// to run until one of the stopping conditions is fulfilled, but the thread
48/// initiating the shutdown is unblocked.
49///
50/// Once the runtime has been dropped, any outstanding I/O resources bound to
51/// it will no longer function. Calling any method on them will result in an
52/// error.
53///
54/// # Sharing
55///
56/// There are several ways to establish shared access to a Tokio runtime:
57///
58/// * Using an <code>[Arc]\<Runtime></code>.
59/// * Using a [`Handle`].
60/// * Entering the runtime context.
61///
62/// Using an <code>[Arc]\<Runtime></code> or [`Handle`] allows you to do various
63/// things with the runtime such as spawning new tasks or entering the runtime
64/// context. Both types can be cloned to create a new handle that allows access
65/// to the same runtime. By passing clones into different tasks or threads, you
66/// will be able to access the runtime from those tasks or threads.
67///
68/// The difference between <code>[Arc]\<Runtime></code> and [`Handle`] is that
69/// an <code>[Arc]\<Runtime></code> will prevent the runtime from shutting down,
70/// whereas a [`Handle`] does not prevent that. This is because shutdown of the
71/// runtime happens when the destructor of the `Runtime` object runs.
72///
73/// Calls to [`shutdown_background`] and [`shutdown_timeout`] require exclusive
74/// ownership of the `Runtime` type. When using an <code>[Arc]\<Runtime></code>,
75/// this can be achieved via [`Arc::try_unwrap`] when only one strong count
76/// reference is left over.
77///
78/// The runtime context is entered using the [`Runtime::enter`] or
79/// [`Handle::enter`] methods, which use a thread-local variable to store the
80/// current runtime. Whenever you are inside the runtime context, methods such
81/// as [`tokio::spawn`] will use the runtime whose context you are inside.
82///
83/// [timer]: crate::time
84/// [mod]: index.html
85/// [`new`]: method@Self::new
86/// [`Builder`]: struct@Builder
87/// [`Handle`]: struct@Handle
88/// [main]: macro@crate::main
89/// [`tokio::spawn`]: crate::spawn
90/// [`Arc::try_unwrap`]: std::sync::Arc::try_unwrap
91/// [Arc]: std::sync::Arc
92/// [`shutdown_background`]: method@Runtime::shutdown_background
93/// [`shutdown_timeout`]: method@Runtime::shutdown_timeout
94#[derive(Debug)]
95pub struct Runtime {
96 /// Task scheduler
97 scheduler: Scheduler,
98
99 /// Handle to runtime, also contains driver handles
100 handle: Handle,
101
102 /// Blocking pool handle, used to signal shutdown
103 blocking_pool: BlockingPool,
104}
105
/// The flavor of a `Runtime`.
///
/// This is the return type for [`Handle::runtime_flavor`](crate::runtime::Handle::runtime_flavor()).
///
/// The enum is a field-less tag, so it derives `Clone` and `Copy` in addition
/// to the comparison traits: a flavor can be stored, passed by value and
/// compared without giving up the original.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum RuntimeFlavor {
    /// The flavor that executes all tasks on the current thread.
    CurrentThread,
    /// The flavor that executes tasks across multiple threads.
    MultiThread,
}
117
118/// The runtime scheduler is either a multi-thread or a current-thread executor.
119#[derive(Debug)]
120pub(super) enum Scheduler {
121 /// Execute all tasks on the current-thread.
122 CurrentThread(CurrentThread),
123
124 /// Execute tasks across multiple threads.
125 #[cfg(feature = "rt-multi-thread")]
126 MultiThread(MultiThread),
127}
128
129impl Runtime {
130 pub(super) fn from_parts(
131 scheduler: Scheduler,
132 handle: Handle,
133 blocking_pool: BlockingPool,
134 ) -> Runtime {
135 Runtime {
136 scheduler,
137 handle,
138 blocking_pool,
139 }
140 }
141
142 /// Creates a new runtime instance with default configuration values.
143 ///
144 /// This results in the multi threaded scheduler, I/O driver, and time driver being
145 /// initialized.
146 ///
147 /// Most applications will not need to call this function directly. Instead,
148 /// they will use the [`#[tokio::main]` attribute][main]. When a more complex
149 /// configuration is necessary, the [runtime builder] may be used.
150 ///
151 /// See [module level][mod] documentation for more details.
152 ///
153 /// # Examples
154 ///
155 /// Creating a new `Runtime` with default configuration values.
156 ///
157 /// ```
158 /// use tokio::runtime::Runtime;
159 ///
160 /// let rt = Runtime::new()
161 /// .unwrap();
162 ///
163 /// // Use the runtime...
164 /// ```
165 ///
166 /// [mod]: index.html
167 /// [main]: ../attr.main.html
168 /// [threaded scheduler]: index.html#threaded-scheduler
169 /// [runtime builder]: crate::runtime::Builder
170 #[cfg(feature = "rt-multi-thread")]
171 #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
172 pub fn new() -> std::io::Result<Runtime> {
173 Builder::new_multi_thread().enable_all().build()
174 }
175
176 /// Returns a handle to the runtime's spawner.
177 ///
178 /// The returned handle can be used to spawn tasks that run on this runtime, and can
179 /// be cloned to allow moving the `Handle` to other threads.
180 ///
181 /// Calling [`Handle::block_on`] on a handle to a `current_thread` runtime is error-prone.
182 /// Refer to the documentation of [`Handle::block_on`] for more.
183 ///
184 /// # Examples
185 ///
186 /// ```
187 /// use tokio::runtime::Runtime;
188 ///
189 /// let rt = Runtime::new()
190 /// .unwrap();
191 ///
192 /// let handle = rt.handle();
193 ///
194 /// // Use the handle...
195 /// ```
196 pub fn handle(&self) -> &Handle {
197 &self.handle
198 }
199
200 /// Spawns a future onto the Tokio runtime.
201 ///
202 /// This spawns the given future onto the runtime's executor, usually a
203 /// thread pool. The thread pool is then responsible for polling the future
204 /// until it completes.
205 ///
206 /// The provided future will start running in the background immediately
207 /// when `spawn` is called, even if you don't await the returned
208 /// `JoinHandle`.
209 ///
210 /// See [module level][mod] documentation for more details.
211 ///
212 /// [mod]: index.html
213 ///
214 /// # Examples
215 ///
216 /// ```
217 /// use tokio::runtime::Runtime;
218 ///
219 /// # fn dox() {
220 /// // Create the runtime
221 /// let rt = Runtime::new().unwrap();
222 ///
223 /// // Spawn a future onto the runtime
224 /// rt.spawn(async {
225 /// println!("now running on a worker thread");
226 /// });
227 /// # }
228 /// ```
229 #[track_caller]
230 pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
231 where
232 F: Future + Send + 'static,
233 F::Output: Send + 'static,
234 {
235 let fut_size = mem::size_of::<F>();
236 if fut_size > BOX_FUTURE_THRESHOLD {
237 self.handle
238 .spawn_named(Box::pin(future), SpawnMeta::new_unnamed(fut_size))
239 } else {
240 self.handle
241 .spawn_named(future, SpawnMeta::new_unnamed(fut_size))
242 }
243 }
244
245 /// Runs the provided function on an executor dedicated to blocking operations.
246 ///
247 /// # Examples
248 ///
249 /// ```
250 /// use tokio::runtime::Runtime;
251 ///
252 /// # fn dox() {
253 /// // Create the runtime
254 /// let rt = Runtime::new().unwrap();
255 ///
256 /// // Spawn a blocking function onto the runtime
257 /// rt.spawn_blocking(|| {
258 /// println!("now running on a worker thread");
259 /// });
260 /// # }
261 /// ```
262 #[track_caller]
263 pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
264 where
265 F: FnOnce() -> R + Send + 'static,
266 R: Send + 'static,
267 {
268 self.handle.spawn_blocking(func)
269 }
270
271 /// Runs a future to completion on the Tokio runtime. This is the
272 /// runtime's entry point.
273 ///
274 /// This runs the given future on the current thread, blocking until it is
275 /// complete, and yielding its resolved result. Any tasks or timers
276 /// which the future spawns internally will be executed on the runtime.
277 ///
278 /// # Non-worker future
279 ///
280 /// Note that the future required by this function does not run as a
281 /// worker. The expectation is that other tasks are spawned by the future here.
282 /// Awaiting on other futures from the future provided here will not
283 /// perform as fast as those spawned as workers.
284 ///
285 /// # Multi thread scheduler
286 ///
287 /// When the multi thread scheduler is used this will allow futures
288 /// to run within the io driver and timer context of the overall runtime.
289 ///
290 /// Any spawned tasks will continue running after `block_on` returns.
291 ///
292 /// # Current thread scheduler
293 ///
294 /// When the current thread scheduler is enabled `block_on`
295 /// can be called concurrently from multiple threads. The first call
296 /// will take ownership of the io and timer drivers. This means
297 /// other threads which do not own the drivers will hook into that one.
298 /// When the first `block_on` completes, other threads will be able to
299 /// "steal" the driver to allow continued execution of their futures.
300 ///
301 /// Any spawned tasks will be suspended after `block_on` returns. Calling
302 /// `block_on` again will resume previously spawned tasks.
303 ///
304 /// # Panics
305 ///
306 /// This function panics if the provided future panics, or if called within an
307 /// asynchronous execution context.
308 ///
309 /// # Examples
310 ///
311 /// ```no_run
312 /// use tokio::runtime::Runtime;
313 ///
314 /// // Create the runtime
315 /// let rt = Runtime::new().unwrap();
316 ///
317 /// // Execute the future, blocking the current thread until completion
318 /// rt.block_on(async {
319 /// println!("hello");
320 /// });
321 /// ```
322 ///
323 /// [handle]: fn@Handle::block_on
324 #[track_caller]
325 pub fn block_on<F: Future>(&self, future: F) -> F::Output {
326 let fut_size = mem::size_of::<F>();
327 if fut_size > BOX_FUTURE_THRESHOLD {
328 self.block_on_inner(Box::pin(future), SpawnMeta::new_unnamed(fut_size))
329 } else {
330 self.block_on_inner(future, SpawnMeta::new_unnamed(fut_size))
331 }
332 }
333
334 #[track_caller]
335 fn block_on_inner<F: Future>(&self, future: F, _meta: SpawnMeta<'_>) -> F::Output {
336 #[cfg(all(
337 tokio_unstable,
338 tokio_taskdump,
339 feature = "rt",
340 target_os = "linux",
341 any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
342 ))]
343 let future = super::task::trace::Trace::root(future);
344
345 #[cfg(all(tokio_unstable, feature = "tracing"))]
346 let future = crate::util::trace::task(
347 future,
348 "block_on",
349 _meta,
350 crate::runtime::task::Id::next().as_u64(),
351 );
352
353 let _enter = self.enter();
354
355 match &self.scheduler {
356 Scheduler::CurrentThread(exec) => exec.block_on(&self.handle.inner, future),
357 #[cfg(feature = "rt-multi-thread")]
358 Scheduler::MultiThread(exec) => exec.block_on(&self.handle.inner, future),
359 }
360 }
361
362 /// Enters the runtime context.
363 ///
364 /// This allows you to construct types that must have an executor
365 /// available on creation such as [`Sleep`] or [`TcpStream`]. It will
366 /// also allow you to call methods such as [`tokio::spawn`].
367 ///
368 /// [`Sleep`]: struct@crate::time::Sleep
369 /// [`TcpStream`]: struct@crate::net::TcpStream
370 /// [`tokio::spawn`]: fn@crate::spawn
371 ///
372 /// # Example
373 ///
374 /// ```
375 /// use tokio::runtime::Runtime;
376 /// use tokio::task::JoinHandle;
377 ///
378 /// fn function_that_spawns(msg: String) -> JoinHandle<()> {
379 /// // Had we not used `rt.enter` below, this would panic.
380 /// tokio::spawn(async move {
381 /// println!("{}", msg);
382 /// })
383 /// }
384 ///
385 /// fn main() {
386 /// let rt = Runtime::new().unwrap();
387 ///
388 /// let s = "Hello World!".to_string();
389 ///
390 /// // By entering the context, we tie `tokio::spawn` to this executor.
391 /// let _guard = rt.enter();
392 /// let handle = function_that_spawns(s);
393 ///
394 /// // Wait for the task before we end the test.
395 /// rt.block_on(handle).unwrap();
396 /// }
397 /// ```
398 pub fn enter(&self) -> EnterGuard<'_> {
399 self.handle.enter()
400 }
401
402 /// Shuts down the runtime, waiting for at most `duration` for all spawned
403 /// work to stop.
404 ///
405 /// See the [struct level documentation](Runtime#shutdown) for more details.
406 ///
407 /// # Examples
408 ///
409 /// ```
410 /// use tokio::runtime::Runtime;
411 /// use tokio::task;
412 ///
413 /// use std::thread;
414 /// use std::time::Duration;
415 ///
416 /// fn main() {
417 /// # if cfg!(miri) { return } // Miri reports error when main thread terminated without waiting all remaining threads.
418 /// let runtime = Runtime::new().unwrap();
419 ///
420 /// runtime.block_on(async move {
421 /// task::spawn_blocking(move || {
422 /// thread::sleep(Duration::from_secs(10_000));
423 /// });
424 /// });
425 ///
426 /// runtime.shutdown_timeout(Duration::from_millis(100));
427 /// }
428 /// ```
429 pub fn shutdown_timeout(mut self, duration: Duration) {
430 // Wakeup and shutdown all the worker threads
431 self.handle.inner.shutdown();
432 self.blocking_pool.shutdown(Some(duration));
433 }
434
435 /// Shuts down the runtime, without waiting for any spawned work to stop.
436 ///
437 /// This can be useful if you want to drop a runtime from within another runtime.
438 /// Normally, dropping a runtime will block indefinitely for spawned blocking tasks
439 /// to complete, which would normally not be permitted within an asynchronous context.
440 /// By calling `shutdown_background()`, you can drop the runtime from such a context.
441 ///
    /// Note, however, that because we do not wait for any blocking tasks to complete, this
    /// may result in a resource leak (in that any blocking tasks are still running until they
    /// return).
445 ///
446 /// See the [struct level documentation](Runtime#shutdown) for more details.
447 ///
448 /// This function is equivalent to calling `shutdown_timeout(Duration::from_nanos(0))`.
449 ///
450 /// ```
451 /// use tokio::runtime::Runtime;
452 ///
453 /// fn main() {
454 /// let runtime = Runtime::new().unwrap();
455 ///
456 /// runtime.block_on(async move {
457 /// let inner_runtime = Runtime::new().unwrap();
458 /// // ...
459 /// inner_runtime.shutdown_background();
460 /// });
461 /// }
462 /// ```
463 pub fn shutdown_background(self) {
464 self.shutdown_timeout(Duration::from_nanos(0));
465 }
466
467 /// Returns a view that lets you get information about how the runtime
468 /// is performing.
469 pub fn metrics(&self) -> crate::runtime::RuntimeMetrics {
470 self.handle.metrics()
471 }
472}
473
474#[allow(clippy::single_match)] // there are comments in the error branch, so we don't want if-let
475impl Drop for Runtime {
476 fn drop(&mut self) {
477 match &mut self.scheduler {
478 Scheduler::CurrentThread(current_thread) => {
479 // This ensures that tasks spawned on the current-thread
480 // runtime are dropped inside the runtime's context.
481 let _guard = context::try_set_current(&self.handle.inner);
482 current_thread.shutdown(&self.handle.inner);
483 }
484 #[cfg(feature = "rt-multi-thread")]
485 Scheduler::MultiThread(multi_thread) => {
486 // The threaded scheduler drops its tasks on its worker threads, which is
487 // already in the runtime's context.
488 multi_thread.shutdown(&self.handle.inner);
489 }
490 }
491 }
492}
493
494impl std::panic::UnwindSafe for Runtime {}
495
496impl std::panic::RefUnwindSafe for Runtime {}