tokio/runtime/builder.rs
1#![cfg_attr(loom, allow(unused_imports))]
2
3use crate::runtime::handle::Handle;
4use crate::runtime::{blocking, driver, Callback, HistogramBuilder, Runtime, TaskCallback};
5#[cfg(tokio_unstable)]
6use crate::runtime::{metrics::HistogramConfiguration, LocalOptions, LocalRuntime, TaskMeta};
7use crate::util::rand::{RngSeed, RngSeedGenerator};
8
9use crate::runtime::blocking::BlockingPool;
10use crate::runtime::scheduler::CurrentThread;
11use std::fmt;
12use std::io;
13use std::thread::ThreadId;
14use std::time::Duration;
15
16/// Builds Tokio Runtime with custom configuration values.
17///
18/// Methods can be chained in order to set the configuration values. The
19/// Runtime is constructed by calling [`build`].
20///
21/// New instances of `Builder` are obtained via [`Builder::new_multi_thread`]
22/// or [`Builder::new_current_thread`].
23///
24/// See function level documentation for details on the various configuration
25/// settings.
26///
27/// [`build`]: method@Self::build
28/// [`Builder::new_multi_thread`]: method@Self::new_multi_thread
29/// [`Builder::new_current_thread`]: method@Self::new_current_thread
30///
31/// # Examples
32///
33/// ```
34/// use tokio::runtime::Builder;
35///
36/// fn main() {
37/// // build runtime
38/// let runtime = Builder::new_multi_thread()
39/// .worker_threads(4)
40/// .thread_name("my-custom-name")
41/// .thread_stack_size(3 * 1024 * 1024)
42/// .build()
43/// .unwrap();
44///
45/// // use runtime ...
46/// }
47/// ```
48pub struct Builder {
49 /// Runtime type
50 kind: Kind,
51
52 /// Whether or not to enable the I/O driver
53 enable_io: bool,
54 nevents: usize,
55
56 /// Whether or not to enable the time driver
57 enable_time: bool,
58
59 /// Whether or not the clock should start paused.
60 start_paused: bool,
61
62 /// The number of worker threads, used by Runtime.
63 ///
64 /// Only used when not using the current-thread executor.
65 worker_threads: Option<usize>,
66
67 /// Cap on thread usage.
68 max_blocking_threads: usize,
69
70 /// Name fn used for threads spawned by the runtime.
71 pub(super) thread_name: ThreadNameFn,
72
73 /// Stack size used for threads spawned by the runtime.
74 pub(super) thread_stack_size: Option<usize>,
75
76 /// Callback to run after each thread starts.
77 pub(super) after_start: Option<Callback>,
78
79 /// To run before each worker thread stops
80 pub(super) before_stop: Option<Callback>,
81
82 /// To run before each worker thread is parked.
83 pub(super) before_park: Option<Callback>,
84
85 /// To run after each thread is unparked.
86 pub(super) after_unpark: Option<Callback>,
87
88 /// To run before each task is spawned.
89 pub(super) before_spawn: Option<TaskCallback>,
90
91 /// To run before each poll
92 #[cfg(tokio_unstable)]
93 pub(super) before_poll: Option<TaskCallback>,
94
95 /// To run after each poll
96 #[cfg(tokio_unstable)]
97 pub(super) after_poll: Option<TaskCallback>,
98
99 /// To run after each task is terminated.
100 pub(super) after_termination: Option<TaskCallback>,
101
102 /// Customizable keep alive timeout for `BlockingPool`
103 pub(super) keep_alive: Option<Duration>,
104
105 /// How many ticks before pulling a task from the global/remote queue?
106 ///
107 /// When `None`, the value is unspecified and behavior details are left to
108 /// the scheduler. Each scheduler flavor could choose to either pick its own
109 /// default value or use some other strategy to decide when to poll from the
110 /// global queue. For example, the multi-threaded scheduler uses a
111 /// self-tuning strategy based on mean task poll times.
112 pub(super) global_queue_interval: Option<u32>,
113
114 /// How many ticks before yielding to the driver for timer and I/O events?
115 pub(super) event_interval: u32,
116
117 /// When true, the multi-threade scheduler LIFO slot should not be used.
118 ///
119 /// This option should only be exposed as unstable.
120 pub(super) disable_lifo_slot: bool,
121
122 /// Specify a random number generator seed to provide deterministic results
123 pub(super) seed_generator: RngSeedGenerator,
124
125 /// When true, enables task poll count histogram instrumentation.
126 pub(super) metrics_poll_count_histogram_enable: bool,
127
128 /// Configures the task poll count histogram
129 pub(super) metrics_poll_count_histogram: HistogramBuilder,
130
131 #[cfg(tokio_unstable)]
132 pub(super) unhandled_panic: UnhandledPanic,
133}
134
135cfg_unstable! {
136 /// How the runtime should respond to unhandled panics.
137 ///
138 /// Instances of `UnhandledPanic` are passed to `Builder::unhandled_panic`
139 /// to configure the runtime behavior when a spawned task panics.
140 ///
141 /// See [`Builder::unhandled_panic`] for more details.
142 #[derive(Debug, Clone)]
143 #[non_exhaustive]
144 pub enum UnhandledPanic {
145 /// The runtime should ignore panics on spawned tasks.
146 ///
147 /// The panic is forwarded to the task's [`JoinHandle`] and all spawned
148 /// tasks continue running normally.
149 ///
150 /// This is the default behavior.
151 ///
152 /// # Examples
153 ///
154 /// ```
155 /// use tokio::runtime::{self, UnhandledPanic};
156 ///
157 /// # pub fn main() {
158 /// let rt = runtime::Builder::new_current_thread()
159 /// .unhandled_panic(UnhandledPanic::Ignore)
160 /// .build()
161 /// .unwrap();
162 ///
163 /// let task1 = rt.spawn(async { panic!("boom"); });
164 /// let task2 = rt.spawn(async {
165 /// // This task completes normally
166 /// "done"
167 /// });
168 ///
169 /// rt.block_on(async {
170 /// // The panic on the first task is forwarded to the `JoinHandle`
171 /// assert!(task1.await.is_err());
172 ///
173 /// // The second task completes normally
174 /// assert!(task2.await.is_ok());
175 /// })
176 /// # }
177 /// ```
178 ///
179 /// [`JoinHandle`]: struct@crate::task::JoinHandle
180 Ignore,
181
182 /// The runtime should immediately shutdown if a spawned task panics.
183 ///
184 /// The runtime will immediately shutdown even if the panicked task's
185 /// [`JoinHandle`] is still available. All further spawned tasks will be
186 /// immediately dropped and call to [`Runtime::block_on`] will panic.
187 ///
188 /// # Examples
189 ///
190 /// ```should_panic
191 /// use tokio::runtime::{self, UnhandledPanic};
192 ///
193 /// # pub fn main() {
194 /// let rt = runtime::Builder::new_current_thread()
195 /// .unhandled_panic(UnhandledPanic::ShutdownRuntime)
196 /// .build()
197 /// .unwrap();
198 ///
199 /// rt.spawn(async { panic!("boom"); });
200 /// rt.spawn(async {
201 /// // This task never completes.
202 /// });
203 ///
204 /// rt.block_on(async {
205 /// // Do some work
206 /// # loop { tokio::task::yield_now().await; }
207 /// })
208 /// # }
209 /// ```
210 ///
211 /// [`JoinHandle`]: struct@crate::task::JoinHandle
212 ShutdownRuntime,
213 }
214}
215
216pub(crate) type ThreadNameFn = std::sync::Arc<dyn Fn() -> String + Send + Sync + 'static>;
217
218#[derive(Clone, Copy)]
219pub(crate) enum Kind {
220 CurrentThread,
221 #[cfg(feature = "rt-multi-thread")]
222 MultiThread,
223}
224
225impl Builder {
226 /// Returns a new builder with the current thread scheduler selected.
227 ///
228 /// Configuration methods can be chained on the return value.
229 ///
230 /// To spawn non-`Send` tasks on the resulting runtime, combine it with a
231 /// [`LocalSet`].
232 ///
233 /// [`LocalSet`]: crate::task::LocalSet
234 pub fn new_current_thread() -> Builder {
235 #[cfg(loom)]
236 const EVENT_INTERVAL: u32 = 4;
237 // The number `61` is fairly arbitrary. I believe this value was copied from golang.
238 #[cfg(not(loom))]
239 const EVENT_INTERVAL: u32 = 61;
240
241 Builder::new(Kind::CurrentThread, EVENT_INTERVAL)
242 }
243
244 /// Returns a new builder with the multi thread scheduler selected.
245 ///
246 /// Configuration methods can be chained on the return value.
247 #[cfg(feature = "rt-multi-thread")]
248 #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
249 pub fn new_multi_thread() -> Builder {
250 // The number `61` is fairly arbitrary. I believe this value was copied from golang.
251 Builder::new(Kind::MultiThread, 61)
252 }
253
254 /// Returns a new runtime builder initialized with default configuration
255 /// values.
256 ///
257 /// Configuration methods can be chained on the return value.
258 pub(crate) fn new(kind: Kind, event_interval: u32) -> Builder {
259 Builder {
260 kind,
261
262 // I/O defaults to "off"
263 enable_io: false,
264 nevents: 1024,
265
266 // Time defaults to "off"
267 enable_time: false,
268
269 // The clock starts not-paused
270 start_paused: false,
271
272 // Read from environment variable first in multi-threaded mode.
273 // Default to lazy auto-detection (one thread per CPU core)
274 worker_threads: None,
275
276 max_blocking_threads: 512,
277
278 // Default thread name
279 thread_name: std::sync::Arc::new(|| "tokio-runtime-worker".into()),
280
281 // Do not set a stack size by default
282 thread_stack_size: None,
283
284 // No worker thread callbacks
285 after_start: None,
286 before_stop: None,
287 before_park: None,
288 after_unpark: None,
289
290 before_spawn: None,
291 after_termination: None,
292
293 #[cfg(tokio_unstable)]
294 before_poll: None,
295 #[cfg(tokio_unstable)]
296 after_poll: None,
297
298 keep_alive: None,
299
300 // Defaults for these values depend on the scheduler kind, so we get them
301 // as parameters.
302 global_queue_interval: None,
303 event_interval,
304
305 seed_generator: RngSeedGenerator::new(RngSeed::new()),
306
307 #[cfg(tokio_unstable)]
308 unhandled_panic: UnhandledPanic::Ignore,
309
310 metrics_poll_count_histogram_enable: false,
311
312 metrics_poll_count_histogram: HistogramBuilder::default(),
313
314 disable_lifo_slot: false,
315 }
316 }
317
318 /// Enables both I/O and time drivers.
319 ///
320 /// Doing this is a shorthand for calling `enable_io` and `enable_time`
321 /// individually. If additional components are added to Tokio in the future,
322 /// `enable_all` will include these future components.
323 ///
324 /// # Examples
325 ///
326 /// ```
327 /// use tokio::runtime;
328 ///
329 /// let rt = runtime::Builder::new_multi_thread()
330 /// .enable_all()
331 /// .build()
332 /// .unwrap();
333 /// ```
334 pub fn enable_all(&mut self) -> &mut Self {
335 #[cfg(any(
336 feature = "net",
337 all(unix, feature = "process"),
338 all(unix, feature = "signal")
339 ))]
340 self.enable_io();
341 #[cfg(feature = "time")]
342 self.enable_time();
343
344 self
345 }
346
347 /// Sets the number of worker threads the `Runtime` will use.
348 ///
349 /// This can be any number above 0 though it is advised to keep this value
350 /// on the smaller side.
351 ///
352 /// This will override the value read from environment variable `TOKIO_WORKER_THREADS`.
353 ///
354 /// # Default
355 ///
356 /// The default value is the number of cores available to the system.
357 ///
358 /// When using the `current_thread` runtime this method has no effect.
359 ///
360 /// # Examples
361 ///
362 /// ## Multi threaded runtime with 4 threads
363 ///
364 /// ```
365 /// use tokio::runtime;
366 ///
367 /// // This will spawn a work-stealing runtime with 4 worker threads.
368 /// let rt = runtime::Builder::new_multi_thread()
369 /// .worker_threads(4)
370 /// .build()
371 /// .unwrap();
372 ///
373 /// rt.spawn(async move {});
374 /// ```
375 ///
376 /// ## Current thread runtime (will only run on the current thread via `Runtime::block_on`)
377 ///
378 /// ```
379 /// use tokio::runtime;
380 ///
381 /// // Create a runtime that _must_ be driven from a call
382 /// // to `Runtime::block_on`.
383 /// let rt = runtime::Builder::new_current_thread()
384 /// .build()
385 /// .unwrap();
386 ///
387 /// // This will run the runtime and future on the current thread
388 /// rt.block_on(async move {});
389 /// ```
390 ///
391 /// # Panics
392 ///
393 /// This will panic if `val` is not larger than `0`.
394 #[track_caller]
395 pub fn worker_threads(&mut self, val: usize) -> &mut Self {
396 assert!(val > 0, "Worker threads cannot be set to 0");
397 self.worker_threads = Some(val);
398 self
399 }
400
401 /// Specifies the limit for additional threads spawned by the Runtime.
402 ///
403 /// These threads are used for blocking operations like tasks spawned
404 /// through [`spawn_blocking`], this includes but is not limited to:
405 /// - [`fs`] operations
406 /// - dns resolution through [`ToSocketAddrs`]
407 /// - writing to [`Stdout`] or [`Stderr`]
408 /// - reading from [`Stdin`]
409 ///
410 /// Unlike the [`worker_threads`], they are not always active and will exit
411 /// if left idle for too long. You can change this timeout duration with [`thread_keep_alive`].
412 ///
413 /// It's recommended to not set this limit too low in order to avoid hanging on operations
414 /// requiring [`spawn_blocking`].
415 ///
416 /// The default value is 512.
417 ///
418 /// # Queue Behavior
419 ///
420 /// When a blocking task is submitted, it will be inserted into a queue. If available, one of
421 /// the idle threads will be notified to run the task. Otherwise, if the threshold set by this
422 /// method has not been reached, a new thread will be spawned. If no idle thread is available
423 /// and no more threads are allowed to be spawned, the task will remain in the queue until one
424 /// of the busy threads pick it up. Note that since the queue does not apply any backpressure,
425 /// it could potentially grow unbounded.
426 ///
427 /// # Panics
428 ///
429 /// This will panic if `val` is not larger than `0`.
430 ///
431 /// # Upgrading from 0.x
432 ///
433 /// In old versions `max_threads` limited both blocking and worker threads, but the
434 /// current `max_blocking_threads` does not include async worker threads in the count.
435 ///
436 /// [`spawn_blocking`]: fn@crate::task::spawn_blocking
437 /// [`fs`]: mod@crate::fs
438 /// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs
439 /// [`Stdout`]: struct@crate::io::Stdout
440 /// [`Stdin`]: struct@crate::io::Stdin
441 /// [`Stderr`]: struct@crate::io::Stderr
442 /// [`worker_threads`]: Self::worker_threads
443 /// [`thread_keep_alive`]: Self::thread_keep_alive
444 #[track_caller]
445 #[cfg_attr(docsrs, doc(alias = "max_threads"))]
446 pub fn max_blocking_threads(&mut self, val: usize) -> &mut Self {
447 assert!(val > 0, "Max blocking threads cannot be set to 0");
448 self.max_blocking_threads = val;
449 self
450 }
451
452 /// Sets name of threads spawned by the `Runtime`'s thread pool.
453 ///
454 /// The default name is "tokio-runtime-worker".
455 ///
456 /// # Examples
457 ///
458 /// ```
459 /// # use tokio::runtime;
460 ///
461 /// # pub fn main() {
462 /// let rt = runtime::Builder::new_multi_thread()
463 /// .thread_name("my-pool")
464 /// .build();
465 /// # }
466 /// ```
467 pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self {
468 let val = val.into();
469 self.thread_name = std::sync::Arc::new(move || val.clone());
470 self
471 }
472
473 /// Sets a function used to generate the name of threads spawned by the `Runtime`'s thread pool.
474 ///
475 /// The default name fn is `|| "tokio-runtime-worker".into()`.
476 ///
477 /// # Examples
478 ///
479 /// ```
480 /// # use tokio::runtime;
481 /// # use std::sync::atomic::{AtomicUsize, Ordering};
482 /// # pub fn main() {
483 /// let rt = runtime::Builder::new_multi_thread()
484 /// .thread_name_fn(|| {
485 /// static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
486 /// let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
487 /// format!("my-pool-{}", id)
488 /// })
489 /// .build();
490 /// # }
491 /// ```
492 pub fn thread_name_fn<F>(&mut self, f: F) -> &mut Self
493 where
494 F: Fn() -> String + Send + Sync + 'static,
495 {
496 self.thread_name = std::sync::Arc::new(f);
497 self
498 }
499
500 /// Sets the stack size (in bytes) for worker threads.
501 ///
502 /// The actual stack size may be greater than this value if the platform
503 /// specifies minimal stack size.
504 ///
505 /// The default stack size for spawned threads is 2 MiB, though this
506 /// particular stack size is subject to change in the future.
507 ///
508 /// # Examples
509 ///
510 /// ```
511 /// # use tokio::runtime;
512 ///
513 /// # pub fn main() {
514 /// let rt = runtime::Builder::new_multi_thread()
515 /// .thread_stack_size(32 * 1024)
516 /// .build();
517 /// # }
518 /// ```
519 pub fn thread_stack_size(&mut self, val: usize) -> &mut Self {
520 self.thread_stack_size = Some(val);
521 self
522 }
523
524 /// Executes function `f` after each thread is started but before it starts
525 /// doing work.
526 ///
527 /// This is intended for bookkeeping and monitoring use cases.
528 ///
529 /// # Examples
530 ///
531 /// ```
532 /// # use tokio::runtime;
533 /// # pub fn main() {
534 /// let runtime = runtime::Builder::new_multi_thread()
535 /// .on_thread_start(|| {
536 /// println!("thread started");
537 /// })
538 /// .build();
539 /// # }
540 /// ```
541 #[cfg(not(loom))]
542 pub fn on_thread_start<F>(&mut self, f: F) -> &mut Self
543 where
544 F: Fn() + Send + Sync + 'static,
545 {
546 self.after_start = Some(std::sync::Arc::new(f));
547 self
548 }
549
550 /// Executes function `f` before each thread stops.
551 ///
552 /// This is intended for bookkeeping and monitoring use cases.
553 ///
554 /// # Examples
555 ///
556 /// ```
557 /// # use tokio::runtime;
558 /// # pub fn main() {
559 /// let runtime = runtime::Builder::new_multi_thread()
560 /// .on_thread_stop(|| {
561 /// println!("thread stopping");
562 /// })
563 /// .build();
564 /// # }
565 /// ```
566 #[cfg(not(loom))]
567 pub fn on_thread_stop<F>(&mut self, f: F) -> &mut Self
568 where
569 F: Fn() + Send + Sync + 'static,
570 {
571 self.before_stop = Some(std::sync::Arc::new(f));
572 self
573 }
574
575 /// Executes function `f` just before a thread is parked (goes idle).
576 /// `f` is called within the Tokio context, so functions like [`tokio::spawn`](crate::spawn)
577 /// can be called, and may result in this thread being unparked immediately.
578 ///
579 /// This can be used to start work only when the executor is idle, or for bookkeeping
580 /// and monitoring purposes.
581 ///
582 /// Note: There can only be one park callback for a runtime; calling this function
583 /// more than once replaces the last callback defined, rather than adding to it.
584 ///
585 /// # Examples
586 ///
587 /// ## Multithreaded executor
588 /// ```
589 /// # use std::sync::Arc;
590 /// # use std::sync::atomic::{AtomicBool, Ordering};
591 /// # use tokio::runtime;
592 /// # use tokio::sync::Barrier;
593 /// # pub fn main() {
594 /// let once = AtomicBool::new(true);
595 /// let barrier = Arc::new(Barrier::new(2));
596 ///
597 /// let runtime = runtime::Builder::new_multi_thread()
598 /// .worker_threads(1)
599 /// .on_thread_park({
600 /// let barrier = barrier.clone();
601 /// move || {
602 /// let barrier = barrier.clone();
603 /// if once.swap(false, Ordering::Relaxed) {
604 /// tokio::spawn(async move { barrier.wait().await; });
605 /// }
606 /// }
607 /// })
608 /// .build()
609 /// .unwrap();
610 ///
611 /// runtime.block_on(async {
612 /// barrier.wait().await;
613 /// })
614 /// # }
615 /// ```
616 /// ## Current thread executor
617 /// ```
618 /// # use std::sync::Arc;
619 /// # use std::sync::atomic::{AtomicBool, Ordering};
620 /// # use tokio::runtime;
621 /// # use tokio::sync::Barrier;
622 /// # pub fn main() {
623 /// let once = AtomicBool::new(true);
624 /// let barrier = Arc::new(Barrier::new(2));
625 ///
626 /// let runtime = runtime::Builder::new_current_thread()
627 /// .on_thread_park({
628 /// let barrier = barrier.clone();
629 /// move || {
630 /// let barrier = barrier.clone();
631 /// if once.swap(false, Ordering::Relaxed) {
632 /// tokio::spawn(async move { barrier.wait().await; });
633 /// }
634 /// }
635 /// })
636 /// .build()
637 /// .unwrap();
638 ///
639 /// runtime.block_on(async {
640 /// barrier.wait().await;
641 /// })
642 /// # }
643 /// ```
644 #[cfg(not(loom))]
645 pub fn on_thread_park<F>(&mut self, f: F) -> &mut Self
646 where
647 F: Fn() + Send + Sync + 'static,
648 {
649 self.before_park = Some(std::sync::Arc::new(f));
650 self
651 }
652
653 /// Executes function `f` just after a thread unparks (starts executing tasks).
654 ///
655 /// This is intended for bookkeeping and monitoring use cases; note that work
656 /// in this callback will increase latencies when the application has allowed one or
657 /// more runtime threads to go idle.
658 ///
659 /// Note: There can only be one unpark callback for a runtime; calling this function
660 /// more than once replaces the last callback defined, rather than adding to it.
661 ///
662 /// # Examples
663 ///
664 /// ```
665 /// # use tokio::runtime;
666 /// # pub fn main() {
667 /// let runtime = runtime::Builder::new_multi_thread()
668 /// .on_thread_unpark(|| {
669 /// println!("thread unparking");
670 /// })
671 /// .build();
672 ///
673 /// runtime.unwrap().block_on(async {
674 /// tokio::task::yield_now().await;
675 /// println!("Hello from Tokio!");
676 /// })
677 /// # }
678 /// ```
679 #[cfg(not(loom))]
680 pub fn on_thread_unpark<F>(&mut self, f: F) -> &mut Self
681 where
682 F: Fn() + Send + Sync + 'static,
683 {
684 self.after_unpark = Some(std::sync::Arc::new(f));
685 self
686 }
687
688 /// Executes function `f` just before a task is spawned.
689 ///
690 /// `f` is called within the Tokio context, so functions like
691 /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
692 /// invoked immediately.
693 ///
694 /// This can be used for bookkeeping or monitoring purposes.
695 ///
696 /// Note: There can only be one spawn callback for a runtime; calling this function more
697 /// than once replaces the last callback defined, rather than adding to it.
698 ///
699 /// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time.
700 ///
701 /// **Note**: This is an [unstable API][unstable]. The public API of this type
702 /// may break in 1.x releases. See [the documentation on unstable
703 /// features][unstable] for details.
704 ///
705 /// [unstable]: crate#unstable-features
706 ///
707 /// # Examples
708 ///
709 /// ```
710 /// # use tokio::runtime;
711 /// # pub fn main() {
712 /// let runtime = runtime::Builder::new_current_thread()
713 /// .on_task_spawn(|_| {
714 /// println!("spawning task");
715 /// })
716 /// .build()
717 /// .unwrap();
718 ///
719 /// runtime.block_on(async {
720 /// tokio::task::spawn(std::future::ready(()));
721 ///
722 /// for _ in 0..64 {
723 /// tokio::task::yield_now().await;
724 /// }
725 /// })
726 /// # }
727 /// ```
728 #[cfg(all(not(loom), tokio_unstable))]
729 #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
730 pub fn on_task_spawn<F>(&mut self, f: F) -> &mut Self
731 where
732 F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
733 {
734 self.before_spawn = Some(std::sync::Arc::new(f));
735 self
736 }
737
738 /// Executes function `f` just before a task is polled
739 ///
740 /// `f` is called within the Tokio context, so functions like
741 /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
742 /// invoked immediately.
743 ///
744 /// **Note**: This is an [unstable API][unstable]. The public API of this type
745 /// may break in 1.x releases. See [the documentation on unstable
746 /// features][unstable] for details.
747 ///
748 /// [unstable]: crate#unstable-features
749 ///
750 /// # Examples
751 ///
752 /// ```
753 /// # use std::sync::{atomic::AtomicUsize, Arc};
754 /// # use tokio::task::yield_now;
755 /// # pub fn main() {
756 /// let poll_start_counter = Arc::new(AtomicUsize::new(0));
757 /// let poll_start = poll_start_counter.clone();
758 /// let rt = tokio::runtime::Builder::new_multi_thread()
759 /// .enable_all()
760 /// .on_before_task_poll(move |meta| {
761 /// println!("task {} is about to be polled", meta.id())
762 /// })
763 /// .build()
764 /// .unwrap();
765 /// let task = rt.spawn(async {
766 /// yield_now().await;
767 /// });
768 /// let _ = rt.block_on(task);
769 ///
770 /// # }
771 /// ```
772 #[cfg(tokio_unstable)]
773 pub fn on_before_task_poll<F>(&mut self, f: F) -> &mut Self
774 where
775 F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
776 {
777 self.before_poll = Some(std::sync::Arc::new(f));
778 self
779 }
780
781 /// Executes function `f` just after a task is polled
782 ///
783 /// `f` is called within the Tokio context, so functions like
784 /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
785 /// invoked immediately.
786 ///
787 /// **Note**: This is an [unstable API][unstable]. The public API of this type
788 /// may break in 1.x releases. See [the documentation on unstable
789 /// features][unstable] for details.
790 ///
791 /// [unstable]: crate#unstable-features
792 ///
793 /// # Examples
794 ///
795 /// ```
796 /// # use std::sync::{atomic::AtomicUsize, Arc};
797 /// # use tokio::task::yield_now;
798 /// # pub fn main() {
799 /// let poll_stop_counter = Arc::new(AtomicUsize::new(0));
800 /// let poll_stop = poll_stop_counter.clone();
801 /// let rt = tokio::runtime::Builder::new_multi_thread()
802 /// .enable_all()
803 /// .on_after_task_poll(move |meta| {
804 /// println!("task {} completed polling", meta.id());
805 /// })
806 /// .build()
807 /// .unwrap();
808 /// let task = rt.spawn(async {
809 /// yield_now().await;
810 /// });
811 /// let _ = rt.block_on(task);
812 ///
813 /// # }
814 /// ```
815 #[cfg(tokio_unstable)]
816 pub fn on_after_task_poll<F>(&mut self, f: F) -> &mut Self
817 where
818 F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
819 {
820 self.after_poll = Some(std::sync::Arc::new(f));
821 self
822 }
823
824 /// Executes function `f` just after a task is terminated.
825 ///
826 /// `f` is called within the Tokio context, so functions like
827 /// [`tokio::spawn`](crate::spawn) can be called.
828 ///
829 /// This can be used for bookkeeping or monitoring purposes.
830 ///
831 /// Note: There can only be one task termination callback for a runtime; calling this
832 /// function more than once replaces the last callback defined, rather than adding to it.
833 ///
834 /// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time.
835 ///
836 /// **Note**: This is an [unstable API][unstable]. The public API of this type
837 /// may break in 1.x releases. See [the documentation on unstable
838 /// features][unstable] for details.
839 ///
840 /// [unstable]: crate#unstable-features
841 ///
842 /// # Examples
843 ///
844 /// ```
845 /// # use tokio::runtime;
846 /// # pub fn main() {
847 /// let runtime = runtime::Builder::new_current_thread()
848 /// .on_task_terminate(|_| {
849 /// println!("killing task");
850 /// })
851 /// .build()
852 /// .unwrap();
853 ///
854 /// runtime.block_on(async {
855 /// tokio::task::spawn(std::future::ready(()));
856 ///
857 /// for _ in 0..64 {
858 /// tokio::task::yield_now().await;
859 /// }
860 /// })
861 /// # }
862 /// ```
863 #[cfg(all(not(loom), tokio_unstable))]
864 #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
865 pub fn on_task_terminate<F>(&mut self, f: F) -> &mut Self
866 where
867 F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
868 {
869 self.after_termination = Some(std::sync::Arc::new(f));
870 self
871 }
872
873 /// Creates the configured `Runtime`.
874 ///
875 /// The returned `Runtime` instance is ready to spawn tasks.
876 ///
877 /// # Examples
878 ///
879 /// ```
880 /// use tokio::runtime::Builder;
881 ///
882 /// let rt = Builder::new_multi_thread().build().unwrap();
883 ///
884 /// rt.block_on(async {
885 /// println!("Hello from the Tokio runtime");
886 /// });
887 /// ```
888 pub fn build(&mut self) -> io::Result<Runtime> {
889 match &self.kind {
890 Kind::CurrentThread => self.build_current_thread_runtime(),
891 #[cfg(feature = "rt-multi-thread")]
892 Kind::MultiThread => self.build_threaded_runtime(),
893 }
894 }
895
896 /// Creates the configured `LocalRuntime`.
897 ///
898 /// The returned `LocalRuntime` instance is ready to spawn tasks.
899 ///
900 /// # Panics
901 /// This will panic if `current_thread` is not the selected runtime flavor.
902 /// All other runtime flavors are unsupported by [`LocalRuntime`].
903 ///
904 /// [`LocalRuntime`]: [crate::runtime::LocalRuntime]
905 ///
906 /// # Examples
907 ///
908 /// ```
909 /// use tokio::runtime::Builder;
910 ///
911 /// let rt = Builder::new_current_thread().build_local(&mut Default::default()).unwrap();
912 ///
913 /// rt.block_on(async {
914 /// println!("Hello from the Tokio runtime");
915 /// });
916 /// ```
917 #[allow(unused_variables, unreachable_patterns)]
918 #[cfg(tokio_unstable)]
919 #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
920 pub fn build_local(&mut self, options: &LocalOptions) -> io::Result<LocalRuntime> {
921 match &self.kind {
922 Kind::CurrentThread => self.build_current_thread_local_runtime(),
923 _ => panic!("Only current_thread is supported when building a local runtime"),
924 }
925 }
926
927 fn get_cfg(&self) -> driver::Cfg {
928 driver::Cfg {
929 enable_pause_time: match self.kind {
930 Kind::CurrentThread => true,
931 #[cfg(feature = "rt-multi-thread")]
932 Kind::MultiThread => false,
933 },
934 enable_io: self.enable_io,
935 enable_time: self.enable_time,
936 start_paused: self.start_paused,
937 nevents: self.nevents,
938 }
939 }
940
941 /// Sets a custom timeout for a thread in the blocking pool.
942 ///
943 /// By default, the timeout for a thread is set to 10 seconds. This can
944 /// be overridden using `.thread_keep_alive()`.
945 ///
946 /// # Example
947 ///
948 /// ```
949 /// # use tokio::runtime;
950 /// # use std::time::Duration;
951 /// # pub fn main() {
952 /// let rt = runtime::Builder::new_multi_thread()
953 /// .thread_keep_alive(Duration::from_millis(100))
954 /// .build();
955 /// # }
956 /// ```
957 pub fn thread_keep_alive(&mut self, duration: Duration) -> &mut Self {
958 self.keep_alive = Some(duration);
959 self
960 }
961
962 /// Sets the number of scheduler ticks after which the scheduler will poll the global
963 /// task queue.
964 ///
965 /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
966 ///
967 /// By default the global queue interval is 31 for the current-thread scheduler. Please see
968 /// [the module documentation] for the default behavior of the multi-thread scheduler.
969 ///
970 /// Schedulers have a local queue of already-claimed tasks, and a global queue of incoming
971 /// tasks. Setting the interval to a smaller value increases the fairness of the scheduler,
972 /// at the cost of more synchronization overhead. That can be beneficial for prioritizing
973 /// getting started on new work, especially if tasks frequently yield rather than complete
974 /// or await on further I/O. Conversely, a higher value prioritizes existing work, and
975 /// is a good choice when most tasks quickly complete polling.
976 ///
977 /// [the module documentation]: crate::runtime#multi-threaded-runtime-behavior-at-the-time-of-writing
978 ///
979 /// # Panics
980 ///
981 /// This function will panic if 0 is passed as an argument.
982 ///
983 /// # Examples
984 ///
985 /// ```
986 /// # use tokio::runtime;
987 /// # pub fn main() {
988 /// let rt = runtime::Builder::new_multi_thread()
989 /// .global_queue_interval(31)
990 /// .build();
991 /// # }
992 /// ```
993 #[track_caller]
994 pub fn global_queue_interval(&mut self, val: u32) -> &mut Self {
995 assert!(val > 0, "global_queue_interval must be greater than 0");
996 self.global_queue_interval = Some(val);
997 self
998 }
999
1000 /// Sets the number of scheduler ticks after which the scheduler will poll for
1001 /// external events (timers, I/O, and so on).
1002 ///
1003 /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
1004 ///
1005 /// By default, the event interval is `61` for all scheduler types.
1006 ///
1007 /// Setting the event interval determines the effective "priority" of delivering
1008 /// these external events (which may wake up additional tasks), compared to
1009 /// executing tasks that are currently ready to run. A smaller value is useful
1010 /// when tasks frequently spend a long time in polling, or frequently yield,
1011 /// which can result in overly long delays picking up I/O events. Conversely,
1012 /// picking up new events requires extra synchronization and syscall overhead,
1013 /// so if tasks generally complete their polling quickly, a higher event interval
1014 /// will minimize that overhead while still keeping the scheduler responsive to
1015 /// events.
1016 ///
1017 /// # Examples
1018 ///
1019 /// ```
1020 /// # use tokio::runtime;
1021 /// # pub fn main() {
1022 /// let rt = runtime::Builder::new_multi_thread()
1023 /// .event_interval(31)
1024 /// .build();
1025 /// # }
1026 /// ```
1027 pub fn event_interval(&mut self, val: u32) -> &mut Self {
1028 self.event_interval = val;
1029 self
1030 }
1031
1032 cfg_unstable! {
1033 /// Configure how the runtime responds to an unhandled panic on a
1034 /// spawned task.
1035 ///
1036 /// By default, an unhandled panic (i.e. a panic not caught by
1037 /// [`std::panic::catch_unwind`]) has no impact on the runtime's
1038 /// execution. The panic's error value is forwarded to the task's
1039 /// [`JoinHandle`] and all other spawned tasks continue running.
1040 ///
1041 /// The `unhandled_panic` option enables configuring this behavior.
1042 ///
1043 /// * `UnhandledPanic::Ignore` is the default behavior. Panics on
1044 /// spawned tasks have no impact on the runtime's execution.
1045 /// * `UnhandledPanic::ShutdownRuntime` will force the runtime to
1046 /// shutdown immediately when a spawned task panics even if that
1047 /// task's `JoinHandle` has not been dropped. All other spawned tasks
1048 /// will immediately terminate and further calls to
1049 /// [`Runtime::block_on`] will panic.
1050 ///
1051 /// # Panics
1052 /// This method panics if called with [`UnhandledPanic::ShutdownRuntime`]
1053 /// on a runtime other than the current thread runtime.
1054 ///
1055 /// # Unstable
1056 ///
1057 /// This option is currently unstable and its implementation is
1058 /// incomplete. The API may change or be removed in the future. See
1059 /// issue [tokio-rs/tokio#4516] for more details.
1060 ///
1061 /// # Examples
1062 ///
1063 /// The following demonstrates a runtime configured to shutdown on
1064 /// panic. The first spawned task panics and results in the runtime
1065 /// shutting down. The second spawned task never has a chance to
1066 /// execute. The call to `block_on` will panic due to the runtime being
1067 /// forcibly shutdown.
1068 ///
1069 /// ```should_panic
1070 /// use tokio::runtime::{self, UnhandledPanic};
1071 ///
1072 /// # pub fn main() {
1073 /// let rt = runtime::Builder::new_current_thread()
1074 /// .unhandled_panic(UnhandledPanic::ShutdownRuntime)
1075 /// .build()
1076 /// .unwrap();
1077 ///
1078 /// rt.spawn(async { panic!("boom"); });
1079 /// rt.spawn(async {
1080 /// // This task never completes.
1081 /// });
1082 ///
1083 /// rt.block_on(async {
1084 /// // Do some work
1085 /// # loop { tokio::task::yield_now().await; }
1086 /// })
1087 /// # }
1088 /// ```
1089 ///
1090 /// [`JoinHandle`]: struct@crate::task::JoinHandle
1091 /// [tokio-rs/tokio#4516]: https://github.com/tokio-rs/tokio/issues/4516
1092 pub fn unhandled_panic(&mut self, behavior: UnhandledPanic) -> &mut Self {
1093 if !matches!(self.kind, Kind::CurrentThread) && matches!(behavior, UnhandledPanic::ShutdownRuntime) {
1094 panic!("UnhandledPanic::ShutdownRuntime is only supported in current thread runtime");
1095 }
1096
1097 self.unhandled_panic = behavior;
1098 self
1099 }
1100
1101 /// Disables the LIFO task scheduler heuristic.
1102 ///
1103 /// The multi-threaded scheduler includes a heuristic for optimizing
1104 /// message-passing patterns. This heuristic results in the **last**
1105 /// scheduled task being polled first.
1106 ///
1107 /// To implement this heuristic, each worker thread has a slot which
1108 /// holds the task that should be polled next. However, this slot cannot
1109 /// be stolen by other worker threads, which can result in lower total
1110 /// throughput when tasks tend to have longer poll times.
1111 ///
1112 /// This configuration option will disable this heuristic resulting in
1113 /// all scheduled tasks being pushed into the worker-local queue, which
1114 /// is stealable.
1115 ///
1116 /// Consider trying this option when the task "scheduled" time is high
1117 /// but the runtime is underutilized. Use [tokio-rs/tokio-metrics] to
1118 /// collect this data.
1119 ///
1120 /// # Unstable
1121 ///
1122 /// This configuration option is considered a workaround for the LIFO
1123 /// slot not being stealable. When the slot becomes stealable, we will
1124 /// revisit whether or not this option is necessary. See
1125 /// issue [tokio-rs/tokio#4941].
1126 ///
1127 /// # Examples
1128 ///
1129 /// ```
1130 /// use tokio::runtime;
1131 ///
1132 /// let rt = runtime::Builder::new_multi_thread()
1133 /// .disable_lifo_slot()
1134 /// .build()
1135 /// .unwrap();
1136 /// ```
1137 ///
1138 /// [tokio-rs/tokio-metrics]: https://github.com/tokio-rs/tokio-metrics
1139 /// [tokio-rs/tokio#4941]: https://github.com/tokio-rs/tokio/issues/4941
1140 pub fn disable_lifo_slot(&mut self) -> &mut Self {
1141 self.disable_lifo_slot = true;
1142 self
1143 }
1144
1145 /// Specifies the random number generation seed to use within all
1146 /// threads associated with the runtime being built.
1147 ///
1148 /// This option is intended to make certain parts of the runtime
1149 /// deterministic (e.g. the [`tokio::select!`] macro). In the case of
1150 /// [`tokio::select!`] it will ensure that the order that branches are
1151 /// polled is deterministic.
1152 ///
1153 /// In addition to the code specifying `rng_seed` and interacting with
1154 /// the runtime, the internals of Tokio and the Rust compiler may affect
1155 /// the sequences of random numbers. In order to ensure repeatable
1156 /// results, the version of Tokio, the versions of all other
1157 /// dependencies that interact with Tokio, and the Rust compiler version
1158 /// should also all remain constant.
1159 ///
1160 /// # Examples
1161 ///
1162 /// ```
1163 /// # use tokio::runtime::{self, RngSeed};
1164 /// # pub fn main() {
1165 /// let seed = RngSeed::from_bytes(b"place your seed here");
1166 /// let rt = runtime::Builder::new_current_thread()
1167 /// .rng_seed(seed)
1168 /// .build();
1169 /// # }
1170 /// ```
1171 ///
1172 /// [`tokio::select!`]: crate::select
1173 pub fn rng_seed(&mut self, seed: RngSeed) -> &mut Self {
1174 self.seed_generator = RngSeedGenerator::new(seed);
1175 self
1176 }
1177 }
1178
1179 cfg_unstable_metrics! {
1180 /// Enables tracking the distribution of task poll times.
1181 ///
1182 /// Task poll times are not instrumented by default as doing so requires
1183 /// calling [`Instant::now()`] twice per task poll, which could add
1184 /// measurable overhead. Use the [`Handle::metrics()`] to access the
1185 /// metrics data.
1186 ///
1187 /// The histogram uses fixed bucket sizes. In other words, the histogram
1188 /// buckets are not dynamic based on input values. Use the
1189 /// `metrics_poll_time_histogram` builder methods to configure the
1190 /// histogram details.
1191 ///
1192 /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used.
1193 /// This has an extremely low memory footprint, but may not provide enough granularity. For
1194 /// better granularity with low memory usage, use [`metrics_poll_time_histogram_configuration()`]
1195 /// to select [`LogHistogram`] instead.
1196 ///
1197 /// # Examples
1198 ///
1199 /// ```
1200 /// use tokio::runtime;
1201 ///
1202 /// let rt = runtime::Builder::new_multi_thread()
1203 /// .enable_metrics_poll_time_histogram()
1204 /// .build()
1205 /// .unwrap();
1206 /// # // Test default values here
1207 /// # fn us(n: u64) -> std::time::Duration { std::time::Duration::from_micros(n) }
1208 /// # let m = rt.handle().metrics();
1209 /// # assert_eq!(m.poll_time_histogram_num_buckets(), 10);
1210 /// # assert_eq!(m.poll_time_histogram_bucket_range(0), us(0)..us(100));
1211 /// # assert_eq!(m.poll_time_histogram_bucket_range(1), us(100)..us(200));
1212 /// ```
1213 ///
1214 /// [`Handle::metrics()`]: crate::runtime::Handle::metrics
1215 /// [`Instant::now()`]: std::time::Instant::now
1216 /// [`LogHistogram`]: crate::runtime::LogHistogram
1217 /// [`metrics_poll_time_histogram_configuration()`]: Builder::metrics_poll_time_histogram_configuration
1218 pub fn enable_metrics_poll_time_histogram(&mut self) -> &mut Self {
1219 self.metrics_poll_count_histogram_enable = true;
1220 self
1221 }
1222
1223 /// Deprecated. Use [`enable_metrics_poll_time_histogram()`] instead.
1224 ///
1225 /// [`enable_metrics_poll_time_histogram()`]: Builder::enable_metrics_poll_time_histogram
1226 #[deprecated(note = "`poll_count_histogram` related methods have been renamed `poll_time_histogram` to better reflect their functionality.")]
1227 #[doc(hidden)]
1228 pub fn enable_metrics_poll_count_histogram(&mut self) -> &mut Self {
1229 self.enable_metrics_poll_time_histogram()
1230 }
1231
1232 /// Sets the histogram scale for tracking the distribution of task poll
1233 /// times.
1234 ///
1235 /// Tracking the distribution of task poll times can be done using a
1236 /// linear or log scale. When using linear scale, each histogram bucket
1237 /// will represent the same range of poll times. When using log scale,
1238 /// each histogram bucket will cover a range twice as big as the
1239 /// previous bucket.
1240 ///
1241 /// **Default:** linear scale.
1242 ///
1243 /// # Examples
1244 ///
1245 /// ```
1246 /// use tokio::runtime::{self, HistogramScale};
1247 ///
1248 /// # #[allow(deprecated)]
1249 /// let rt = runtime::Builder::new_multi_thread()
1250 /// .enable_metrics_poll_time_histogram()
1251 /// .metrics_poll_count_histogram_scale(HistogramScale::Log)
1252 /// .build()
1253 /// .unwrap();
1254 /// ```
1255 #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1256 pub fn metrics_poll_count_histogram_scale(&mut self, histogram_scale: crate::runtime::HistogramScale) -> &mut Self {
1257 self.metrics_poll_count_histogram.legacy_mut(|b|b.scale = histogram_scale);
1258 self
1259 }
1260
1261 /// Configure the histogram for tracking poll times
1262 ///
1263 /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used.
1264 /// This has an extremely low memory footprint, but may not provide enough granularity. For
1265 /// better granularity with low memory usage, use [`LogHistogram`] instead.
1266 ///
1267 /// # Examples
1268 /// Configure a [`LogHistogram`] with [default configuration]:
1269 /// ```
1270 /// use tokio::runtime;
1271 /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1272 ///
1273 /// let rt = runtime::Builder::new_multi_thread()
1274 /// .enable_metrics_poll_time_histogram()
1275 /// .metrics_poll_time_histogram_configuration(
1276 /// HistogramConfiguration::log(LogHistogram::default())
1277 /// )
1278 /// .build()
1279 /// .unwrap();
1280 /// ```
1281 ///
1282 /// Configure a linear histogram with 100 buckets, each 10μs wide
1283 /// ```
1284 /// use tokio::runtime;
1285 /// use std::time::Duration;
1286 /// use tokio::runtime::HistogramConfiguration;
1287 ///
1288 /// let rt = runtime::Builder::new_multi_thread()
1289 /// .enable_metrics_poll_time_histogram()
1290 /// .metrics_poll_time_histogram_configuration(
1291 /// HistogramConfiguration::linear(Duration::from_micros(10), 100)
1292 /// )
1293 /// .build()
1294 /// .unwrap();
1295 /// ```
1296 ///
1297 /// Configure a [`LogHistogram`] with the following settings:
1298 /// - Measure times from 100ns to 120s
1299 /// - Max error of 0.1
1300 /// - No more than 1024 buckets
1301 /// ```
1302 /// use std::time::Duration;
1303 /// use tokio::runtime;
1304 /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1305 ///
1306 /// let rt = runtime::Builder::new_multi_thread()
1307 /// .enable_metrics_poll_time_histogram()
1308 /// .metrics_poll_time_histogram_configuration(
1309 /// HistogramConfiguration::log(LogHistogram::builder()
1310 /// .max_value(Duration::from_secs(120))
1311 /// .min_value(Duration::from_nanos(100))
1312 /// .max_error(0.1)
1313 /// .max_buckets(1024)
1314 /// .expect("configuration uses 488 buckets")
1315 /// )
1316 /// )
1317 /// .build()
1318 /// .unwrap();
1319 /// ```
1320 ///
1321 /// When migrating from the legacy histogram ([`HistogramScale::Log`]) and wanting
1322 /// to match the previous behavior, use `precision_exact(0)`. This creates a histogram
1323 /// where each bucket is twice the size of the previous bucket.
1324 /// ```rust
1325 /// use std::time::Duration;
1326 /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1327 /// let rt = tokio::runtime::Builder::new_current_thread()
1328 /// .enable_all()
1329 /// .enable_metrics_poll_time_histogram()
1330 /// .metrics_poll_time_histogram_configuration(HistogramConfiguration::log(
1331 /// LogHistogram::builder()
1332 /// .min_value(Duration::from_micros(20))
1333 /// .max_value(Duration::from_millis(4))
1334 /// // Set `precision_exact` to `0` to match `HistogramScale::Log`
1335 /// .precision_exact(0)
1336 /// .max_buckets(10)
1337 /// .unwrap(),
1338 /// ))
1339 /// .build()
1340 /// .unwrap();
1341 /// ```
1342 ///
1343 /// [`LogHistogram`]: crate::runtime::LogHistogram
1344 /// [default configuration]: crate::runtime::LogHistogramBuilder
1345 /// [`HistogramScale::Log`]: crate::runtime::HistogramScale::Log
1346 pub fn metrics_poll_time_histogram_configuration(&mut self, configuration: HistogramConfiguration) -> &mut Self {
1347 self.metrics_poll_count_histogram.histogram_type = configuration.inner;
1348 self
1349 }
1350
1351 /// Sets the histogram resolution for tracking the distribution of task
1352 /// poll times.
1353 ///
1354 /// The resolution is the histogram's first bucket's range. When using a
1355 /// linear histogram scale, each bucket will cover the same range. When
1356 /// using a log scale, each bucket will cover a range twice as big as
1357 /// the previous bucket. In the log case, the resolution represents the
1358 /// smallest bucket range.
1359 ///
1360 /// Note that, when using log scale, the resolution is rounded up to the
1361 /// nearest power of 2 in nanoseconds.
1362 ///
1363 /// **Default:** 100 microseconds.
1364 ///
1365 /// # Examples
1366 ///
1367 /// ```
1368 /// use tokio::runtime;
1369 /// use std::time::Duration;
1370 ///
1371 /// # #[allow(deprecated)]
1372 /// let rt = runtime::Builder::new_multi_thread()
1373 /// .enable_metrics_poll_time_histogram()
1374 /// .metrics_poll_count_histogram_resolution(Duration::from_micros(100))
1375 /// .build()
1376 /// .unwrap();
1377 /// ```
1378 #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1379 pub fn metrics_poll_count_histogram_resolution(&mut self, resolution: Duration) -> &mut Self {
1380 assert!(resolution > Duration::from_secs(0));
1381 // Sanity check the argument and also make the cast below safe.
1382 assert!(resolution <= Duration::from_secs(1));
1383
1384 let resolution = resolution.as_nanos() as u64;
1385
1386 self.metrics_poll_count_histogram.legacy_mut(|b|b.resolution = resolution);
1387 self
1388 }
1389
1390 /// Sets the number of buckets for the histogram tracking the
1391 /// distribution of task poll times.
1392 ///
1393 /// The last bucket tracks all greater values that fall out of other
1394 /// ranges. So, configuring the histogram using a linear scale,
1395 /// resolution of 50ms, and 10 buckets, the 10th bucket will track task
1396 /// polls that take more than 450ms to complete.
1397 ///
1398 /// **Default:** 10
1399 ///
1400 /// # Examples
1401 ///
1402 /// ```
1403 /// use tokio::runtime;
1404 ///
1405 /// # #[allow(deprecated)]
1406 /// let rt = runtime::Builder::new_multi_thread()
1407 /// .enable_metrics_poll_time_histogram()
1408 /// .metrics_poll_count_histogram_buckets(15)
1409 /// .build()
1410 /// .unwrap();
1411 /// ```
1412 #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1413 pub fn metrics_poll_count_histogram_buckets(&mut self, buckets: usize) -> &mut Self {
1414 self.metrics_poll_count_histogram.legacy_mut(|b|b.num_buckets = buckets);
1415 self
1416 }
1417 }
1418
1419 fn build_current_thread_runtime(&mut self) -> io::Result<Runtime> {
1420 use crate::runtime::runtime::Scheduler;
1421
1422 let (scheduler, handle, blocking_pool) =
1423 self.build_current_thread_runtime_components(None)?;
1424
1425 Ok(Runtime::from_parts(
1426 Scheduler::CurrentThread(scheduler),
1427 handle,
1428 blocking_pool,
1429 ))
1430 }
1431
1432 #[cfg(tokio_unstable)]
1433 fn build_current_thread_local_runtime(&mut self) -> io::Result<LocalRuntime> {
1434 use crate::runtime::local_runtime::LocalRuntimeScheduler;
1435
1436 let tid = std::thread::current().id();
1437
1438 let (scheduler, handle, blocking_pool) =
1439 self.build_current_thread_runtime_components(Some(tid))?;
1440
1441 Ok(LocalRuntime::from_parts(
1442 LocalRuntimeScheduler::CurrentThread(scheduler),
1443 handle,
1444 blocking_pool,
1445 ))
1446 }
1447
1448 fn build_current_thread_runtime_components(
1449 &mut self,
1450 local_tid: Option<ThreadId>,
1451 ) -> io::Result<(CurrentThread, Handle, BlockingPool)> {
1452 use crate::runtime::scheduler;
1453 use crate::runtime::Config;
1454
1455 let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;
1456
1457 // Blocking pool
1458 let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads);
1459 let blocking_spawner = blocking_pool.spawner().clone();
1460
1461 // Generate a rng seed for this runtime.
1462 let seed_generator_1 = self.seed_generator.next_generator();
1463 let seed_generator_2 = self.seed_generator.next_generator();
1464
1465 // And now put a single-threaded scheduler on top of the timer. When
1466 // there are no futures ready to do something, it'll let the timer or
1467 // the reactor to generate some new stimuli for the futures to continue
1468 // in their life.
1469 let (scheduler, handle) = CurrentThread::new(
1470 driver,
1471 driver_handle,
1472 blocking_spawner,
1473 seed_generator_2,
1474 Config {
1475 before_park: self.before_park.clone(),
1476 after_unpark: self.after_unpark.clone(),
1477 before_spawn: self.before_spawn.clone(),
1478 #[cfg(tokio_unstable)]
1479 before_poll: self.before_poll.clone(),
1480 #[cfg(tokio_unstable)]
1481 after_poll: self.after_poll.clone(),
1482 after_termination: self.after_termination.clone(),
1483 global_queue_interval: self.global_queue_interval,
1484 event_interval: self.event_interval,
1485 #[cfg(tokio_unstable)]
1486 unhandled_panic: self.unhandled_panic.clone(),
1487 disable_lifo_slot: self.disable_lifo_slot,
1488 seed_generator: seed_generator_1,
1489 metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
1490 },
1491 local_tid,
1492 );
1493
1494 let handle = Handle {
1495 inner: scheduler::Handle::CurrentThread(handle),
1496 };
1497
1498 Ok((scheduler, handle, blocking_pool))
1499 }
1500
1501 fn metrics_poll_count_histogram_builder(&self) -> Option<HistogramBuilder> {
1502 if self.metrics_poll_count_histogram_enable {
1503 Some(self.metrics_poll_count_histogram.clone())
1504 } else {
1505 None
1506 }
1507 }
1508}
1509
1510cfg_io_driver! {
1511 impl Builder {
1512 /// Enables the I/O driver.
1513 ///
1514 /// Doing this enables using net, process, signal, and some I/O types on
1515 /// the runtime.
1516 ///
1517 /// # Examples
1518 ///
1519 /// ```
1520 /// use tokio::runtime;
1521 ///
1522 /// let rt = runtime::Builder::new_multi_thread()
1523 /// .enable_io()
1524 /// .build()
1525 /// .unwrap();
1526 /// ```
1527 pub fn enable_io(&mut self) -> &mut Self {
1528 self.enable_io = true;
1529 self
1530 }
1531
1532 /// Enables the I/O driver and configures the max number of events to be
1533 /// processed per tick.
1534 ///
1535 /// # Examples
1536 ///
1537 /// ```
1538 /// use tokio::runtime;
1539 ///
1540 /// let rt = runtime::Builder::new_current_thread()
1541 /// .enable_io()
1542 /// .max_io_events_per_tick(1024)
1543 /// .build()
1544 /// .unwrap();
1545 /// ```
1546 pub fn max_io_events_per_tick(&mut self, capacity: usize) -> &mut Self {
1547 self.nevents = capacity;
1548 self
1549 }
1550 }
1551}
1552
1553cfg_time! {
1554 impl Builder {
1555 /// Enables the time driver.
1556 ///
1557 /// Doing this enables using `tokio::time` on the runtime.
1558 ///
1559 /// # Examples
1560 ///
1561 /// ```
1562 /// use tokio::runtime;
1563 ///
1564 /// let rt = runtime::Builder::new_multi_thread()
1565 /// .enable_time()
1566 /// .build()
1567 /// .unwrap();
1568 /// ```
1569 pub fn enable_time(&mut self) -> &mut Self {
1570 self.enable_time = true;
1571 self
1572 }
1573 }
1574}
1575
1576cfg_test_util! {
1577 impl Builder {
1578 /// Controls if the runtime's clock starts paused or advancing.
1579 ///
1580 /// Pausing time requires the current-thread runtime; construction of
1581 /// the runtime will panic otherwise.
1582 ///
1583 /// # Examples
1584 ///
1585 /// ```
1586 /// use tokio::runtime;
1587 ///
1588 /// let rt = runtime::Builder::new_current_thread()
1589 /// .enable_time()
1590 /// .start_paused(true)
1591 /// .build()
1592 /// .unwrap();
1593 /// ```
1594 pub fn start_paused(&mut self, start_paused: bool) -> &mut Self {
1595 self.start_paused = start_paused;
1596 self
1597 }
1598 }
1599}
1600
1601cfg_rt_multi_thread! {
1602 impl Builder {
1603 fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
1604 use crate::loom::sys::num_cpus;
1605 use crate::runtime::{Config, runtime::Scheduler};
1606 use crate::runtime::scheduler::{self, MultiThread};
1607
1608 let worker_threads = self.worker_threads.unwrap_or_else(num_cpus);
1609
1610 let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;
1611
1612 // Create the blocking pool
1613 let blocking_pool =
1614 blocking::create_blocking_pool(self, self.max_blocking_threads + worker_threads);
1615 let blocking_spawner = blocking_pool.spawner().clone();
1616
1617 // Generate a rng seed for this runtime.
1618 let seed_generator_1 = self.seed_generator.next_generator();
1619 let seed_generator_2 = self.seed_generator.next_generator();
1620
1621 let (scheduler, handle, launch) = MultiThread::new(
1622 worker_threads,
1623 driver,
1624 driver_handle,
1625 blocking_spawner,
1626 seed_generator_2,
1627 Config {
1628 before_park: self.before_park.clone(),
1629 after_unpark: self.after_unpark.clone(),
1630 before_spawn: self.before_spawn.clone(),
1631 #[cfg(tokio_unstable)]
1632 before_poll: self.before_poll.clone(),
1633 #[cfg(tokio_unstable)]
1634 after_poll: self.after_poll.clone(),
1635 after_termination: self.after_termination.clone(),
1636 global_queue_interval: self.global_queue_interval,
1637 event_interval: self.event_interval,
1638 #[cfg(tokio_unstable)]
1639 unhandled_panic: self.unhandled_panic.clone(),
1640 disable_lifo_slot: self.disable_lifo_slot,
1641 seed_generator: seed_generator_1,
1642 metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
1643 },
1644 );
1645
1646 let handle = Handle { inner: scheduler::Handle::MultiThread(handle) };
1647
1648 // Spawn the thread pool workers
1649 let _enter = handle.enter();
1650 launch.launch();
1651
1652 Ok(Runtime::from_parts(Scheduler::MultiThread(scheduler), handle, blocking_pool))
1653 }
1654 }
1655}
1656
1657impl fmt::Debug for Builder {
1658 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1659 fmt.debug_struct("Builder")
1660 .field("worker_threads", &self.worker_threads)
1661 .field("max_blocking_threads", &self.max_blocking_threads)
1662 .field(
1663 "thread_name",
1664 &"<dyn Fn() -> String + Send + Sync + 'static>",
1665 )
1666 .field("thread_stack_size", &self.thread_stack_size)
1667 .field("after_start", &self.after_start.as_ref().map(|_| "..."))
1668 .field("before_stop", &self.before_stop.as_ref().map(|_| "..."))
1669 .field("before_park", &self.before_park.as_ref().map(|_| "..."))
1670 .field("after_unpark", &self.after_unpark.as_ref().map(|_| "..."))
1671 .finish()
1672 }
1673}