Why `futures::channel::mpsc` can just notify one sender?











up vote
1
down vote

favorite
1












I'm reading futures-preview 0.3 sources to find out how to do "notify any" correctly. In mpsc::channel (which is bounded), multiple senders may wait for a receipt (in case of full buffer).



Looking into the implementation of next_message and unpark_one, the receiver seems to only notify one sender per one receipt.



I doubt this works in presense of select!, because select! may lead to false notification. However, I couldn't produce a problem case.



Here's my attempt to confuse mpsc:



[package]
name = "futures-mpsc-test"
version = "0.1.0"
edition = "2018"

[dependencies]
futures-preview = { version = "0.3.0-alpha.9", features = ["tokio-compat"] }
tokio = "0.1.11"


and this:



#![feature(async_await, await_macro, futures_api, pin)]

use std::collections::HashSet;

use futures::prelude::*;

use futures::channel::mpsc::{channel, Sender};
use futures::channel::oneshot;
use futures::select;

async fn main2() {
let channel_len = 1;
let num_false_wait = 1000;
let num_expected_messages = 100;

let (mut send, mut recv) = channel(channel_len);
// One extra capacity per sender. Fill the extras.
await!(send.send(-2)).unwrap();

// Fill buffers
for _ in 0..channel_len {
await!(send.send(-1)).unwrap();
}

// False waits. Should resolve and produce false waiters.
for _ in 0..num_false_wait {
await!(false_wait(&send));
}

// True messages.
{
let mut send = send.clone();
await!(send.send(-2)).unwrap();
tokio::spawn(async move {
for i in 0..num_expected_messages {
await!(send.send(i)).unwrap();
}
Ok(())
}.boxed().compat());
}

// Drain receiver until all true messages are received.
let mut expects = (0..num_expected_messages).collect::<HashSet<_>>();
while !expects.is_empty() {
let i = await!(recv.next()).unwrap();
expects.remove(&i);
eprintln!("Received: {}", i);
}
}

// If `send` is full, it will produce false waits.
async fn false_wait(send: &Sender<i32>) {
let (wait_send, wait_recv) = oneshot::channel();
let mut send = send.clone();
await!(send.send(-2)).unwrap();
tokio::spawn(async move {
let mut sending = send.send(-3);
let mut fallback = future::ready(());
select! {
sending => {
sending.unwrap();
},
fallback => {
eprintln!("future::ready is selected");
},
};
wait_send.send(()).unwrap();
Ok(())
}.boxed().compat());
await!(wait_recv).unwrap();
}

fn main() {
tokio::run(async {
await!(main2());
Ok(())
}.boxed().compat());
}


I expect this to happen:




  1. The buffer is filled by -1. Therefore later senders are blocked.

  2. There are both "true waiters" and "false waiters".
    False waiters already exited, because the other arm of select!
    immediately completes.

  3. In each call to await!(recv.next()), at most one waiting sender is
    notified. If a false waiter is notified, no one can push to the buffer,
    even if the buffer has a vacant room.

  4. If all elements are drained without true notification,
    the entire system is stuck.


Despite my expectation, the main2 async function successfully completed. Why?










