Build your own block_on()

栏目: IT技术 · 发布时间: 5年前

内容简介:If you’ve ever wondered howInspiration for this blog post comes from two crates,Our implementation will have slightly different goals from

If you’ve ever wondered how block_on from the futures crate works, today we are going to write our own version of the function.

Inspiration for this blog post comes from two crates, wakeful and extreme . wakeful has devised a simple way to create a Waker from a function, while extreme is an extremely terse implementation of block_on() .

Our implementation will have slightly different goals from extreme . Rather than going for zero dependencies and minimal number of lines of code, we’ll go for a safe and efficient but still pretty simple implementation.

Dependencies we’re going to use are pin-utils , crossbeam , and async-task .

Function signature

The signature of block_on looks as follows. We take a future as an argument, run it on the current thread (blocking whenever it is pending), and return its output:

fn block_on<F: Future>(future: F) -> F::Output {
    todo!()
}

Now let’s implement the missing todo!() part…

First attempt

Note that the poll method on every Future takes a pinned future. So we need to pin it first. While there is a way to do that safely using Box::pin() , we’d rather pin the future on the stack than heap.

Unfortunately, the only way of pinning futures on the stack safely is by using the pin-utils crate:

pin_utils::pin_mut!(future);

The pin_mut macro converts future from a variable of type F into one of type Pin<&mut F> .

Next we’ll need to specify what happens when this future is woken. In this case, waking should simply unblock the thread running the future.

Constructing a Waker can be gnarly — just take a peek at the implementation of extreme . And this is the simplest possible way of constructing a Waker by hand! So many raw pointers, so much unsafe code… let’s skip this part for now and fill in the blank later.

let waker = todo!();

Finally, we create a task context from the waker and keep polling the future in a loop. If it completes, return the output. If it’s pending, block the current thread:

let cx = &mut Context::from_waker(&waker);
loop {
    match future.as_mut().poll(cx) {
        Poll::Ready(output) => return output,
        Poll::Pending => thread::park(),
    }
}

In case you’re puzzled by the Context type, it’s a wrapper around Waker — there’s nothing more to it. When async/await in Rust was being designed, we weren’t sure if it’d be useful to pass anything else besides a Waker to poll() so we came up with this wrapper that might hold more stuff in a future version of Rust.

Anyways… we’re almost done. Let’s go back to waker construction and fill in the blank marked with todo!() .

If you think about it, Waker is really just a carefully optimized, fancy version of Arc<dyn Fn() + Send + Sync> , and wake() invokes this function. Also put yet another way, a Waker is a callback that gets invoked whenever the future can continue execution.

Since Waker is so difficult to construct, sagebind came up waker_fn() , a straightforward way to convert any function into a Waker . Unfortunately, wakeful seems to be yanked at the moment, so I borrowed waker_fn() and put it into my crate async-task .

In our block_on , the callback unblocks the thread running the future:

let thread = thread::current();
let waker = async_task::waker_fn(move || thread.unpark());

So simple! Much better than fiddling with RawWaker and RawWakerVTable .

Internally, the waker_fn() constructor literally creates an Arc<impl Fn() + Send + Sync> and then converts it into Waker with unsafe code that looks similar to what we saw in extreme .

Here’s a complete implementation of block_on() :

fn block_on<F: Future>(future: F) -> F::Output {
    pin_utils::pin_mut!(future);

    let thread = thread::current();
    let waker = async_task::waker_fn(move || thread.unpark());

    let cx = &mut Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(cx) {
            Poll::Ready(output) => return output,
            Poll::Pending => thread::park(),
        }
    }
}

See v1.rs if you’d like to try running this code.

A problem with parking

But, it’s not time to celebrate yet. There’s a problem. If user code inside the future also makes use of the park/unpark API, it may pick up and “steal” unpark notifications from the callback. Read this issue for a more elaborate explanation.

A possible solution is to use a way of parking and unparking threads different from the one inside the std::thread module. That way, code inside the future will not be able to interfere with waking.

There’s a very similar park/unpark mechanism in crossbeam , except it allows us to create arbitrarily many independent parkers rather than having one per thread. Let’s create one per invocation of block_on() :

fn block_on<F: Future>(future: F) -> F::Output {
    pin_utils::pin_mut!(future);

    let parker = Parker::new();
    let unparker = parker.unparker().clone();
    let waker = async_task::waker_fn(move || unparker.unpark());

    let cx = &mut Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(cx) {
            Poll::Ready(output) => return output,
            Poll::Pending => parker.park(),
        }
    }
}

That’s it! Problem solved.

See v2.rs if you’d like to try running this code.

A caching optimization

Creating a Parker and Waker is not free — both of those incur the cost of an allocation, which is unfortunate. Can we improve?

Instead of constructing a Parker and Waker on each invocation of block_on , why not cache them in thread-local storage? That way a thread will reuse the same instances across all invocations of block_on() :

