adds async stream rfc by nellshamrell · Pull Request #2996 · rust-lang/rfcs (original) (raw)
Conversation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.Learn more about bidirectional Unicode characters
[ Show hidden characters]({{ revealButtonHref }})
gliderkite, attila-lin, rhysd, yoshuawuyts, GrayJack, tmccombs, yerke, dignifiedquire, KodrAus, tema3210, and 22 more reacted with thumbs up emoji nazar-pc, attila-lin, sdroege, shekohex, rhysd, Aaron1011, joshtriplett, Nashenas88, mark-i-m, Connicpu, and 31 more reacted with hooray emoji stepancheg reacted with confused emoji
Signed-off-by: Nell Shamrell nellshamrell@gmail.com
Co-authored-by: Florian Gilcher florian.gilcher@asquera.de
Co-authored-by: Florian Gilcher florian.gilcher@asquera.de
Co-authored-by: Waffle Lapkin waffle.lapkin@gmail.com
Co-authored-by: kennytm kennytm@gmail.com
Co-authored-by: Taiki Endo te316e89@gmail.com
Signed-off-by: Nell Shamrell nellshamrell@gmail.com
We discussed this amongst the lang team at some point and concluded that this does not require @rust-lang/lang signoff. I've tagged it for @rust-lang/libs.
Also cc @rust-lang/wg-async-foundations
👍 to adding try_next() if possible.
Also: could we please standardize a method for converting from Iterator to Stream? That may not be able to use IntoStream (because a blanket impl for Iterator would conflict with more specific impls people may want to write), but having a function that takes an impl Iterator<Item = T> and returns an impl Stream<Item = T> would be really helpful.
Signed-off-by: Nell Shamrell nellshamrell@gmail.com
Signed-off-by: Nell Shamrell nellshamrell@gmail.com
👍 to adding
try_next()if possible.
@joshtriplett The conversation around whether to add Stream::next took quite a while to resolve, and was only added because poll_next by itself is not useful for end-users. The proposed try_next adapter has no counterpart in Iterator, is a minor helper to avoid writing .transpose(), and doesn't include functionality unique to async Rust. I would prefer if the scope of this PR would not be expanded beyond what's already been proposed so we don't need to find consensus on the design of further stream adapters in order to stabilize Stream.
Also: could we please standardize a method for converting from Iterator to Stream?
@joshtriplett async-std has stream::from_iter and futures-rs has stream::iter. I don't know if you think this should be mentioned in the RFC, but adding a function with this functionality would be straightforward once we expose Stream.
@yoshuawuyts To clarify, neither of those comments was intended as a blocker to this RFC. I think it makes sense to add a minimal implementation of Stream, and after doing so, add additional helper functions like these to make it easier to work with.
Information about converting an iterator to a stream has been added :)
I finally got around to reading, this looks pretty good, I think including next is +1. I would really really like to see try_next, I think this is by far the most useful fn I have used for streams. I don't really understand the reason to not include it? I don't think there is much consensus needed for it, we've already agreed how to do next and try_next shouldn't be much more complicated as far as I can tell.
Another example of !Unpin streams are done via async-stream crate https://github.com/tokio-rs/async-stream/ which is usable on stable today.
Thanks a ton @nellshamrell and other for pushing this through 😄
I'm not sure where to comment on this, but given that this RFC discusses LendStream, as a generalization, in theory at least there is a further generalization involving different lifetimes for the Lender, and the Item, with a lifetime bounds that 'lender: 'item. Which probably requires additional control mechanisms to regain ownership in the Lender.
With the idea that the single liftime LendingStream is a specialization of it where the lifetimes are the same.
I haven't thought enough about the extent to which the further generalization can be expressed currently though.
It seemed since the discussion of LendingStream as a generalization of Stream up it might/might not be worth thinking about this further generalization
l4l left a comment
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just some minor typos, please check it out, I'm not a native speaker there might be mistakes.
That's an amazing job by the way, really like reading the rfc. Hope it would be approved soon.
I would really really like to see
try_next[...] I don't really understand the reason to not include it?
@LucioFranco I covered that earlier in the thread #2996 (comment):
The proposed
try_nextadapter has no counterpart in Iterator, is a minor helper to avoid writing.transpose(), and doesn't include functionality unique to async Rust. I would prefer if the scope of this PR would not be expanded beyond what's already been proposed so we don't need to find consensus on the design of further stream adapters in order to stabilizeStream.
I see try_next as being desirable only as long as we don't have async iteration syntax, and will fall out of fashion the moment we do. I know you'll see this differently, and discussing the scope and priorities of stream adapters will make for an interesting conversation. But do you really want to push us to find consensus on that now?
We're both part of the Async Foundations WG, and we've had the privilege of being able to set the direction of this RFC. And that was a success; we have consensus on everything included here! I think now is the time to ensure what's in this RFC is accurate, but not seek to expand its scope with subjects we then newly need to find consensus on.
To ensure I understand, the role of try_next is to allow you to write code like this?
while let Some(foo) = stream.try_next().await? { }
as opposed to this?
while let Some(foo) = stream.try_next().await { let foo = foo?; }
Hmm, I'm torn. I agree with @yoshuawuyts that we should avoid increasing the scope of the RFC in general, which is intentionally targeted. But I also think that the point of including next was that it was something people would "immediately want" in practice, and I can imagine that try_next also fits in that bucket. I'm not sure how important it is that it would become less useful if/when async iteration syntax is added, given that we don't have a clear timeline on that. Is the primary objection that the method will not be as useful in the future or are there other concerns?
I would want to hold the line against other forms of "scope creep", the only reason I can see to consider try_next is that it feels very analogous to next.
Thinking on it more, I think what I'd prefer is to leave the RFC unaltered, land it, and consider try_next separately as a PR. It feels like a fairly standard "libs addition". The only difference is that, if we plan to make the futures crate redirect to core, before we actually do that we will need to have a clear idea of what methods we have, because any additional methods will have to be carefully coordinated.
@yoshuawuyts sure, I am pretty sure I voiced my opinion during that period. It's also my fault for not pushing it harder, I got busy and that happens. I still don't follow your argument you referenced with iterators. I think Streams are not 1:1 with iterators but I rest my case.
@nikomatsakis right, I think most streams I have worked with return some sort of result so avoiding the first example in favor of the second is much nicer.
I think its fine to punt on this for now, just knowing that I think this is extremely useful. My question then is, what does that path look like to add try_next and what does that look like in conjunction with the futures crate? For example, I want to use this try_next and when/if we add it to std will it just replace the one on the futures crate? To me I see this method as one of the foundational ones for this trait and I think that path needs to be clear.
The alternative to try_next is
while let Some(foo) = stream.next().await.transpose()? { }
which is a pattern I've started using with fallible iterators as well, having to use a while let instead of a for loop is less annoying than having the let foo = foo?; line inside the loop.
If streams can register themselves once at the beginning of a
for_eachand de-register at the end, the stream may be implemented considerably more efficiently.
for_each takes a closure returning a future which may "block" for as long as it likes, so it would need to pause the stream before running that future and unpause it again afterwards. Similarly as soon as you have a .then(_) in a stream chain that Then stream would have to pre-emptively pause the prior stream in case the future it runs for each item "blocks" for too long. I don't think there are many situations in which you could poll a stream multiple times without pausing it between each poll.
Re: the use of the Stream trait when receiving from channels.
Maybe Stream is simply not the correct abstraction for it. If you need to know (for performance reasons) what receivers are actively attempting to receive a value maybe a short-lived Future like the Stream::next method (now removed) returned would be a better fit.
I realise that Stream could provide some nice syntax sugar similar to what for with Iterators, but the following also works fine.
while let Ok(msg) = channel.receive_next().await { // Handle message... }
impl Channel { fn receive_next(&mut self) -> impl Future<Output = Result<M, RecvError>> { /* ... */ } }
It creates short lived Futures allowing the implementation to know which receivers are actively trying to receive a value.
bdash mentioned this pull request
| consider `next`, which when called, returns a future which yields |
|---|
| `Option`. |
| The future returned by `next` will yield `Some(Item)` as long as there are |
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The future returned by
nextwill yieldSome(Item)as long as there are elements
How does this play with fuse considering the future exhausted after the first Poll::Ready? Is the idea that you can either rely on that implementation detail for next and continue to pull items using the same future or get a new one after the previous next resolved?
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @KodrAus - the answer on this one is I honestly don't know. I'm happy to include a note that this will need to be considered in the eventual RFC and implementation of the next method. Would that be sufficient, since this is in the Future Possibilities section?
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I found myself wanting to see the contents of the More Usage Examples section here first after we defined our Counter to see what it's like to interact with on the consuming side.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @KodrAus - I intentionally did not include the consuming side here since the next method has been moved to the "Future Possibilities" section. I felt including that here would introduce more confusion about what is and is not covered by this RFC.
@rust-lang/libs I've dropped a link to the FCP in the OP here, since it's gotten quite buried.
Maybe Stream is simply not the correct abstraction for it. If you need to know (for performance reasons) what receivers are actively attempting to receive a value maybe a short-lived Future like the Stream::next method (now removed) returned would be a better fit.
Perhaps this is the case, but I think it would be a shame for Stream to not accommodate this common use-case when the cost of the features required to allow it to accommodate it is so low.
This also means that there is no longer a natural duality between Stream and Iterator as the RFC claims.
Hello @rust-lang/libs :) For those of you who have not yet approved FCP in this comment - are there any concerns you have or is there anything blocking approving this for FCP?
@zesterer thank you for your comments, they encouraged us to think even more carefully about the design of this feature. I am going to leave the design as it is now as a minimal viable implementation, but we have your concerns about channels recorded here for future reference. I have no doubt we will be opening more RFCs around Async streams in the near future and we will certainly keep your concerns in mind. Thank you again, the community input and context is vital to any RFC.
JohnTitor added a commit to JohnTitor/rust that referenced this pull request
Add core::stream::Stream
[Tracking issue: rust-lang#79024]
This patch adds the core::stream submodule and implements core::stream::Stream in accordance with RFC2996. The RFC hasn't been merged yet, but as requested by the libs team in rust-lang/rfcs#2996 (comment) I'm filing this PR to get the ball rolling.
Documentatation
The docs in this PR have been adapted from std::iter, async_std::stream, and futures::stream::Stream. Once this PR lands my plan is to follow this up with PRs to add helper methods such as stream::repeat which can be used to document more of the concepts that are currently missing. That will allow us to cover concepts such as "infinite streams" and "laziness" in more depth.
Feature gate
The feature gate for Stream is stream_trait. This matches the #[lang = "future_trait"] attribute name. The intention is that only the APIs defined in RFC2996 will use this feature gate, with future additions such as stream::repeat using their own feature gates. This is so we can ensure a smooth path towards stabilizing the Stream trait without needing to stabilize all the APIs in core::stream at once. But also don't start expanding the API until after stabilization, as was the case with std::future.
edit: the feature gate has been changed to async_stream to match the feature gate proposed in the RFC.
Conclusion
This PR introduces core::stream::{Stream, Next} and re-exports it from std as std::stream::{Stream, Next}. Landing Stream in the stdlib has been a mult-year process; and it's incredibly exciting for this to finally happen!
r? @KodrAus
cc/ @rust-lang/wg-async-foundations @rust-lang/libs
Gentle ping to @rust-lang/libs - this only needs one more approval for FCP!
🔔 This is now entering its final comment period, as per the review above. 🔔
yoshuawuyts, Lindenk, Johannesd3, tux3, nellshamrell, thedodd, asonix, chertov, sachaarbonel, dnrusakov, and zbraniecki reacted with hooray emoji
The final comment period, with a disposition to merge, as per the review above, is now complete.
As the automated representative of the governance process, I would like to thank the author for their work and everyone else who contributed.
The RFC will be merged soon.
tesaguri, tux3, zesterer, kennytm, zbraniecki, marcelbuesing, benkay86, yoshuawuyts, nellshamrell, tmandry, and 7 more reacted with hooray emoji theduke, yoshuawuyts, nellshamrell, dnrusakov, GeorgeHahn, JunichiSugiura, payload, thedodd, and eboody reacted with rocket emoji
FCP has been complete for a bit over a month now, is something else missing for this to be merged?
No! I thought I did the merge! Maybe I forgot to push?
Hmm, maybe I removed the merge commit or something. I'm not sure why github doesn't show this as merged but .. it is. Going to close!
Looks like a rebase instead of a merge, and GitHub only "understands" command-line PR merging if the exact original commits are included (i.e. if it's a merge or fast-forward, not rebase or squash).
Yeah I guess I rebased without realizing it. Oh well!
| Unfortunately, async methods in traits are not currently supported, |
| and there [are a number of challenges to be |
| resolved](https://rust-lang.github.io/wg-async-foundations/design\_notes/async\_fn\_in\_traits.html) |
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Broken link
| with them. |
|---|
| Unfortunately, the use of poll does mean that it is harder to write |
| stream implementations. The long-term fix for this, discussed in the [Future possiblilities](future-possibilities) section, is dedicated [generator syntax]. |
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Broken link
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
any chance you can open a PR with fixes?
LegNeato pushed a commit to LegNeato/juniper that referenced this pull request
Recently tokio got a first stable release and many libraries & applications already migrated to the newest version.
This changes upgrades tokio version to 1.0.2:
Tokio renamed some of its features, e.g
rt-utilandrt-corenow combined intort.streamfeature got extracted to a separate crate tokio-stream, waiting for eventualStreamlanding to the Rust std library. RFC
[TODO]
Actix's integration test_actix_ws_integration test still fails due to, I guess, async rt is not being initialized. Apart from that, the tests are green. Please feel free to take over this effort.
Reviewers
kennytm kennytm left review comments
nikomatsakis nikomatsakis approved these changes
Darksonn Darksonn left review comments
tmandry tmandry approved these changes
KodrAus KodrAus left review comments
WaffleLapkin WaffleLapkin left review comments
+9 more reviewers
skade skade left review comments
Nemo157 Nemo157 left review comments
jonhoo jonhoo left review comments
tanriol tanriol left review comments
pickfire pickfire left review comments
l4l l4l left review comments
taiki-e taiki-e left review comments
Reviewers whose approvals may not affect merge requirements