share|improve this question


























    up vote
    1
    down vote

    favorite
    1












    I'm reading futures-preview 0.3 sources to find out how to do "notify any" correctly. In mpsc::channel (which is bounded), multiple senders may wait for a receipt (in case of full buffer).



    Looking into the implementation of next_message and unpark_one, the receiver seems to only notify one sender per one receipt.



    I doubt this works in presense of select!, because select! may lead to false notification. However, I couldn't produce a problem case.



    Here's my attempt to confuse mpsc:



    [package]
    name = "futures-mpsc-test"
    version = "0.1.0"
    edition = "2018"

    [dependencies]
    futures-preview = { version = "0.3.0-alpha.9", features = ["tokio-compat"] }
    tokio = "0.1.11"


    and this:



    #![feature(async_await, await_macro, futures_api, pin)]

    use std::collections::HashSet;

    use futures::prelude::*;

    use futures::channel::mpsc::{channel, Sender};
    use futures::channel::oneshot;
    use futures::select;

    async fn main2() {
    let channel_len = 1;
    let num_false_wait = 1000;
    let num_expected_messages = 100;

    let (mut send, mut recv) = channel(channel_len);
    // One extra capacity per sender. Fill the extras.
    await!(send.send(-2)).unwrap();

    // Fill buffers
    for _ in 0..channel_len {
    await!(send.send(-1)).unwrap();
    }

    // False waits. Should resolve and produce false waiters.
    for _ in 0..num_false_wait {
    await!(false_wait(&send));
    }

    // True messages.
    {
    let mut send = send.clone();
    await!(send.send(-2)).unwrap();
    tokio::spawn(async move {
    for i in 0..num_expected_messages {
    await!(send.send(i)).unwrap();
    }
    Ok(())
    }.boxed().compat());
    }

    // Drain receiver until all true messages are received.
    let mut expects = (0..num_expected_messages).collect::<HashSet<_>>();
    while !expects.is_empty() {
    let i = await!(recv.next()).unwrap();
    expects.remove(&i);
    eprintln!("Received: {}", i);
    }
    }

    // If `send` is full, it will produce false waits.
    async fn false_wait(send: &Sender<i32>) {
    let (wait_send, wait_recv) = oneshot::channel();
    let mut send = send.clone();
    await!(send.send(-2)).unwrap();
    tokio::spawn(async move {
    let mut sending = send.send(-3);
    let mut fallback = future::ready(());
    select! {
    sending => {
    sending.unwrap();
    },
    fallback => {
    eprintln!("future::ready is selected");
    },
    };
    wait_send.send(()).unwrap();
    Ok(())
    }.boxed().compat());
    await!(wait_recv).unwrap();
    }

    fn main() {
    tokio::run(async {
    await!(main2());
    Ok(())
    }.boxed().compat());
    }


    I expect this to happen:




    1. The buffer is filled by -1. Therefore later senders are blocked.

    2. There are both "true waiters" and "false waiters".
      False waiters already exited, because the other arm of select!
      immediately completes.

    3. In each call to await!(recv.next()), at most one waiting sender is
      notified. If a false waiter is notified, no one can push to the buffer,
      even if the buffer has a vacant room.

    4. If all elements are drained without true notification,
      the entire system is stuck.


    Despite my expectation, the main2 async function successfully completed. Why?










    share|improve this question
























      up vote
      1
      down vote

      favorite
      1









      up vote
      1
      down vote

      favorite
      1






      1





      I'm reading futures-preview 0.3 sources to find out how to do "notify any" correctly. In mpsc::channel (which is bounded), multiple senders may wait for a receipt (in case of full buffer).



      Looking into the implementation of next_message and unpark_one, the receiver seems to only notify one sender per one receipt.



      I doubt this works in presense of select!, because select! may lead to false notification. However, I couldn't produce a problem case.



      Here's my attempt to confuse mpsc:



      [package]
      name = "futures-mpsc-test"
      version = "0.1.0"
      edition = "2018"

      [dependencies]
      futures-preview = { version = "0.3.0-alpha.9", features = ["tokio-compat"] }
      tokio = "0.1.11"


      and this:



      #![feature(async_await, await_macro, futures_api, pin)]

      use std::collections::HashSet;

      use futures::prelude::*;

      use futures::channel::mpsc::{channel, Sender};
      use futures::channel::oneshot;
      use futures::select;

      async fn main2() {
      let channel_len = 1;
      let num_false_wait = 1000;
      let num_expected_messages = 100;

      let (mut send, mut recv) = channel(channel_len);
      // One extra capacity per sender. Fill the extras.
      await!(send.send(-2)).unwrap();

      // Fill buffers
      for _ in 0..channel_len {
      await!(send.send(-1)).unwrap();
      }

      // False waits. Should resolve and produce false waiters.
      for _ in 0..num_false_wait {
      await!(false_wait(&send));
      }

      // True messages.
      {
      let mut send = send.clone();
      await!(send.send(-2)).unwrap();
      tokio::spawn(async move {
      for i in 0..num_expected_messages {
      await!(send.send(i)).unwrap();
      }
      Ok(())
      }.boxed().compat());
      }

      // Drain receiver until all true messages are received.
      let mut expects = (0..num_expected_messages).collect::<HashSet<_>>();
      while !expects.is_empty() {
      let i = await!(recv.next()).unwrap();
      expects.remove(&i);
      eprintln!("Received: {}", i);
      }
      }

      // If `send` is full, it will produce false waits.
      async fn false_wait(send: &Sender<i32>) {
      let (wait_send, wait_recv) = oneshot::channel();
      let mut send = send.clone();
      await!(send.send(-2)).unwrap();
      tokio::spawn(async move {
      let mut sending = send.send(-3);
      let mut fallback = future::ready(());
      select! {
      sending => {
      sending.unwrap();
      },
      fallback => {
      eprintln!("future::ready is selected");
      },
      };
      wait_send.send(()).unwrap();
      Ok(())
      }.boxed().compat());
      await!(wait_recv).unwrap();
      }

      fn main() {
      tokio::run(async {
      await!(main2());
      Ok(())
      }.boxed().compat());
      }


      I expect this to happen:




      1. The buffer is filled by -1. Therefore later senders are blocked.

      2. There are both "true waiters" and "false waiters".
        False waiters already exited, because the other arm of select!
        immediately completes.

      3. In each call to await!(recv.next()), at most one waiting sender is
        notified. If a false waiter is notified, no one can push to the buffer,
        even if the buffer has a vacant room.

      4. If all elements are drained without true notification,
        the entire system is stuck.


      Despite my expectation, the main2 async function successfully completed. Why?










      share|improve this question













      I'm reading futures-preview 0.3 sources to find out how to do "notify any" correctly. In mpsc::channel (which is bounded), multiple senders may wait for a receipt (in case of full buffer).



      Looking into the implementation of next_message and unpark_one, the receiver seems to only notify one sender per one receipt.



      I doubt this works in presense of select!, because select! may lead to false notification. However, I couldn't produce a problem case.



      Here's my attempt to confuse mpsc:



      [package]
      name = "futures-mpsc-test"
      version = "0.1.0"
      edition = "2018"

      [dependencies]
      futures-preview = { version = "0.3.0-alpha.9", features = ["tokio-compat"] }
      tokio = "0.1.11"


      and this:



      #![feature(async_await, await_macro, futures_api, pin)]

      use std::collections::HashSet;

      use futures::prelude::*;

      use futures::channel::mpsc::{channel, Sender};
      use futures::channel::oneshot;
      use futures::select;

      async fn main2() {
      let channel_len = 1;
      let num_false_wait = 1000;
      let num_expected_messages = 100;

      let (mut send, mut recv) = channel(channel_len);
      // One extra capacity per sender. Fill the extras.
      await!(send.send(-2)).unwrap();

      // Fill buffers
      for _ in 0..channel_len {
      await!(send.send(-1)).unwrap();
      }

      // False waits. Should resolve and produce false waiters.
      for _ in 0..num_false_wait {
      await!(false_wait(&send));
      }

      // True messages.
      {
      let mut send = send.clone();
      await!(send.send(-2)).unwrap();
      tokio::spawn(async move {
      for i in 0..num_expected_messages {
      await!(send.send(i)).unwrap();
      }
      Ok(())
      }.boxed().compat());
      }

      // Drain receiver until all true messages are received.
      let mut expects = (0..num_expected_messages).collect::<HashSet<_>>();
      while !expects.is_empty() {
      let i = await!(recv.next()).unwrap();
      expects.remove(&i);
      eprintln!("Received: {}", i);
      }
      }

      // If `send` is full, it will produce false waits.
      async fn false_wait(send: &Sender<i32>) {
      let (wait_send, wait_recv) = oneshot::channel();
      let mut send = send.clone();
      await!(send.send(-2)).unwrap();
      tokio::spawn(async move {
      let mut sending = send.send(-3);
      let mut fallback = future::ready(());
      select! {
      sending => {
      sending.unwrap();
      },
      fallback => {
      eprintln!("future::ready is selected");
      },
      };
      wait_send.send(()).unwrap();
      Ok(())
      }.boxed().compat());
      await!(wait_recv).unwrap();
      }

      fn main() {
      tokio::run(async {
      await!(main2());
      Ok(())
      }.boxed().compat());
      }


      I expect this to happen:




      1. The buffer is filled by -1. Therefore later senders are blocked.

      2. There are both "true waiters" and "false waiters".
        False waiters already exited, because the other arm of select!
        immediately completes.

      3. In each call to await!(recv.next()), at most one waiting sender is
        notified. If a false waiter is notified, no one can push to the buffer,
        even if the buffer has a vacant room.

      4. If all elements are drained without true notification,
        the entire system is stuck.


      Despite my expectation, the main2 async function successfully completed. Why?







      concurrency rust rust-tokio






      share|improve this question













      share|improve this question











      share|improve this question




      share|improve this question










      asked Nov 11 at 4:40









      Masaki Hara

      1,458714




      1,458714
























          2 Answers
          2






          active

          oldest

          votes

















          up vote
          1
          down vote














          I doubt this works in presense of select!, because select! may lead to false notification.




          No, You can't "confuse" a mpsc channel using select!:



          select! does not trigger any mspc related notification, it just return the future that finishes first.



          When the message queue is full it is await!(recv.next()) that notifies one producer that a slot into the bounded channel is now available.



          In other words: there are no true waiters and false waiters:
          when a channel message queue is full the producers block and await that the receiver side consumes the enqueued messages.






          share|improve this answer





















          • Thank you for the answer, but I'm not convinced yet. Let me reword my assumption using "future" and "task". The sender future awaits for receiver notification if the buffer is full at the time of the sending attempt. However, notification is sent against the task, not the sender future. Usually the notification is propagated down to the sender future, but that's not always the case in presence of selection; the whole task may have even been dropped without seeing notification. Then no alive sender future can wake despite non-full buffer. There may be something wrong but I'm not sure where.
            – Masaki Hara
            Nov 12 at 12:19










          • It is the receiver that notify, not the sender. It is a "sending" future that you select from: this means that there are no notifications from select in every case. It is Receiver::unpark_one that notifies one sender in the queue of parked senders that a slot in the buffer is available.
            – attdona
            Nov 12 at 14:11










          • I know. The receiver notifies a sender (in case of the full buffer). However, if I understand correctly, the notification goes to the sender task, not directly to the sender future. Then the sender task recursively calls poll that will eventually reach the sender future. But is this guaranteed that the poll reaches the leaf future we intend? I thought not, because if one arm of select! completes (ready in my case), the select! will ignore other futures (send in my case) even if they're waiting for something. If so, what if the notification disappears there? That's my question.
            – Masaki Hara
            Nov 12 at 14:47










          • Future don't wait for something, they are to be polled: when select! complete the sending future is left in an incomplete state and in your code no one will poll it again for resolving it. the only side effect that I see it is that -3 is not sent through the channel.
            – attdona
            Nov 13 at 10:00










          • Yes, then my concern is that, after that, the buffer may be vacant (because -3 is not sent) but no one sends an item against the channel, because other sending tasks are not notified. What prevents this bad (at least counterintuitive) situation?
            – Masaki Hara
            Nov 13 at 13:42


















          up vote
          0
          down vote













          Further investigation on the futures source code solved my problem. At last, I cannot confuse the mpsc in this way.



          The point is that, the size of mpsc is flexible and can grow more than initially specified. This behavior is mentioned in the docs:




          The channel's capacity is equal to buffer + num-senders. In other words, each sender gets a guaranteed slot in the channel capacity, and on top of that there are buffer "first come, first serve" slots available to all senders.




          Yes, I've first read this before experimenting, but I couldn't figure out the importance of this at that time.



          Problem with fixed buffer



          Think of a typical bounded queue implementation, where the size of a queue cannot grow more than initially specified. The spec is this:




          • When the queue is empty, receivers block.

          • When the queue is full (that is, the size is hitting the bound), senders block.


          In this situation, if the queue is full, multiple senders are waiting for a single resource (the size of the queue).



          In multithread programming, this is accomplished by primitives like notify_one. However, in futures, this is fallible: unlike multithreaded programming, the notified task doesn't necessarily make use of the resource, because the task may already have given up acquiring the resource (due to constructions like select! or Deadline) Then the spec is simply broken (the queue isn't full, but all alive senders are blocked).




          mpsc is flexible



          As pointed out above, the buffer size for futures::channel::mpsc::channel isn't strict. The spec is summarized as:




          • When message_queue.len() == 0, receivers block.

          • When message_queue.len() >= buffer, senders may block.

          • When message_queue.len() >= buffer + num_senders, senders block.


          Here, num_senders is basically the number of clones of Sender, but more than that in some cases. More precisely, num_senders is the number of SenderTasks.



          So, how do we avoid resource sharing? We have additional states for that:




          • Each sender (an instance of SenderTask) has is_parked boolean state.

          • The channel has another queue called parked_queue, a queue of Arc references to SenderTask.


          The channel maintains the following invariants:





          • message_queue.len() <= buffer + num_parked_senders. Note that we don't know the value of num_parked_senders.

          • parked_queue.len() == min(0, message_queue.len() - buffer)

          • Each parked sender has at least one message in parked_queue.


          This is accomplished by the following algorithm:




          • For receiving,


            • it pops off a SenderTask from parked_queue and, if the sender is parked, unpark it.



          • For sending,


            • It always waits for is_parked to be false. If message_queue.len() < buffer, as parked_queue.len() == 0, all senders are unparked. Therefore we can guarantee progress in this case.

            • If is_parked is false, push the message to the queue anyway.

            • After that, if message_queue.len() <= buffer, it needs to do nothing further.

            • if message_queue.len() > buffer, the sender is made unparked and pushed to parked_queue.




          You can easily check the invariant is maintained in the algorithm above.



          Surprisingly, the senders no more wait for a shared resource. Instead, A sender waits for its is_parked state. Even if the sending task is dropped before completion, it just remains in parked_queue for a while and doesn't block anything. How clever it is!






          share|improve this answer





















            Your Answer






            StackExchange.ifUsing("editor", function () {
            StackExchange.using("externalEditor", function () {
            StackExchange.using("snippets", function () {
            StackExchange.snippets.init();
            });
            });
            }, "code-snippets");

            StackExchange.ready(function() {
            var channelOptions = {
            tags: "".split(" "),
            id: "1"
            };
            initTagRenderer("".split(" "), "".split(" "), channelOptions);

            StackExchange.using("externalEditor", function() {
            // Have to fire editor after snippets, if snippets enabled
            if (StackExchange.settings.snippets.snippetsEnabled) {
            StackExchange.using("snippets", function() {
            createEditor();
            });
            }
            else {
            createEditor();
            }
            });

            function createEditor() {
            StackExchange.prepareEditor({
            heartbeatType: 'answer',
            convertImagesToLinks: true,
            noModals: true,
            showLowRepImageUploadWarning: true,
            reputationToPostImages: 10,
            bindNavPrevention: true,
            postfix: "",
            imageUploader: {
            brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
            contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
            allowUrls: true
            },
            onDemand: true,
            discardSelector: ".discard-answer"
            ,immediatelyShowMarkdownHelp:true
            });


            }
            });














             

            draft saved


            draft discarded


















            StackExchange.ready(
            function () {
            StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53245906%2fwhy-futureschannelmpsc-can-just-notify-one-sender%23new-answer', 'question_page');
            }
            );

            Post as a guest















            Required, but never shown

























            2 Answers
            2






            active

            oldest

            votes








            2 Answers
            2






            active

            oldest

            votes









            active

            oldest

            votes






            active

            oldest

            votes








            up vote
            1
            down vote














            I doubt this works in presense of select!, because select! may lead to false notification.




            No, You can't "confuse" a mpsc channel using select!:



            select! does not trigger any mspc related notification, it just return the future that finishes first.



            When the message queue is full it is await!(recv.next()) that notifies one producer that a slot into the bounded channel is now available.



            In other words: there are no true waiters and false waiters:
            when a channel message queue is full the producers block and await that the receiver side consumes the enqueued messages.






            share|improve this answer





















            • Thank you for the answer, but I'm not convinced yet. Let me reword my assumption using "future" and "task". The sender future awaits for receiver notification if the buffer is full at the time of the sending attempt. However, notification is sent against the task, not the sender future. Usually the notification is propagated down to the sender future, but that's not always the case in presence of selection; the whole task may have even been dropped without seeing notification. Then no alive sender future can wake despite non-full buffer. There may be something wrong but I'm not sure where.
              – Masaki Hara
              Nov 12 at 12:19










            • It is the receiver that notify, not the sender. It is a "sending" future that you select from: this means that there are no notifications from select in every case. It is Receiver::unpark_one that notifies one sender in the queue of parked senders that a slot in the buffer is available.
              – attdona
              Nov 12 at 14:11










            • I know. The receiver notifies a sender (in case of the full buffer). However, if I understand correctly, the notification goes to the sender task, not directly to the sender future. Then the sender task recursively calls poll that will eventually reach the sender future. But is this guaranteed that the poll reaches the leaf future we intend? I thought not, because if one arm of select! completes (ready in my case), the select! will ignore other futures (send in my case) even if they're waiting for something. If so, what if the notification disappears there? That's my question.
              – Masaki Hara
              Nov 12 at 14:47










            • Future don't wait for something, they are to be polled: when select! complete the sending future is left in an incomplete state and in your code no one will poll it again for resolving it. the only side effect that I see it is that -3 is not sent through the channel.
              – attdona
              Nov 13 at 10:00










            • Yes, then my concern is that, after that, the buffer may be vacant (because -3 is not sent) but no one sends an item against the channel, because other sending tasks are not notified. What prevents this bad (at least counterintuitive) situation?
              – Masaki Hara
              Nov 13 at 13:42















            up vote
            1
            down vote














            I doubt this works in presense of select!, because select! may lead to false notification.




            No, You can't "confuse" a mpsc channel using select!:



            select! does not trigger any mspc related notification, it just return the future that finishes first.



            When the message queue is full it is await!(recv.next()) that notifies one producer that a slot into the bounded channel is now available.



            In other words: there are no true waiters and false waiters:
            when a channel message queue is full the producers block and await that the receiver side consumes the enqueued messages.






            share|improve this answer





















            • Thank you for the answer, but I'm not convinced yet. Let me reword my assumption using "future" and "task". The sender future awaits for receiver notification if the buffer is full at the time of the sending attempt. However, notification is sent against the task, not the sender future. Usually the notification is propagated down to the sender future, but that's not always the case in presence of selection; the whole task may have even been dropped without seeing notification. Then no alive sender future can wake despite non-full buffer. There may be something wrong but I'm not sure where.
              – Masaki Hara
              Nov 12 at 12:19










            • It is the receiver that notify, not the sender. It is a "sending" future that you select from: this means that there are no notifications from select in every case. It is Receiver::unpark_one that notifies one sender in the queue of parked senders that a slot in the buffer is available.
              – attdona
              Nov 12 at 14:11










            • I know. The receiver notifies a sender (in case of the full buffer). However, if I understand correctly, the notification goes to the sender task, not directly to the sender future. Then the sender task recursively calls poll that will eventually reach the sender future. But is this guaranteed that the poll reaches the leaf future we intend? I thought not, because if one arm of select! completes (ready in my case), the select! will ignore other futures (send in my case) even if they're waiting for something. If so, what if the notification disappears there? That's my question.
              – Masaki Hara
              Nov 12 at 14:47










            • Future don't wait for something, they are to be polled: when select! complete the sending future is left in an incomplete state and in your code no one will poll it again for resolving it. the only side effect that I see it is that -3 is not sent through the channel.
              – attdona
              Nov 13 at 10:00










            • Yes, then my concern is that, after that, the buffer may be vacant (because -3 is not sent) but no one sends an item against the channel, because other sending tasks are not notified. What prevents this bad (at least counterintuitive) situation?
              – Masaki Hara
              Nov 13 at 13:42













            up vote
            1
            down vote










            up vote
            1
            down vote










            I doubt this works in presense of select!, because select! may lead to false notification.




            No, You can't "confuse" a mpsc channel using select!:



            select! does not trigger any mspc related notification, it just return the future that finishes first.



            When the message queue is full it is await!(recv.next()) that notifies one producer that a slot into the bounded channel is now available.



            In other words: there are no true waiters and false waiters:
            when a channel message queue is full the producers block and await that the receiver side consumes the enqueued messages.






            share|improve this answer













            I doubt this works in presense of select!, because select! may lead to false notification.




            No, You can't "confuse" a mpsc channel using select!:



            select! does not trigger any mspc related notification, it just return the future that finishes first.



            When the message queue is full it is await!(recv.next()) that notifies one producer that a slot into the bounded channel is now available.



            In other words: there are no true waiters and false waiters:
            when a channel message queue is full the producers block and await that the receiver side consumes the enqueued messages.







            share|improve this answer












            share|improve this answer



            share|improve this answer










            answered Nov 12 at 9:13









            attdona

            2,79911118




            2,79911118












            • Thank you for the answer, but I'm not convinced yet. Let me reword my assumption using "future" and "task". The sender future awaits for receiver notification if the buffer is full at the time of the sending attempt. However, notification is sent against the task, not the sender future. Usually the notification is propagated down to the sender future, but that's not always the case in presence of selection; the whole task may have even been dropped without seeing notification. Then no alive sender future can wake despite non-full buffer. There may be something wrong but I'm not sure where.
              – Masaki Hara
              Nov 12 at 12:19










            • It is the receiver that notify, not the sender. It is a "sending" future that you select from: this means that there are no notifications from select in every case. It is Receiver::unpark_one that notifies one sender in the queue of parked senders that a slot in the buffer is available.
              – attdona
              Nov 12 at 14:11










            • I know. The receiver notifies a sender (in case of the full buffer). However, if I understand correctly, the notification goes to the sender task, not directly to the sender future. Then the sender task recursively calls poll that will eventually reach the sender future. But is this guaranteed that the poll reaches the leaf future we intend? I thought not, because if one arm of select! completes (ready in my case), the select! will ignore other futures (send in my case) even if they're waiting for something. If so, what if the notification disappears there? That's my question.
              – Masaki Hara
              Nov 12 at 14:47










            • Future don't wait for something, they are to be polled: when select! complete the sending future is left in an incomplete state and in your code no one will poll it again for resolving it. the only side effect that I see it is that -3 is not sent through the channel.
              – attdona
              Nov 13 at 10:00










            • Yes, then my concern is that, after that, the buffer may be vacant (because -3 is not sent) but no one sends an item against the channel, because other sending tasks are not notified. What prevents this bad (at least counterintuitive) situation?
              – Masaki Hara
              Nov 13 at 13:42


















            • Thank you for the answer, but I'm not convinced yet. Let me reword my assumption using "future" and "task". The sender future awaits for receiver notification if the buffer is full at the time of the sending attempt. However, notification is sent against the task, not the sender future. Usually the notification is propagated down to the sender future, but that's not always the case in presence of selection; the whole task may have even been dropped without seeing notification. Then no alive sender future can wake despite non-full buffer. There may be something wrong but I'm not sure where.
              – Masaki Hara
              Nov 12 at 12:19










            • It is the receiver that notify, not the sender. It is a "sending" future that you select from: this means that there are no notifications from select in every case. It is Receiver::unpark_one that notifies one sender in the queue of parked senders that a slot in the buffer is available.
              – attdona
              Nov 12 at 14:11










            • I know. The receiver notifies a sender (in case of the full buffer). However, if I understand correctly, the notification goes to the sender task, not directly to the sender future. Then the sender task recursively calls poll that will eventually reach the sender future. But is this guaranteed that the poll reaches the leaf future we intend? I thought not, because if one arm of select! completes (ready in my case), the select! will ignore other futures (send in my case) even if they're waiting for something. If so, what if the notification disappears there? That's my question.
              – Masaki Hara
              Nov 12 at 14:47










            • Future don't wait for something, they are to be polled: when select! complete the sending future is left in an incomplete state and in your code no one will poll it again for resolving it. the only side effect that I see it is that -3 is not sent through the channel.
              – attdona
              Nov 13 at 10:00










            • Yes, then my concern is that, after that, the buffer may be vacant (because -3 is not sent) but no one sends an item against the channel, because other sending tasks are not notified. What prevents this bad (at least counterintuitive) situation?
              – Masaki Hara
              Nov 13 at 13:42
















            Thank you for the answer, but I'm not convinced yet. Let me reword my assumption using "future" and "task". The sender future awaits for receiver notification if the buffer is full at the time of the sending attempt. However, notification is sent against the task, not the sender future. Usually the notification is propagated down to the sender future, but that's not always the case in presence of selection; the whole task may have even been dropped without seeing notification. Then no alive sender future can wake despite non-full buffer. There may be something wrong but I'm not sure where.
            – Masaki Hara
            Nov 12 at 12:19




            Thank you for the answer, but I'm not convinced yet. Let me reword my assumption using "future" and "task". The sender future awaits for receiver notification if the buffer is full at the time of the sending attempt. However, notification is sent against the task, not the sender future. Usually the notification is propagated down to the sender future, but that's not always the case in presence of selection; the whole task may have even been dropped without seeing notification. Then no alive sender future can wake despite non-full buffer. There may be something wrong but I'm not sure where.
            – Masaki Hara
            Nov 12 at 12:19












            It is the receiver that notify, not the sender. It is a "sending" future that you select from: this means that there are no notifications from select in every case. It is Receiver::unpark_one that notifies one sender in the queue of parked senders that a slot in the buffer is available.
            – attdona
            Nov 12 at 14:11




            It is the receiver that notify, not the sender. It is a "sending" future that you select from: this means that there are no notifications from select in every case. It is Receiver::unpark_one that notifies one sender in the queue of parked senders that a slot in the buffer is available.
            – attdona
            Nov 12 at 14:11












            I know. The receiver notifies a sender (in case of the full buffer). However, if I understand correctly, the notification goes to the sender task, not directly to the sender future. Then the sender task recursively calls poll that will eventually reach the sender future. But is this guaranteed that the poll reaches the leaf future we intend? I thought not, because if one arm of select! completes (ready in my case), the select! will ignore other futures (send in my case) even if they're waiting for something. If so, what if the notification disappears there? That's my question.
            – Masaki Hara
            Nov 12 at 14:47




            I know. The receiver notifies a sender (in case of the full buffer). However, if I understand correctly, the notification goes to the sender task, not directly to the sender future. Then the sender task recursively calls poll that will eventually reach the sender future. But is this guaranteed that the poll reaches the leaf future we intend? I thought not, because if one arm of select! completes (ready in my case), the select! will ignore other futures (send in my case) even if they're waiting for something. If so, what if the notification disappears there? That's my question.
            – Masaki Hara
            Nov 12 at 14:47












            Future don't wait for something, they are to be polled: when select! complete the sending future is left in an incomplete state and in your code no one will poll it again for resolving it. the only side effect that I see it is that -3 is not sent through the channel.
            – attdona
            Nov 13 at 10:00




            Future don't wait for something, they are to be polled: when select! complete the sending future is left in an incomplete state and in your code no one will poll it again for resolving it. the only side effect that I see it is that -3 is not sent through the channel.
            – attdona
            Nov 13 at 10:00












            Yes, then my concern is that, after that, the buffer may be vacant (because -3 is not sent) but no one sends an item against the channel, because other sending tasks are not notified. What prevents this bad (at least counterintuitive) situation?
            – Masaki Hara
            Nov 13 at 13:42




            Yes, then my concern is that, after that, the buffer may be vacant (because -3 is not sent) but no one sends an item against the channel, because other sending tasks are not notified. What prevents this bad (at least counterintuitive) situation?
            – Masaki Hara
            Nov 13 at 13:42












            up vote
            0
            down vote













            Further investigation on the futures source code solved my problem. At last, I cannot confuse the mpsc in this way.



            The point is that, the size of mpsc is flexible and can grow more than initially specified. This behavior is mentioned in the docs:




            The channel's capacity is equal to buffer + num-senders. In other words, each sender gets a guaranteed slot in the channel capacity, and on top of that there are buffer "first come, first serve" slots available to all senders.




            Yes, I've first read this before experimenting, but I couldn't figure out the importance of this at that time.



            Problem with fixed buffer



            Think of a typical bounded queue implementation, where the size of a queue cannot grow more than initially specified. The spec is this:




            • When the queue is empty, receivers block.

            • When the queue is full (that is, the size is hitting the bound), senders block.


            In this situation, if the queue is full, multiple senders are waiting for a single resource (the size of the queue).



            In multithread programming, this is accomplished by primitives like notify_one. However, in futures, this is fallible: unlike multithreaded programming, the notified task doesn't necessarily make use of the resource, because the task may already have given up acquiring the resource (due to constructions like select! or Deadline) Then the spec is simply broken (the queue isn't full, but all alive senders are blocked).




            mpsc is flexible



            As pointed out above, the buffer size for futures::channel::mpsc::channel isn't strict. The spec is summarized as:




            • When message_queue.len() == 0, receivers block.

            • When message_queue.len() >= buffer, senders may block.

            • When message_queue.len() >= buffer + num_senders, senders block.


            Here, num_senders is basically the number of clones of Sender, but more than that in some cases. More precisely, num_senders is the number of SenderTasks.



            So, how do we avoid resource sharing? We have additional states for that:




            • Each sender (an instance of SenderTask) has is_parked boolean state.

            • The channel has another queue called parked_queue, a queue of Arc references to SenderTask.


            The channel maintains the following invariants:





            • message_queue.len() <= buffer + num_parked_senders. Note that we don't know the value of num_parked_senders.

            • parked_queue.len() == min(0, message_queue.len() - buffer)

            • Each parked sender has at least one message in parked_queue.


            This is accomplished by the following algorithm:




            • For receiving,


              • it pops off a SenderTask from parked_queue and, if the sender is parked, unpark it.



            • For sending,


              • It always waits for is_parked to be false. If message_queue.len() < buffer, as parked_queue.len() == 0, all senders are unparked. Therefore we can guarantee progress in this case.

              • If is_parked is false, push the message to the queue anyway.

              • After that, if message_queue.len() <= buffer, it needs to do nothing further.

              • if message_queue.len() > buffer, the sender is made unparked and pushed to parked_queue.




            You can easily check the invariant is maintained in the algorithm above.



            Surprisingly, the senders no more wait for a shared resource. Instead, A sender waits for its is_parked state. Even if the sending task is dropped before completion, it just remains in parked_queue for a while and doesn't block anything. How clever it is!






            share|improve this answer

























              up vote
              0
              down vote













              Further investigation on the futures source code solved my problem. At last, I cannot confuse the mpsc in this way.



              The point is that, the size of mpsc is flexible and can grow more than initially specified. This behavior is mentioned in the docs:




              The channel's capacity is equal to buffer + num-senders. In other words, each sender gets a guaranteed slot in the channel capacity, and on top of that there are buffer "first come, first serve" slots available to all senders.




              Yes, I've first read this before experimenting, but I couldn't figure out the importance of this at that time.



              Problem with fixed buffer



              Think of a typical bounded queue implementation, where the size of a queue cannot grow more than initially specified. The spec is this:




              • When the queue is empty, receivers block.

              • When the queue is full (that is, the size is hitting the bound), senders block.


              In this situation, if the queue is full, multiple senders are waiting for a single resource (the size of the queue).



              In multithread programming, this is accomplished by primitives like notify_one. However, in futures, this is fallible: unlike multithreaded programming, the notified task doesn't necessarily make use of the resource, because the task may already have given up acquiring the resource (due to constructions like select! or Deadline) Then the spec is simply broken (the queue isn't full, but all alive senders are blocked).




              mpsc is flexible



              As pointed out above, the buffer size for futures::channel::mpsc::channel isn't strict. The spec is summarized as:




              • When message_queue.len() == 0, receivers block.

              • When message_queue.len() >= buffer, senders may block.

              • When message_queue.len() >= buffer + num_senders, senders block.


              Here, num_senders is basically the number of clones of Sender, but more than that in some cases. More precisely, num_senders is the number of SenderTasks.



              So, how do we avoid resource sharing? We have additional states for that:




              • Each sender (an instance of SenderTask) has is_parked boolean state.

              • The channel has another queue called parked_queue, a queue of Arc references to SenderTask.


              The channel maintains the following invariants:





              • message_queue.len() <= buffer + num_parked_senders. Note that we don't know the value of num_parked_senders.

              • parked_queue.len() == min(0, message_queue.len() - buffer)

              • Each parked sender has at least one message in parked_queue.


              This is accomplished by the following algorithm:




              • For receiving,


                • it pops off a SenderTask from parked_queue and, if the sender is parked, unpark it.



              • For sending,


                • It always waits for is_parked to be false. If message_queue.len() < buffer, as parked_queue.len() == 0, all senders are unparked. Therefore we can guarantee progress in this case.

                • If is_parked is false, push the message to the queue anyway.

                • After that, if message_queue.len() <= buffer, it needs to do nothing further.

                • if message_queue.len() > buffer, the sender is made unparked and pushed to parked_queue.




              You can easily check the invariant is maintained in the algorithm above.



              Surprisingly, the senders no more wait for a shared resource. Instead, A sender waits for its is_parked state. Even if the sending task is dropped before completion, it just remains in parked_queue for a while and doesn't block anything. How clever it is!






              share|improve this answer























                up vote
                0
                down vote










                up vote
                0
                down vote









                Further investigation on the futures source code solved my problem. At last, I cannot confuse the mpsc in this way.



                The point is that, the size of mpsc is flexible and can grow more than initially specified. This behavior is mentioned in the docs:




                The channel's capacity is equal to buffer + num-senders. In other words, each sender gets a guaranteed slot in the channel capacity, and on top of that there are buffer "first come, first serve" slots available to all senders.




                Yes, I've first read this before experimenting, but I couldn't figure out the importance of this at that time.



                Problem with fixed buffer



                Think of a typical bounded queue implementation, where the size of a queue cannot grow more than initially specified. The spec is this:




                • When the queue is empty, receivers block.

                • When the queue is full (that is, the size is hitting the bound), senders block.


                In this situation, if the queue is full, multiple senders are waiting for a single resource (the size of the queue).



                In multithread programming, this is accomplished by primitives like notify_one. However, in futures, this is fallible: unlike multithreaded programming, the notified task doesn't necessarily make use of the resource, because the task may already have given up acquiring the resource (due to constructions like select! or Deadline) Then the spec is simply broken (the queue isn't full, but all alive senders are blocked).




                mpsc is flexible



                As pointed out above, the buffer size for futures::channel::mpsc::channel isn't strict. The spec is summarized as:




                • When message_queue.len() == 0, receivers block.

                • When message_queue.len() >= buffer, senders may block.

                • When message_queue.len() >= buffer + num_senders, senders block.


                Here, num_senders is basically the number of clones of Sender, but more than that in some cases. More precisely, num_senders is the number of SenderTasks.



                So, how do we avoid resource sharing? We have additional states for that:




                • Each sender (an instance of SenderTask) has is_parked boolean state.

                • The channel has another queue called parked_queue, a queue of Arc references to SenderTask.


                The channel maintains the following invariants:





                • message_queue.len() <= buffer + num_parked_senders. Note that we don't know the value of num_parked_senders.

                • parked_queue.len() == min(0, message_queue.len() - buffer)

                • Each parked sender has at least one message in parked_queue.


                This is accomplished by the following algorithm:




                • For receiving,


                  • it pops off a SenderTask from parked_queue and, if the sender is parked, unpark it.



                • For sending,


                  • It always waits for is_parked to be false. If message_queue.len() < buffer, as parked_queue.len() == 0, all senders are unparked. Therefore we can guarantee progress in this case.

                  • If is_parked is false, push the message to the queue anyway.

                  • After that, if message_queue.len() <= buffer, it needs to do nothing further.

                  • if message_queue.len() > buffer, the sender is made unparked and pushed to parked_queue.




                You can easily check the invariant is maintained in the algorithm above.



                Surprisingly, the senders no more wait for a shared resource. Instead, A sender waits for its is_parked state. Even if the sending task is dropped before completion, it just remains in parked_queue for a while and doesn't block anything. How clever it is!






                share|improve this answer












                Further investigation on the futures source code solved my problem. At last, I cannot confuse the mpsc in this way.



                The point is that, the size of mpsc is flexible and can grow more than initially specified. This behavior is mentioned in the docs:




                The channel's capacity is equal to buffer + num-senders. In other words, each sender gets a guaranteed slot in the channel capacity, and on top of that there are buffer "first come, first serve" slots available to all senders.




                Yes, I've first read this before experimenting, but I couldn't figure out the importance of this at that time.



                Problem with fixed buffer



                Think of a typical bounded queue implementation, where the size of a queue cannot grow more than initially specified. The spec is this:




                • When the queue is empty, receivers block.

                • When the queue is full (that is, the size is hitting the bound), senders block.


                In this situation, if the queue is full, multiple senders are waiting for a single resource (the size of the queue).



                In multithread programming, this is accomplished by primitives like notify_one. However, in futures, this is fallible: unlike multithreaded programming, the notified task doesn't necessarily make use of the resource, because the task may already have given up acquiring the resource (due to constructions like select! or Deadline) Then the spec is simply broken (the queue isn't full, but all alive senders are blocked).




                mpsc is flexible



                As pointed out above, the buffer size for futures::channel::mpsc::channel isn't strict. The spec is summarized as:




                • When message_queue.len() == 0, receivers block.

                • When message_queue.len() >= buffer, senders may block.

                • When message_queue.len() >= buffer + num_senders, senders block.


                Here, num_senders is basically the number of clones of Sender, but more than that in some cases. More precisely, num_senders is the number of SenderTasks.



                So, how do we avoid resource sharing? We have additional states for that:




                • Each sender (an instance of SenderTask) has is_parked boolean state.

                • The channel has another queue called parked_queue, a queue of Arc references to SenderTask.


                The channel maintains the following invariants:





                • message_queue.len() <= buffer + num_parked_senders. Note that we don't know the value of num_parked_senders.

                • parked_queue.len() == min(0, message_queue.len() - buffer)

                • Each parked sender has at least one message in parked_queue.


                This is accomplished by the following algorithm:




                • For receiving,


                  • it pops off a SenderTask from parked_queue and, if the sender is parked, unpark it.



                • For sending,


                  • It always waits for is_parked to be false. If message_queue.len() < buffer, as parked_queue.len() == 0, all senders are unparked. Therefore we can guarantee progress in this case.

                  • If is_parked is false, push the message to the queue anyway.

                  • After that, if message_queue.len() <= buffer, it needs to do nothing further.

                  • if message_queue.len() > buffer, the sender is made unparked and pushed to parked_queue.




                You can easily check the invariant is maintained in the algorithm above.



                Surprisingly, the senders no more wait for a shared resource. Instead, A sender waits for its is_parked state. Even if the sending task is dropped before completion, it just remains in parked_queue for a while and doesn't block anything. How clever it is!







                share|improve this answer












                share|improve this answer



                share|improve this answer










                answered Nov 17 at 14:55









                Masaki Hara

                1,458714




                1,458714






























                     

                    draft saved


                    draft discarded



















































                     


                    draft saved


                    draft discarded














                    StackExchange.ready(
                    function () {
                    StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53245906%2fwhy-futureschannelmpsc-can-just-notify-one-sender%23new-answer', 'question_page');
                    }
                    );

                    Post as a guest















                    Required, but never shown





















































                    Required, but never shown














                    Required, but never shown












                    Required, but never shown







                    Required, but never shown

































                    Required, but never shown














                    Required, but never shown












                    Required, but never shown







                    Required, but never shown







                    Popular posts from this blog

                    Full-time equivalent

                    さくらももこ

                    13 indicted, 8 arrested in Calif. drug cartel investigation