fn block_on<F: Future>(future: F) -> F::Output {
    pin_utils::pin_mut!(future);

    thread_local! {
        static CACHE: (Parker, Waker) = {
            let parker = Parker::new();
            let unparker = parker.unparker().clone();
            let waker = async_task::waker_fn(move || unparker.unpark());
            (parker, waker)
        };
    }

    CACHE.with(|(parker, waker)| {
        let cx = &mut Context::from_waker(&waker);
        loop {
            match future.as_mut().poll(cx) {
                Poll::Ready(output) => return output,
                Poll::Pending => parker.park(),
            }
        }
    })
}

If the future is quick to execute, this small change will make block_on() dramatically more efficient!

See v3.rs if you’d like to try running this code.

What about recursion?

Are we done yet? Well… just one more last thing.

What if the future inside block_on() calls block_on() again recursively? We can either permit or forbid recursion.

If we choose to permit recursion, then we also need to make sure recursive calls of block_on() don’t share the same Parker and Waker instances, or else there’s no way to tell which block_on() invocation gets woken.

The block_on() from the futures crate panics on recursive invocations of block_on() . I don’t have a strong opinion on whether permitting or forbidding recursion is better — both behaviors are sensible. But, since we’re mimicking the futures version, let’s forbid recursion.

To detect recursive invocations, we could introduce another thread-local variable indicating whether we’re currently inside block_on() or not. But that’s a lot of work.

Here’s a cool trick that requires fewer changes to the code. Let’s wrap (Parker, Waker) into a RefCell , and panic if a mutable borrow is already active:

fn block_on<F: Future>(future: F) -> F::Output {
    pin_utils::pin_mut!(future);

    thread_local! {
        static CACHE: RefCell<(Parker, Waker)> = {
            let parker = Parker::new();
            let unparker = parker.unparker().clone();
            let waker = async_task::waker_fn(move || unparker.unpark());
            RefCell::new((parker, waker))
        };
    }

    CACHE.with(|cache| {
        let (parker, waker) = &mut *cache.try_borrow_mut().ok()
            .expect("recursive `block_on`");

        let cx = &mut Context::from_waker(&waker);
        loop {
            match future.as_mut().poll(cx) {
                Poll::Ready(output) => return output,
                Poll::Pending => parker.park(),
            }
        }
    })
}

Finally. Now we’re really done, I promise! This final implementation is as correct, as robust, and as efficient as it gets. More or less. :)

See v4.rs if you’d like to try running this code.

Benchmarks

To test how efficient our block_on() is, let’s benchmark it against the one from futures .

But first, we’ll write a helper future type that yields a number of times and then completes:

struct Yields(u32);

impl Future for Yields {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 == 0 {
            Poll::Ready(())
        } else {
            self.0 -= 1;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

As an example, to benchmark a future yielding 10 times, we write:

#[bench]
fn custom_block_on_10_yields(b: &mut Bencher) {
    b.iter(|| block_on(Yields(10)));
}

Let’s make a set of three benchmarks with futures yielding 0, 10, and 50 times. We run those using our custom block_on() and then using block_on() from futures . You can find the full benchmark code in yield.rs .

And here are the results on my machine:

test custom_block_on_0_yields   ... bench:           3 ns/iter (+/- 0)
test custom_block_on_10_yields  ... bench:         130 ns/iter (+/- 12)
test custom_block_on_50_yields  ... bench:         638 ns/iter (+/- 20)
test futures_block_on_0_yields  ... bench:          10 ns/iter (+/- 0)
test futures_block_on_10_yields ... bench:         236 ns/iter (+/- 10)
test futures_block_on_50_yields ... bench:       1,139 ns/iter (+/- 30)

The numbers say our custom block_on() is roughly 2 or 3 times faster in this particular benchmark, which is not bad at all!

Conclusion

Async Rust can feel intimidating because it contains so much machinery: the Future trait, pinning, the Context type, Waker and its friends RawWaker and RawWakerVTable , desugaring of async and await , unsafe code, raw pointers, and so on.

But the thing is, a lot of the ugly stuff is not even that important — it’s really just boring boilerplate that can be removed with crates like pin-utils , async-task , and crossbeam .

And indeed, today we managed to build an efficient block_on() in few lines of safe code without having to understand most of that boilerplate. In another blog post, we’ll build a real executor…


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

JSP 2.0技术手册

JSP 2.0技术手册

杜远君、林康司、林上杰 / 湖北教育出版社,电子工业出版社 / 2004-5-1 / 59.0

本书图文并茂,以丰富的实例为引导,全面介绍了主流的Java Web开发技术——JSP 2.0,重点介绍Java在展示层的两项重要技术:Java Servlet与JavaServer Pages。它们是最重要的Java核心技术。对这两项技术的深入了解,将有助于您未来对于JavaServer Faces(JSF)技术以及Java Web Services技术的学习。 本书分为三大部分,前......一起来看看 《JSP 2.0技术手册》 这本书的介绍吧!

SHA 加密
SHA 加密

SHA 加密工具

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具