tokio waker instrumentation
Friday, 21 June 2024
This is going to be a fairly in-depth post on how we can follow task dependency edges using the tracing instrumentation available in the Tokio async runtime, specifically how to determine which task (if any) woke some other task. It will cover some of the same ground as a post from last year, tracing tokio tasks, but focus more on the waker events. If you haven't read that post already, it may be worth doing so before continuing.
In async Rust, when the future driving a task returns Poll::Pending, it won't be polled again until it gets woken by a waker. If you'd like to understand more about how pending futures get woken, you can read how I finally understood async/await in Rust (part 2: how does a pending future get woken?).
wake-up code
Let's start with some Rust code which we're going to analyse via the tracing instrumentation in tokio. Don't worry if this looks like a lot of code, we're going to spend the rest of the post going through it step by step.
use std::time::Duration;
#[tokio::main]
async fn main() {
    use tracing_subscriber::prelude::*;
    let layer = ari_subscriber::Layer::new();
    tracing_subscriber::registry().with(layer).init();
    let (tx, mut rx) = tokio::sync::mpsc::channel(1);
    // Case 1: the receiver task wakes itself, then waits on the channel.
    let receiver_jh = spawn_named("receiver", async move {
        tracing::debug!("self-wake: wake from same task (async)");
        self_wake().await;
        while rx.recv().await.is_some() {
            tracing::debug!("received message");
        }
        tracing::debug!("received None");
    });
    tokio::time::sleep(Duration::from_millis(10)).await;
    // Case 2: another task wakes the receiver by sending a message.
    let task_tx = tx.clone();
    spawn_named("sender-task", async move {
        tracing::debug!("wake from another task (async)");
        task_tx.send(()).await.unwrap();
    })
    .await
    .unwrap();
    // Case 3: a plain thread (outside the runtime) wakes the receiver.
    let no_runtime_tx = tx.clone();
    std::thread::spawn(move || {
        std::thread::sleep(Duration::from_millis(10));
        tracing::debug!("wake from another thread (non-async)");
        no_runtime_tx.try_send(()).unwrap();
    })
    .join()
    .unwrap();
    drop(tx);
    receiver_jh.await.unwrap();
}
We're using a slightly modified version of my ari-subscriber crate, which adds colour coding to the spans and events generated by tokio's traces, to view the output. I introduced this crate in debugging tokio instrumentation. I'm hoping to integrate some of these changes behind builder options.
One of the main changes is to show each parent span and event on their own lines. This is much more appropriate for viewing in narrow spaces - like the code box on this web-site. The other change is that only spawn spans - those representing tasks - and waker events are included; the rest are filtered out. Here's the full output:
2024-06-20T15:10:05.495210Z TRACE
⤷ runtime.spawn[2]{kind=task, task.name=dummy, task.id=2} new
2024-06-20T15:10:05.495363Z TRACE
⤷ runtime.spawn[2]{kind=task, task.name=dummy, task.id=2} enter
2024-06-20T15:10:05.495416Z TRACE
⤷ runtime.spawn[2]{kind=task, task.name=dummy, task.id=2} exit
2024-06-20T15:10:05.495469Z TRACE
⤷ runtime.spawn[2]{kind=task, task.name=dummy, task.id=2} enter
2024-06-20T15:10:05.495532Z TRACE
⤷ runtime.spawn[2]{kind=task, task.name=dummy, task.id=2} exit
2024-06-20T15:10:05.495579Z TRACE
⤷ runtime.spawn[2]{kind=task, task.name=dummy, task.id=2} close
2024-06-20T15:10:05.495667Z TRACE
⤷ runtime.spawn[2251799813685250]{kind=task, task.name=receiver, task.id=3} new
2024-06-20T15:10:05.496063Z TRACE
⤷ runtime.spawn[2251799813685250]{kind=task, task.name=receiver, task.id=3} enter
2024-06-20T15:10:05.496115Z DEBUG
⤷ runtime.spawn[2251799813685250]{kind=task, task.name=receiver, task.id=3}
⤷ tokio_spawn_wake: self-wake: wake from same task (async)
2024-06-20T15:10:05.496171Z TRACE
⤷ runtime.spawn[2251799813685250]{kind=task, task.name=receiver, task.id=3}
⤷ tokio::task::waker: op="waker.wake_by_ref", task.id=2251799813685250
2024-06-20T15:10:05.496234Z TRACE
⤷ runtime.spawn[2251799813685250]{kind=task, task.name=receiver, task.id=3} exit
2024-06-20T15:10:05.496289Z TRACE
⤷ runtime.spawn[2251799813685250]{kind=task, task.name=receiver, task.id=3} enter
2024-06-20T15:10:05.496340Z TRACE
⤷ runtime.spawn[2251799813685250]{kind=task, task.name=receiver, task.id=3}
⤷ tokio::task::waker: op="waker.clone", task.id=2251799813685250
2024-06-20T15:10:05.496401Z TRACE
⤷ runtime.spawn[2251799813685250]{kind=task, task.name=receiver, task.id=3} exit
2024-06-20T15:10:05.508063Z TRACE
⤷ runtime.spawn[2251799813685251]{kind=task, task.name=sender-task, task.id=4} new
2024-06-20T15:10:05.508183Z TRACE
⤷ runtime.spawn[2251799813685251]{kind=task, task.name=sender-task, task.id=4} enter
2024-06-20T15:10:05.508240Z DEBUG
⤷ runtime.spawn[2251799813685251]{kind=task, task.name=sender-task, task.id=4}
⤷ tokio_spawn_wake: wake from another task (async)
2024-06-20T15:10:05.508753Z TRACE
⤷ runtime.spawn[2251799813685251]{kind=task, task.name=sender-task, task.id=4}
⤷ tokio::task::waker: op="waker.wake", task.id=2251799813685250
2024-06-20T15:10:05.508824Z TRACE
⤷ runtime.spawn[2251799813685251]{kind=task, task.name=sender-task, task.id=4} exit
2024-06-20T15:10:05.508878Z TRACE
⤷ runtime.spawn[2251799813685251]{kind=task, task.name=sender-task, task.id=4} enter
2024-06-20T15:10:05.508929Z TRACE
⤷ runtime.spawn[2251799813685251]{kind=task, task.name=sender-task, task.id=4} exit
2024-06-20T15:10:05.508978Z TRACE
⤷ runtime.spawn[2251799813685251]{kind=task, task.name=sender-task, task.id=4} close
2024-06-20T15:10:05.509039Z TRACE
⤷ runtime.spawn[2251799813685250]{kind=task, task.name=receiver, task.id=3} enter
2024-06-20T15:10:05.509132Z DEBUG
⤷ runtime.spawn[2251799813685250]{kind=task, task.name=receiver, task.id=3}
⤷ tokio_spawn_wake: received message
2024-06-20T15:10:05.509187Z TRACE
⤷ runtime.spawn[2251799813685250]{kind=task, task.name=receiver, task.id=3}
⤷ tokio::task::waker: op="waker.clone", task.id=2251799813685250
2024-06-20T15:10:05.509253Z TRACE
⤷ runtime.spawn[2251799813685250]{kind=task, task.name=receiver, task.id=3} exit
2024-06-20T15:10:05.519517Z DEBUG
⤷ tokio_spawn_wake: wake from another thread (non-async)
2024-06-20T15:10:05.519585Z TRACE
⤷ tokio::task::waker: op="waker.wake", task.id=2251799813685250
2024-06-20T15:10:05.519695Z TRACE
⤷ runtime.spawn[2251799813685250]{kind=task, task.name=receiver, task.id=3} enter
2024-06-20T15:10:05.519788Z DEBUG
⤷ runtime.spawn[2251799813685250]{kind=task, task.name=receiver, task.id=3}
⤷ tokio_spawn_wake: received message
2024-06-20T15:10:05.519839Z DEBUG
⤷ runtime.spawn[2251799813685250]{kind=task, task.name=receiver, task.id=3}
⤷ tokio_spawn_wake: received None
2024-06-20T15:10:05.519907Z TRACE
⤷ runtime.spawn[2251799813685250]{kind=task, task.name=receiver, task.id=3} exit
2024-06-20T15:10:05.519954Z TRACE
⤷ runtime.spawn[2251799813685250]{kind=task, task.name=receiver, task.id=3} enter
2024-06-20T15:10:05.520001Z TRACE
⤷ runtime.spawn[2251799813685250]{kind=task, task.name=receiver, task.id=3} exit
2024-06-20T15:10:05.520046Z TRACE
⤷ runtime.spawn[2251799813685250]{kind=task, task.name=receiver, task.id=3} close
Since that's a lot of code and a lot of tracing output, let's go step by step.
We've got three different waker cases here, and we're going to look at each of them in turn:
- task wakes itself (self wake)
- task gets woken by another task
- task gets woken from outside the runtime
At the beginning of our async main function, we've got some tracing-subscriber set up code and then we create a multi-producer single-consumer channel. We only have one slot in the channel, but that's not integral to this example.
#[tokio::main]
async fn main() {
    use tracing_subscriber::prelude::*;
    let layer = ari_subscriber::Layer::new();
    tracing_subscriber::registry().with(layer).init();
    let (tx, mut rx) = tokio::sync::mpsc::channel(1);
    // All the rest of the code
}
self wake
A self-wake is what it sounds like: a task waking itself. Since a task can't do anything when it is not being polled, this means that the task must wake itself before it returns Poll::Pending. As such, when it does return Poll::Pending, it will immediately be ready to be polled again.
Here's the code that does this:
let receiver_jh = spawn_named("receiver", async move {
    tracing::debug!("self-wake: wake from same task (async)");
    self_wake().await;
    // More things happen here
});
We spawn a task with the name receiver which emits a DEBUG tracing event and then awaits a function called self_wake(). The exact contents of this function aren't important, but you can check it out at the bottom of this post under self wake implementation. Likewise, the implementation of the spawn_named() function isn't important. It is just a convenience wrapper around Tokio's task::Builder API; check the implementation below under spawn named implementation.
Now let's look at the traces that correspond to this section of code.
2024-06-20T15:10:05.495667Z TRACE
⤷ runtime.spawn[2251799813685250]{kind=task, task.name=receiver, task.id=3} new
2024-06-20T15:10:05.496063Z TRACE
⤷ runtime.spawn[2251799813685250]{kind=task, task.name=receiver, task.id=3} enter
2024-06-20T15:10:05.496115Z DEBUG
⤷ runtime.spawn[2251799813685250]{kind=task, task.name=receiver, task.id=3}
⤷ tokio_spawn_wake: self-wake: wake from same task (async)
2024-06-20T15:10:05.496171Z TRACE
⤷ runtime.spawn[2251799813685250]{kind=task, task.name=receiver, task.id=3}
⤷ tokio::task::waker: op="waker.wake_by_ref", task.id=2251799813685250
2024-06-20T15:10:05.496234Z TRACE
⤷ runtime.spawn[2251799813685250]{kind=task, task.name=receiver, task.id=3} exit
2024-06-20T15:10:05.496289Z TRACE
⤷ runtime.spawn[2251799813685250]{kind=task, task.name=receiver, task.id=3} enter
Here we have six traces, each on multiple lines. Here's what happens:
1. new - the receiver task is created, it has the name "receiver" and the Tokio task ID 3. The runtime.spawn span representing the task has span ID 2251799813685250, this is seen in brackets [] after the name.
2. enter - the new task gets polled for the first time.
3. tokio_spawn_wake - this line is the output from the tracing::debug invocation in our own code (it has no colours).
4. tokio::task::waker - this is where our task wakes itself from inside self_wake(). Notice that the task.id here refers to the runtime.spawn span representing the task, not to Tokio's task ID. We know that this is a self wake because the value of the task.id field matches the span ID of the event's parent span.
5. exit - the task exits (it has returned Poll::Pending), this corresponds to the .await point called on self_wake().
6. enter - because the task has already been woken and there is nothing else waiting, it is polled again immediately.
The most important takeaway here is how we know this is a self wake. It's because the value of the task.id field on the waker event matches the span ID of that event's parent span.
Here's that trace again. The two values to compare are the span ID in brackets on the runtime.spawn line and the task.id field on the waker event:
2024-06-20T15:10:05.496171Z TRACE
⤷ runtime.spawn[2251799813685250]{kind=task, task.name=receiver, task.id=3}
⤷ tokio::task::waker: op="waker.wake_by_ref", task.id=2251799813685250
wake from another task
The rest of the receiver task's implementation is reading from the channel:
let receiver_jh = spawn_named("receiver", async move {
    // self-wake code
    while rx.recv().await.is_some() {
        tracing::debug!("received message");
    }
    tracing::debug!("received None");
});
This will asynchronously wait for a new message on the channel in a loop. It will exit once None is received, which will happen when all the channel's senders are dropped. We've added some DEBUG tracing events so that we can see when this task receives messages from the channel.
There are 2 traces that we didn't reach in the previous section, here they are:
2024-06-20T15:10:05.496340Z TRACE
⤷ runtime.spawn[2251799813685250]{kind=task, task.name=receiver, task.id=3}
⤷ tokio::task::waker: op="waker.clone", task.id=2251799813685250
2024-06-20T15:10:05.496401Z TRACE
⤷ runtime.spawn[2251799813685250]{kind=task, task.name=receiver, task.id=3} exit
The first trace is a tokio::task::waker operation, specifically the waker is being cloned. This is being done by the mpsc channel. Since the channel is empty, rx.recv().await will return Poll::Pending. The task will be woken up when something comes into the channel to be received. To do this, the channel needs to hold onto the waker corresponding to the receiver task so that it can invoke it when it can proceed. That's what the clone operation is.
The final line is the task returning Poll::Pending and moving into an idle state.
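To make the "hold onto the waker" idea concrete, here is a minimal sketch of a queue that stores a clone of the receiver's waker when it has nothing to hand out. This is not how tokio's mpsc channel is implemented; Mailbox, poll_recv, send and CountingWaker are hypothetical names made up for this illustration, and only the standard library is used.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical single-consumer queue (illustration only, not tokio's mpsc).
struct Mailbox<T> {
    queue: VecDeque<T>,
    /// Waker of the task currently waiting in `poll_recv`, if any.
    rx_waker: Option<Waker>,
}

impl<T> Mailbox<T> {
    fn new() -> Self {
        Mailbox { queue: VecDeque::new(), rx_waker: None }
    }

    /// Receiver side: take a message, or keep a clone of the waker and wait.
    fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<T> {
        match self.queue.pop_front() {
            Some(msg) => Poll::Ready(msg),
            None => {
                // The step that would show up as `waker.clone` in the traces.
                self.rx_waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }

    /// Sender side: queue the message, then consume the stored waker.
    fn send(&mut self, msg: T) {
        self.queue.push_back(msg);
        if let Some(waker) = self.rx_waker.take() {
            // `wake` takes the waker by value, so the stored clone is used up.
            waker.wake();
        }
    }
}

/// Helper waker that only counts how often it has been woken.
#[derive(Default)]
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}
```

Polling poll_recv on an empty Mailbox with a waker built from a CountingWaker returns Poll::Pending without any wake-up; a later send then wakes the stored clone exactly once, after which the next poll_recv returns the message.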
Now, let's look at what is happening with the channel's senders.
tokio::time::sleep(Duration::from_millis(10)).await;
let task_tx = tx.clone();
spawn_named("sender-task", async move {
    tracing::debug!("wake from another task (async)");
    task_tx.send(()).await.unwrap();
})
.await
.unwrap();
Here we use a bit of a hack: we wait for 10 milliseconds to make sure that the following code doesn't get executed until after the self wake has occurred. This is likely not necessary, but worth adding to ensure repeatability in a learning context.
In this code, we are spawning a new task (which will immediately be scheduled to run). Inside the task, we emit a DEBUG tracing event and then send a message consisting of the unit type () through the channel.
Here are the traces that correspond to this part of the code:
2024-06-20T15:10:05.508063Z TRACE
⤷ runtime.spawn[2251799813685251]{kind=task, task.name=sender-task, task.id=4} new
2024-06-20T15:10:05.508183Z TRACE
⤷ runtime.spawn[2251799813685251]{kind=task, task.name=sender-task, task.id=4} enter
2024-06-20T15:10:05.508240Z DEBUG
⤷ runtime.spawn[2251799813685251]{kind=task, task.name=sender-task, task.id=4}
⤷ tokio_spawn_wake: wake from another task (async)
2024-06-20T15:10:05.508753Z TRACE
⤷ runtime.spawn[2251799813685251]{kind=task, task.name=sender-task, task.id=4}
⤷ tokio::task::waker: op="waker.wake", task.id=2251799813685250
2024-06-20T15:10:05.508824Z TRACE
⤷ runtime.spawn[2251799813685251]{kind=task, task.name=sender-task, task.id=4} exit
2024-06-20T15:10:05.508878Z TRACE
⤷ runtime.spawn[2251799813685251]{kind=task, task.name=sender-task, task.id=4} enter
2024-06-20T15:10:05.508929Z TRACE
⤷ runtime.spawn[2251799813685251]{kind=task, task.name=sender-task, task.id=4} exit
2024-06-20T15:10:05.508978Z TRACE
⤷ runtime.spawn[2251799813685251]{kind=task, task.name=sender-task, task.id=4} close
This section contains 8 traces, let's walk through them all in turn:
1. new - the sender-task task is created. It has Tokio task ID 4 and the runtime.spawn span representing that task has span ID 2251799813685251.
2. enter - the task is polled for the first time.
3. tokio_spawn_wake - the DEBUG event that we emitted from our code with tracing::debug.
4. tokio::task::waker - here is our waker operation, waker.wake. The task.id in this event is 2251799813685250. Remember, this is the span ID of the runtime.spawn span representing a task. It isn't the same as the parent span's ID, so we know we're waking some other task. If you check back up, you'll see that it is indeed the receiver task that's being woken.
5. exit - the task exits. It's not clear from this trace, but the task has returned Poll::Ready and won't be polled again.
6. enter - the task span is entered again (see 8.)
7. exit - the task span is exited again (see 8.)
8. close - the task has been dropped. The task has now been destroyed and won't be polled again. The enter and exit in traces 6. and 7. don't actually indicate that the task gets polled an additional time. Tokio enters the span one last time to drop the task, which results in these extra steps.
Note that in trace 4. the operation is waker.wake, which consumes the waker, whereas in the self wake case the operation was waker.wake_by_ref, which takes a reference to the waker. This is because the channel owns a waker. The channel's waker was cloned when the receiver task called rx.recv().await and will now be consumed! In the case of self waking, we didn't need to clone the waker as we were within the same task and could access it by reference.
We can tell that there was capacity in the channel to send our message, because otherwise the sender task would have exited and then entered again before being able to wake the receiver task. And of course it would have had to clone the waker so that it could be woken when capacity in the channel was available. But there was capacity, so none of this happened.
Another difference with the self wake case (perhaps the difference) is that in the tokio::task::waker event, the value of the task.id field is different to the span ID of the event's parent span (which represents the task which the wake operation occurred within). This is how we know that the receiver task was woken by some other task. It would be interesting to analyse the causality graph along the lines of waker operations.
Here's the trace again. This time the span ID in brackets on the runtime.spawn line and the task.id field on the waker event differ:
2024-06-20T15:10:05.508753Z TRACE
⤷ runtime.spawn[2251799813685251]{kind=task, task.name=sender-task, task.id=4}
⤷ tokio::task::waker: op="waker.wake", task.id=2251799813685250
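As a rough idea of what analysing that causality graph could look like, the sketch below collects wake events into "waker span -> woken spans" edges. It assumes something has already extracted the two span IDs from each waker event; the WakeEvent type and the wake_edges function are hypothetical and not part of tokio, tracing or ari-subscriber.

```rust
use std::collections::BTreeMap;

/// One wake event reduced to the two span IDs we compare in this post.
/// (Hypothetical record; a subscriber would have to extract these values.)
#[derive(Debug, Clone, Copy)]
struct WakeEvent {
    /// Span ID of the event's parent `runtime.spawn` span (the waking task).
    parent_span: u64,
    /// Value of the event's `task.id` field (the woken task's span ID).
    woken_span: u64,
}

/// Group wake events into edges keyed by the span that did the waking.
fn wake_edges(events: &[WakeEvent]) -> BTreeMap<u64, Vec<u64>> {
    let mut edges: BTreeMap<u64, Vec<u64>> = BTreeMap::new();
    for ev in events {
        edges.entry(ev.parent_span).or_default().push(ev.woken_span);
    }
    edges
}
```

Fed with the two task-originated wake events from the traces so far, this gives one edge from the receiver's span to itself (the self wake) and one edge from the sender-task span to the receiver's span.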
receiving the first message
We now expect the receiver task to get woken and receive that message. Let's check the instrumentation to see that this is indeed happening:
2024-06-20T15:10:05.509039Z TRACE
⤷ runtime.spawn[2251799813685250]{kind=task, task.name=receiver, task.id=3} enter
2024-06-20T15:10:05.509132Z DEBUG
⤷ runtime.spawn[2251799813685250]{kind=task, task.name=receiver, task.id=3}
⤷ tokio_spawn_wake: received message
2024-06-20T15:10:05.509187Z TRACE
⤷ runtime.spawn[2251799813685250]{kind=task, task.name=receiver, task.id=3}
⤷ tokio::task::waker: op="waker.clone", task.id=2251799813685250
2024-06-20T15:10:05.509253Z TRACE
⤷ runtime.spawn[2251799813685250]{kind=task, task.name=receiver, task.id=3} exit
And it is! We can see that our receiver task enters (gets polled) and consumes the message from the channel, which we can see from the DEBUG message we emitted. The receiver task then clones its own waker (we know it's its own because the waker event's task.id and the parent runtime.spawn span ID match), which means that it's awaiting receiving from the channel. Finally, the task exits, so it must have returned Poll::Pending as expected.
wake from outside the runtime
Finally, let's take a look at some code that's going to wake our receiver task from outside the runtime. We spawn a new thread and then immediately sleep:
let no_runtime_tx = tx.clone();
std::thread::spawn(move || {
    std::thread::sleep(Duration::from_millis(10));
    // Things done in the thread
})
.join()
.unwrap();
We wait for 10 milliseconds again. Like before, the sleep is a hack to make sure that the receiver task has absolutely, positively consumed the message that was in the channel and has already awaited on receiving a new message. We just saw in the previous section that this has happened, so let's move on.
Now on to waking from outside the runtime.
let no_runtime_tx = tx.clone();
std::thread::spawn(move || {
    // That hacky sleep code
    tracing::debug!("wake from another thread (non-async)");
    no_runtime_tx.try_send(()).unwrap();
})
.join()
.unwrap();
From our new thread, we emit a DEBUG tracing event and then send a message through the channel. Because we're not in an async context, we need to use the non-async try_send method on the channel's sender. We ensured that capacity would be available by sleeping for a little bit earlier, so we just unwrap the result like some uncouth villain.
Let's check the instrumentation from this episode:
2024-06-20T15:10:05.519517Z DEBUG
⤷ tokio_spawn_wake: wake from another thread (non-async)
2024-06-20T15:10:05.519585Z TRACE
⤷ tokio::task::waker: op="waker.wake", task.id=2251799813685250
This only consists of 2 traces. Standard library threads aren't instrumented, so we don't see anything about spawning the thread or starting execution within it. The first trace is the DEBUG tracing event that we emitted from our code. Then we see the tokio::task::waker event. We can see that the task.id field corresponds to the span ID for the receiver task, 2251799813685250, but that this event doesn't have a runtime.spawn parent span. In fact, it doesn't have a parent span at all (we call this a root event).
This tells us that the task was woken from outside the async runtime, which can be useful information in and of itself.
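The three cases we've now seen can be summed up as a small decision rule. The sketch below is only an illustration: WakeSource and classify_wake are hypothetical names, and it assumes (as in the filtered output in this post) that the only possible parent of a waker event is a runtime.spawn span.

```rust
/// Where a wake came from, judged purely by span IDs.
#[derive(Debug, PartialEq, Eq)]
enum WakeSource {
    /// `task.id` equals the parent span ID: the task woke itself.
    SelfWake,
    /// `task.id` differs from the parent span ID: another task did the waking.
    OtherTask { waker_span: u64 },
    /// The event has no parent span (root event): woken from outside a task.
    OutsideRuntime,
}

/// `parent_span` is the span ID of the waker event's parent, if it has one;
/// `woken_span` is the value of the event's `task.id` field.
fn classify_wake(parent_span: Option<u64>, woken_span: u64) -> WakeSource {
    match parent_span {
        None => WakeSource::OutsideRuntime,
        Some(parent) if parent == woken_span => WakeSource::SelfWake,
        Some(parent) => WakeSource::OtherTask { waker_span: parent },
    }
}
```

With the values from this post, the wake_by_ref event inside the receiver task classifies as SelfWake, the wake event inside sender-task as OtherTask, and the wake event from the thread as OutsideRuntime.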
receiving the second message
Once again, we expect the receiver task to get woken, get polled, and this time receive the message that wasn't there last time it was polled. That's exactly what the traces show:
2024-06-20T15:10:05.519695Z TRACE
⤷ runtime.spawn[2251799813685250]{kind=task, task.name=receiver, task.id=3} enter
2024-06-20T15:10:05.519788Z DEBUG
⤷ runtime.spawn[2251799813685250]{kind=task, task.name=receiver, task.id=3}
⤷ tokio_spawn_wake: received message
The first trace shows the receiver task becoming active and the second trace comes from the DEBUG tracing event that we emit from the code after receiving a message. This fits with what we were expecting.
We're going to stop here, before the receiver task goes any further (but it is still active, as we haven't reached an exit span event yet).
finishing up
Let's round things up by running through the last few lines of code and the traces that are produced. We're left with two lines of code at the end of our async main function:
#[tokio::main]
async fn main() {
    // Everything else that has happened up until now
    drop(tx);
    receiver_jh.await.unwrap();
}
Here we drop the channel sender that was held in the main task (it was never actually used, just cloned into the async task and the thread). This is the last sender, so this will cause the receiver task to exit the while loop it is in. Then, we await the receiver task's join handle to make sure that it has completed and we're done.
Remember that we were in the middle of polling the receiver task. Let's look at the last of the traces:
2024-06-20T15:10:05.519839Z DEBUG
⤷ runtime.spawn[2251799813685250]{kind=task, task.name=receiver, task.id=3}
⤷ tokio_spawn_wake: received None
2024-06-20T15:10:05.519907Z TRACE
⤷ runtime.spawn[2251799813685250]{kind=task, task.name=receiver, task.id=3} exit
2024-06-20T15:10:05.519954Z TRACE
⤷ runtime.spawn[2251799813685250]{kind=task, task.name=receiver, task.id=3} enter
2024-06-20T15:10:05.520001Z TRACE
⤷ runtime.spawn[2251799813685250]{kind=task, task.name=receiver, task.id=3} exit
2024-06-20T15:10:05.520046Z TRACE
⤷ runtime.spawn[2251799813685250]{kind=task, task.name=receiver, task.id=3} close
We've got 5 traces here, so we'll go through them one by one:
1. tokio_spawn_wake - the receiver task receives None from the channel, indicating that all the senders have been dropped and the channel is closed.
2. exit - the receiver task exits, this matches the enter at the end of the previous section.
3. enter - the receiver task's runtime.spawn span enters (that funny enter-exit as a task is dropped that isn't really a poll).
4. exit - the receiver task's runtime.spawn span exits (that funny enter-exit as a task is dropped that isn't really a poll).
5. close - the receiver task is dropped.
We can see here that the last sender gets dropped before the receiver task has finished receiving the previous message and looped back around to call recv().await again. Since the channel has closed by this point, the receiver task gets the notification straight away. There's no cloning of the waker or returning Poll::Pending.
lies by omission
There's one small point that we've skirted around and that conveniently didn't show up in the traces. If we had added a final tracing event at the end of our main function, the trace would look like this:
2024-06-20T15:10:05.520107Z DEBUG
⤷ tokio_spawn_wake: good-bye
At first, this might seem just fine. But then we remember, isn't our async main function also a task?
And the answer is yes, it is. And that task also has a span (although it has kind=block_on). So why doesn't it show up? Basically, it's because when we started our tracing subscriber (the Registry from the tracing-subscriber crate with our ari-subscriber layer attached), this task had already been created and so had the runtime.spawn span that represents it. Tracing subscribers will only "see" spans that are created after they are initialized; earlier spans are lost. Even if they are still alive and even active later on, those spans won't be received by the subscriber.
We actually do this on purpose. When Tokio starts up with the multi-threaded runtime, it creates a pool of threads to run blocking tasks on. The functionality that runs these threads is actually itself a blocking task (very meta, I know) and so you will see a bunch of blocking task runtime.spawn spans created and entered at the beginning of the execution. And that is a lot of noise to filter out for an example. It's usually best to skip it completely.
If you do want to capture all the instrumentation from when a runtime starts up, then you need to initialize your tracing subscriber first, and then create and enter a runtime "manually" (without the #[tokio::main] attribute). You can see an example of this in the console-subscriber example local.
This is the end of the main part of the post. Below you'll find the implementation of the functions spawn_named() and self_wake() that were used within the example code.
If you have suggestions or corrections, please get in touch.
spawn named implementation
This implementation wraps the task builder API (which is unstable) to reduce the code in the main function.
use std::future::Future;
#[track_caller]
fn spawn_named<Fut>(name: &str, f: Fut) -> tokio::task::JoinHandle<<Fut as Future>::Output>
where
    Fut: Future + Send + 'static,
    Fut::Output: Send + 'static,
{
    tokio::task::Builder::new()
        .name(name)
        .spawn(f)
        .unwrap_or_else(|_| panic!("spawning task '{name}' failed"))
}
self wake implementation
This is the implementation for self_wake(). It's based on the old implementation of yield_now() in the tokio crate, before it was replaced with a more complex version that doesn't starve the I/O driver (done in tokio-rs/tokio#5223).
The function returns an opaque future (impl Future) which will check whether it has already yielded. If it has, it will return Poll::Ready, otherwise it will record that it is about to yield, wake the waker it received in the context parameter to the poll function and then return Poll::Pending.
use std::{future::Future, task::Poll};
fn self_wake() -> impl Future<Output = ()> {
    struct SelfWake {
        yielded: bool,
    }
    impl Future for SelfWake {
        type Output = ();
        fn poll(
            mut self: std::pin::Pin<&mut Self>,
            cx: &mut std::task::Context<'_>,
        ) -> Poll<Self::Output> {
            if self.yielded {
                return Poll::Ready(());
            }
            self.yielded = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
    SelfWake { yielded: false }
}
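If you'd like to convince yourself of this behaviour without a runtime, the sketch below polls the future by hand. The self_wake() function is copied from above so that the block stands alone; WakeCounter and poll_self_wake_twice are hypothetical helpers written for this illustration using only the standard library.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical waker that counts wake-ups instead of scheduling anything.
struct WakeCounter(AtomicUsize);

impl Wake for WakeCounter {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Same future as in the post: wakes itself once, then completes.
fn self_wake() -> impl Future<Output = ()> {
    struct SelfWake {
        yielded: bool,
    }
    impl Future for SelfWake {
        type Output = ();
        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
            if self.yielded {
                return Poll::Ready(());
            }
            self.yielded = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
    SelfWake { yielded: false }
}

/// Poll the future twice by hand and report what happened:
/// (first poll was pending, wake-ups seen after first poll, second poll was ready).
fn poll_self_wake_twice() -> (bool, usize, bool) {
    let counter = Arc::new(WakeCounter(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(self_wake());

    let first_pending = fut.as_mut().poll(&mut cx).is_pending();
    let wakes_after_first = counter.0.load(Ordering::SeqCst);
    let second_ready = fut.as_mut().poll(&mut cx).is_ready();
    (first_pending, wakes_after_first, second_ready)
}
```

The first poll returns Poll::Pending after exactly one wake-up has been recorded, and the second poll returns Poll::Ready. This mirrors the wake_by_ref, exit, enter sequence in the self wake traces.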