Why can `futures::channel::mpsc` notify just one sender?
I'm reading the futures-preview 0.3 sources to find out how to do "notify any" correctly. In mpsc::channel (which is bounded), multiple senders may wait for a receipt (in case of a full buffer). Looking into the implementation of next_message and unpark_one, the receiver seems to notify only one sender per receipt.
I doubt this works in the presence of select!, because select! may lead to a false notification. However, I couldn't produce a problem case. Here's my attempt to confuse mpsc:
[package]
name = "futures-mpsc-test"
version = "0.1.0"
edition = "2018"
[dependencies]
futures-preview = { version = "0.3.0-alpha.9", features = ["tokio-compat"] }
tokio = "0.1.11"
and this:
#![feature(async_await, await_macro, futures_api, pin)]

use std::collections::HashSet;

use futures::prelude::*;
use futures::channel::mpsc::{channel, Sender};
use futures::channel::oneshot;
use futures::select;

async fn main2() {
    let channel_len = 1;
    let num_false_wait = 1000;
    let num_expected_messages = 100;
    let (mut send, mut recv) = channel(channel_len);
    // One extra capacity per sender. Fill the extras.
    await!(send.send(-2)).unwrap();
    // Fill buffers
    for _ in 0..channel_len {
        await!(send.send(-1)).unwrap();
    }
    // False waits. Should resolve and produce false waiters.
    for _ in 0..num_false_wait {
        await!(false_wait(&send));
    }
    // True messages.
    {
        let mut send = send.clone();
        await!(send.send(-2)).unwrap();
        tokio::spawn(async move {
            for i in 0..num_expected_messages {
                await!(send.send(i)).unwrap();
            }
            Ok(())
        }.boxed().compat());
    }
    // Drain receiver until all true messages are received.
    let mut expects = (0..num_expected_messages).collect::<HashSet<_>>();
    while !expects.is_empty() {
        let i = await!(recv.next()).unwrap();
        expects.remove(&i);
        eprintln!("Received: {}", i);
    }
}

// If `send` is full, it will produce false waits.
async fn false_wait(send: &Sender<i32>) {
    let (wait_send, wait_recv) = oneshot::channel();
    let mut send = send.clone();
    await!(send.send(-2)).unwrap();
    tokio::spawn(async move {
        let mut sending = send.send(-3);
        let mut fallback = future::ready(());
        select! {
            sending => {
                sending.unwrap();
            },
            fallback => {
                eprintln!("future::ready is selected");
            },
        };
        wait_send.send(()).unwrap();
        Ok(())
    }.boxed().compat());
    await!(wait_recv).unwrap();
}

fn main() {
    tokio::run(async {
        await!(main2());
        Ok(())
    }.boxed().compat());
}
I expect this to happen:

- The buffer is filled by -1. Therefore later senders are blocked.
- There are both "true waiters" and "false waiters". The false waiters have already exited, because the other arm of select! immediately completes.
- In each call to await!(recv.next()), at most one waiting sender is notified. If a false waiter is notified, no one can push to the buffer, even if the buffer has a vacant slot.
- If all elements are drained without a true notification, the entire system is stuck.

Despite my expectation, the main2 async function successfully completed. Why?
concurrency rust rust-tokio
asked Nov 11 at 4:40 by Masaki Hara
2 Answers
I doubt this works in presence of select!, because select! may lead to false notification.

No, you can't "confuse" an mpsc channel using select!: select! does not trigger any mpsc-related notification, it just runs the arm of whichever future finishes first.

When the message queue is full, it is await!(recv.next()) that notifies one producer that a slot in the bounded channel is now available.

In other words: there are no "true waiters" and "false waiters": when a channel's message queue is full, the producers block and wait for the receiver side to consume the enqueued messages.

answered Nov 12 at 9:13 by attdona
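To make that point concrete, here is a minimal sketch (not part of the original answer; it assumes the same nightly toolchain, futures-preview 0.3.0-alpha.9 and tokio 0.1 setup as the question). The spawned send stays parked until the receiver consumes a message:

#![feature(async_await, await_macro, futures_api, pin)]
use futures::prelude::*;
use futures::channel::mpsc::channel;

async fn demo() {
    let (mut send, mut recv) = channel(1);
    // Capacity is buffer (1) plus this sender's guaranteed slot, so two
    // sends complete immediately; afterwards `send` is parked.
    await!(send.send(1)).unwrap();
    await!(send.send(2)).unwrap();
    tokio::spawn(async move {
        // Parked: this send resumes only after the receiver below
        // consumes a message and unparks this sender.
        await!(send.send(3)).unwrap();
        eprintln!("third send completed");
        Ok(())
    }.boxed().compat());
    // Each receive unparks at most one parked sender.
    for _ in 0..3 {
        eprintln!("received {:?}", await!(recv.next()));
    }
}

fn main() {
    tokio::run(async { await!(demo()); Ok(()) }.boxed().compat());
}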
Thank you for the answer, but I'm not convinced yet. Let me reword my assumption using "future" and "task". The sender future awaits receiver notification if the buffer is full at the time of the sending attempt. However, the notification is sent to the task, not to the sender future. Usually the notification is propagated down to the sender future, but that's not always the case in the presence of selection; the whole task may even have been dropped without seeing the notification. Then no alive sender future can wake despite a non-full buffer. There may be something wrong, but I'm not sure where.
– Masaki Hara
Nov 12 at 12:19

It is the receiver that notifies, not the sender. It is a "sending" future that you select from: this means that select itself never issues any notification. It is Receiver::unpark_one that notifies one sender in the queue of parked senders that a slot in the buffer is available.
– attdona
Nov 12 at 14:11

I know. The receiver notifies a sender (in case of the full buffer). However, if I understand correctly, the notification goes to the sender task, not directly to the sender future. Then the sender task recursively calls poll, which will eventually reach the sender future. But is it guaranteed that the poll reaches the leaf future we intend? I thought not, because if one arm of select! completes (ready in my case), the select! will ignore the other futures (send in my case) even if they're waiting for something. If so, what if the notification disappears there? That's my question.
– Masaki Hara
Nov 12 at 14:47

Futures don't wait for something, they are to be polled: when select! completes, the sending future is left in an incomplete state and in your code no one will poll it again to resolve it. The only side effect that I see is that -3 is not sent through the channel.
– attdona
Nov 13 at 10:00

Yes, then my concern is that, after that, the buffer may be vacant (because -3 is not sent) but no one sends an item into the channel, because the other sending tasks are not notified. What prevents this bad (at least counterintuitive) situation?
– Masaki Hara
Nov 13 at 13:42
Further investigation of the futures source code solved my problem. In the end, I cannot confuse the mpsc in this way.

The point is that the size of mpsc is flexible and can grow beyond what was initially specified. This behavior is mentioned in the docs:

The channel's capacity is equal to buffer + num-senders. In other words, each sender gets a guaranteed slot in the channel capacity, and on top of that there are buffer "first come, first serve" slots available to all senders.

Yes, I had read this before experimenting, but I couldn't figure out its importance at that time.
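A quick way to observe the guaranteed slots is with try_send, which never blocks (a sketch, not part of the original answer; it assumes Sender::try_send is available in this futures-preview version). With channel(1) and two sender clones, three messages fit before a send is rejected:

use futures::channel::mpsc::channel;

fn main() {
    // buffer = 1, plus one guaranteed slot per live sender clone.
    let (mut a, _recv) = channel::<i32>(1);
    let mut b = a.clone();
    a.try_send(1).unwrap();          // fills the shared buffer
    a.try_send(2).unwrap();          // conceptually `a`'s guaranteed slot; `a` is now parked
    b.try_send(3).unwrap();          // conceptually `b`'s guaranteed slot; `b` is now parked
    assert!(a.try_send(4).is_err()); // parked senders are rejected until a receive happens
}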
Problem with fixed buffer
Think of a typical bounded queue implementation, where the size of the queue cannot grow beyond the initially specified bound. The spec is this:

- When the queue is empty, receivers block.
- When the queue is full (that is, the size has hit the bound), senders block.

In this situation, if the queue is full, multiple senders are waiting for a single resource (room in the queue).

In multithreaded programming, this is accomplished by primitives like notify_one. However, in futures, this is fallible: unlike in multithreaded programming, the notified task doesn't necessarily make use of the resource, because the task may already have given up acquiring it (due to constructions like select! or Deadline). Then the spec is simply broken (the queue isn't full, but all alive senders are blocked).
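For contrast, here is roughly what that spec looks like in multithreaded code with std primitives (an illustrative sketch, not taken from the answer). notify_one is safe there because the woken thread is still sitting in its wait loop and will claim the freed slot, whereas a woken futures task may already have dropped its send future inside a select!:

use std::collections::VecDeque;
use std::sync::{Condvar, Mutex};

// A classic fixed-capacity bounded queue: senders block while full,
// and every pop wakes exactly one blocked sender.
struct Bounded<T> {
    cap: usize,
    queue: Mutex<VecDeque<T>>,
    not_full: Condvar,
    not_empty: Condvar,
}

impl<T> Bounded<T> {
    fn push(&self, item: T) {
        let mut q = self.queue.lock().unwrap();
        while q.len() == self.cap {
            q = self.not_full.wait(q).unwrap(); // blocked sender
        }
        q.push_back(item);
        self.not_empty.notify_one();
    }

    fn pop(&self) -> T {
        let mut q = self.queue.lock().unwrap();
        while q.is_empty() {
            q = self.not_empty.wait(q).unwrap();
        }
        let item = q.pop_front().unwrap();
        // notify_one is enough here: the woken thread re-checks the loop
        // condition above and takes the slot (or goes back to sleep).
        self.not_full.notify_one();
        item
    }
}

fn main() {
    let b = Bounded {
        cap: 1,
        queue: Mutex::new(VecDeque::new()),
        not_full: Condvar::new(),
        not_empty: Condvar::new(),
    };
    b.push(1);
    assert_eq!(b.pop(), 1);
}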
mpsc is flexible

As pointed out above, the buffer size for futures::channel::mpsc::channel isn't strict. The spec is summarized as:

- When message_queue.len() == 0, receivers block.
- When message_queue.len() >= buffer, senders may block.
- When message_queue.len() >= buffer + num_senders, senders block.

Here, num_senders is basically the number of clones of Sender, but more than that in some cases. More precisely, num_senders is the number of SenderTasks.
So, how do we avoid resource sharing? We have additional state for that:

- Each sender (an instance of SenderTask) has an is_parked boolean state.
- The channel has another queue called parked_queue, a queue of Arc references to SenderTasks.
The channel maintains the following invariants:

- message_queue.len() <= buffer + num_parked_senders. Note that we don't know the value of num_parked_senders.
- parked_queue.len() == max(0, message_queue.len() - buffer)
- Each parked sender has at least one entry in parked_queue.
This is accomplished by the following algorithm:

- For receiving:
  - It pops off a SenderTask from parked_queue and, if that sender is parked, unparks it.
- For sending:
  - It always waits for is_parked to be false. If message_queue.len() < buffer, then parked_queue.len() == 0, so all senders are unparked; therefore we can guarantee progress in this case.
  - Once is_parked is false, it pushes the message to the queue regardless of the queue length.
  - After that, if message_queue.len() <= buffer, it needs to do nothing further.
  - If message_queue.len() > buffer, the sender marks itself as parked and pushes itself to parked_queue.

You can easily check that the algorithm above maintains the invariants. A simplified, non-async sketch of this bookkeeping follows.
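This is an illustrative model in plain Rust, not the real futures code; the names SenderTask, parked_queue and is_parked mirror the ones above, everything else is a simplification:

use std::cell::RefCell;
use std::collections::VecDeque;
use std::rc::Rc;

struct SenderTask {
    is_parked: bool,
    // The real SenderTask also stores the task handle to notify.
}

struct Channel<T> {
    buffer: usize,
    message_queue: VecDeque<T>,
    // One entry per message pushed past `buffer`, in FIFO order.
    parked_queue: VecDeque<Rc<RefCell<SenderTask>>>,
}

impl<T> Channel<T> {
    // Sending: only proceeds once the sender is unparked; the push itself
    // always succeeds, and the sender re-parks if it went past `buffer`.
    fn send(&mut self, sender: &Rc<RefCell<SenderTask>>, msg: T) -> Result<(), T> {
        if sender.borrow().is_parked {
            return Err(msg); // the real Sink returns Pending and waits here
        }
        self.message_queue.push_back(msg);
        if self.message_queue.len() > self.buffer {
            sender.borrow_mut().is_parked = true;
            self.parked_queue.push_back(Rc::clone(sender));
        }
        Ok(())
    }

    // Receiving: pop one message, then unpark at most one parked sender.
    fn recv(&mut self) -> Option<T> {
        let msg = self.message_queue.pop_front()?;
        if let Some(task) = self.parked_queue.pop_front() {
            // A sender waits on its own flag, so a dropped send future only
            // leaves a stale entry here; it blocks nobody else.
            task.borrow_mut().is_parked = false;
        }
        Some(msg)
    }
}

fn main() {
    let mut ch = Channel { buffer: 1, message_queue: VecDeque::new(), parked_queue: VecDeque::new() };
    let a = Rc::new(RefCell::new(SenderTask { is_parked: false }));
    ch.send(&a, 1).unwrap();          // len 1 <= buffer: `a` stays unparked
    ch.send(&a, 2).unwrap();          // len 2 >  buffer: `a` parks itself
    assert!(ch.send(&a, 3).is_err()); // a parked sender has to wait
    assert_eq!(ch.recv(), Some(1));   // the receive unparks `a`
    ch.send(&a, 3).unwrap();          // now it goes through (and parks again)
}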
Surprisingly, the senders no longer wait for a shared resource. Instead, each sender waits for its own is_parked state. Even if the sending task is dropped before completion, its entry just remains in parked_queue for a while and doesn't block anything. How clever!

answered Nov 17 at 14:55 by Masaki Hara