Consider having polling an error represent the final Stream value #206

Closed
carllerche opened this issue Oct 12, 2016 · 57 comments

Comments

@carllerche
Member

Consider having polling an error represent the final value. In other words, a poll that returns an error means that poll should never be called again. In the case of a Stream, this means that an error indicates the stream has terminated.

This issue is a placeholder for the associated discussion.

cc @aturon

@aturon
Member

aturon commented Oct 12, 2016

@alexcrichton and I debated this initially, and the two options are equally expressive, in that a given stream can be set up to behave either way in either approach (e.g. by using Result in the Item type). So the question is what is the most convenient/straightforward default.

I tend to think that it's much more common for stream errors to be "fatal" than recoverable, so I agree with @carllerche that we've probably chosen the wrong default here.

@alexcrichton
Member

I also think that this is largely a question of what the combinators do. For example the TCP listener probably won't fuse itself once an error happens but rather will continue to keep accepting sockets. The combinators, however, would be able to assume that when an error happens or Ok(None) happens that everything is terminated.

Sounds reasonable to switch to errors == "end of stream"
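To make the distinction concrete, here is a minimal sketch of a source that keeps producing items after an error (like the TCP listener mentioned above) and an adapter that treats the first error as the end of the stream. The `Async`, `Poll` and `Stream` definitions are simplified stand-ins written for this example, not the real futures crate, and `Scripted` and `FuseOnError` are hypothetical names.

```rust
// Simplified stand-ins loosely modeled on futures 0.1 (assumption: not the real crate).
#[derive(Debug, PartialEq)]
pub enum Async<T> {
    Ready(T),
    #[allow(dead_code)]
    NotReady,
}

pub type Poll<T, E> = Result<Async<T>, E>;

pub trait Stream {
    type Item;
    type Error;
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>;
}

/// Hypothetical source that replays a script and keeps going after an error.
pub struct Scripted {
    events: std::vec::IntoIter<Result<u32, String>>,
}

impl Scripted {
    pub fn new(events: Vec<Result<u32, String>>) -> Self {
        Scripted { events: events.into_iter() }
    }
}

impl Stream for Scripted {
    type Item = u32;
    type Error = String;

    fn poll(&mut self) -> Poll<Option<u32>, String> {
        match self.events.next() {
            Some(Ok(value)) => Ok(Async::Ready(Some(value))),
            Some(Err(e)) => Err(e),
            None => Ok(Async::Ready(None)),
        }
    }
}

/// Hypothetical adapter: once the inner stream errors or ends,
/// every later poll reports end of stream.
pub struct FuseOnError<S> {
    inner: Option<S>,
}

impl<S> FuseOnError<S> {
    pub fn new(inner: S) -> Self {
        FuseOnError { inner: Some(inner) }
    }
}

impl<S: Stream> Stream for FuseOnError<S> {
    type Item = S::Item;
    type Error = S::Error;

    fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> {
        let result = match self.inner.as_mut() {
            Some(stream) => stream.poll(),
            None => return Ok(Async::Ready(None)),
        };
        // Drop the inner stream on termination so it is never polled again.
        match &result {
            Ok(Async::Ready(None)) | Err(_) => self.inner = None,
            _ => {}
        }
        result
    }
}
```

Polling a `Scripted` built from `[Ok(1), Err("boom"), Ok(2)]` yields the `2` after the error, while the same script wrapped in `FuseOnError` reports end of stream after the error.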

@alexcrichton alexcrichton added this to the 0.2 release milestone Oct 12, 2016
@ghost

ghost commented Oct 12, 2016

Terminating a stream on an error would simplify the life of implementers of the
Stream trait, as they wouldn't have to ensure any particular behaviour of
further polls after returning an error (presuming that behaviour of poll after
termination is still left unspecified).

The main question seems to be indeed, what combinators would do in that case.
They could be overspecified with respect to errors (edit: at least those that
short-circuit on error), and continue to behave exactly the same as they
currently do. In this case the difference would not be particularly important.

@ghost

ghost commented Oct 12, 2016

In a world where streams terminate on an error, would there still be a place
for combinators that recover from errors, like or_else and then?

They could recover from one error, but what next? Their only option seems to be
to terminate the stream, as otherwise they would be breaking the Stream
contract. Notice that it is the combinator that would be breaking the contract
on continued polling, not a downstream consumer.

This seems to be a weak point of stopping on errors. Error recovery is
possible, but only once. Alternatively, those combinators could have stronger
preconditions to work only with streams that permit further polling on errors.

@alexcrichton
Member

Hm yeah that's a very good point about the combinators. I think this would remove basically all of the "chaining" combinators like and_then, or_else, and then. More specifically:

  • or_else would only allow for recovery of one error, which seems odd
  • then similarly would only allow catching one error, which seems odd
  • and_then would maybe generate errors from futures, not the stream. This error would enter the stream and then oddly terminate it even though the original stream is just fine.

That... may slightly change my opinion here. @carllerche @aturon thoughts about the effect on these chaining combinators with streams?

@carllerche
Member Author

IMO this is an argument in favor of having the Stream error terminate the stream.

There are two different error classes with streams:

  1. A stream of values that are either OK or errors.
  2. An error in the production of values, which means that no further values may be produced due to that error.

The current behavior of stream errors implies the first category which makes the second class of errors difficult to model.

If, however, the stream error terminates the stream, this implies that a stream error falls under group #2.

TcpListener::incoming is group #1, in which case, I believe this is modeled much better with a Stream<Item = Result<T, E>, Error = E2>

This lets you differentiate between producing an error value, and an error in value production (you can have potentially two different error types).

As pointed out, combinators like or_else, and_then, then, etc. don't make sense anymore, but a different set of combinators do:

incoming
  .map(my_or_else_fn)
  .take_while(Result::is_ok)

For example, this would provide behavior similar to the current or_else.
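A synchronous analogue of the snippet above can be written with `std::iter::Iterator` in place of `Stream`. This is only a sketch: `AcceptError`, `my_or_else_fn` and `accepted` are hypothetical names, and `u32` values stand in for accepted sockets.

```rust
/// Hypothetical per-connection error, standing in for an I/O error.
#[derive(Debug, PartialEq)]
pub enum AcceptError {
    Transient,
    Fatal,
}

/// Plays the role of `my_or_else_fn`: recover what can be recovered,
/// pass everything else through as an error value.
pub fn my_or_else_fn(item: Result<u32, AcceptError>) -> Result<Option<u32>, AcceptError> {
    match item {
        Ok(conn) => Ok(Some(conn)),
        Err(AcceptError::Transient) => Ok(None), // recovered: nothing to hand out
        Err(e) => Err(e),
    }
}

/// Collects connections until the first unrecovered error.
pub fn accepted(incoming: Vec<Result<u32, AcceptError>>) -> Vec<u32> {
    incoming
        .into_iter()
        .map(my_or_else_fn)
        // Stop at the first error that `my_or_else_fn` did not recover from.
        .take_while(|r| r.is_ok())
        // Everything left is `Ok`; drop the recovered-but-empty entries.
        .filter_map(|r| r.ok().flatten())
        .collect()
}
```

Here errors are ordinary items, so recovering from any number of them is just a `map`, and deciding when to stop is left to the consumer.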

@LalitMaganti

I would also favour the approach of termination of stream on errors. I find that an error which terminates the stream is much more common than one which is an "expected value" and is recoverable from.

(FYI Rx also does this and it has error handling "operators" if you want to check how and_then et al. can be modelled in the new system).

@alexcrichton
Member

Hm I wonder if we'd perhaps have:

fn and_then<F, R>(self, f: F) -> impl Stream<Item = Result<R::Item, R::Error>, Error = Self::Error>
    where F: FnMut(Self::Item) -> R,
          R: Future,

That is, @carllerche I agree that if we had these semantics then a TCP listener would be that form of a stream. I find and_then to be a very useful combinator, however, and it'd be a shame to lose it. If, however, and_then just transformed the Item type then perhaps that could work.

I'm still a little uneasy about the composition story though, it doesn't feel quite right...

@carllerche
Member Author

You could do something like this, which would have the additional benefit of allowing different error types for the different categories of errors:

fn and_then<F, T, E, U, R>(self, f: F) -> impl Stream<Item = Result<U, E>, Error = Self::Error>
    where Self: Stream<Item = Result<T, E>> + Sized,
          F: FnMut(T) -> R,
          R: IntoFuture<Item = Result<U, E>, Error = Self::Error>
{
    // ...
}

... it is a lot of generics though...

@carllerche
Member Author

I think this PR is relevant to the discussion: https://github.com/alexcrichton/futures-rs/pull/199/files

That is, if an error is passed through without terminating the stream, it is unclear whether it should count as an element.

@carllerche carllerche changed the title from "Consider having polling an error represent the final value" to "Consider having polling an error represent the final Stream value" Nov 3, 2016
@ghost ghost mentioned this issue Nov 5, 2016
@alexcrichton
Member

If we go with this (which it seems like we will) we should update stream::iter to take an iterator of items, not an iterator of results.

@Kixunil
Contributor

Kixunil commented Jan 12, 2017

Shouldn't this be encoded in the type? Currently Stream::poll takes &mut self, which indicates that self is always in a correct state after poll returns. The other case would be encoded by a move.

Maybe we could have two traits (names invented just for now):

enum StreamAsync<S: FallibleStream> {
    Ready(S::Item, S),
    NotReady(S),
}

trait FallibleStream: Sized {
    type Item;
    type Error;

    fn poll(self) -> Result<StreamAsync<Self>, Self::Error>;

    // combinators
}

trait InfallibleStream {
    type Item;
    // We don't need error here.

    fn poll(&mut self) -> Async<Self::Item>;
}

// This bridges the two:
impl<S: FallibleStream> InfallibleStream for Result<S, S::Error> {
    type Item = Result<S::Item, S::Error>;

    fn poll(&mut self) -> Async<Self::Item> {
        // Too lazy to write actual code, but the idea is to replace Self::Ok with Err if stream fails.
    }
}

I admit it'd somewhat complicate the API, but it might express the constraints more clearly and they'd be checked by the compiler.

What do you think?
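A runnable sketch of the by-value idea above, under simplifying assumptions: `Countdown` and `collect_all` are hypothetical, the trait needs a `Sized` bound to compile, and `NotReady` never actually occurs in this toy stream.

```rust
/// Outcome of a successful by-value poll: the stream is handed back to the caller.
pub enum StreamAsync<S: FallibleStream> {
    Ready(S::Item, S),
    NotReady(S),
}

pub trait FallibleStream: Sized {
    type Item;
    type Error;

    /// Consumes the stream. On `Err` the stream is gone, so the type system
    /// rules out polling it again.
    fn poll(self) -> Result<StreamAsync<Self>, Self::Error>;
}

/// Hypothetical stream: yields `remaining`, `remaining - 1`, ..., 1 and then fails.
pub struct Countdown {
    pub remaining: u32,
}

impl FallibleStream for Countdown {
    type Item = u32;
    type Error = &'static str;

    fn poll(self) -> Result<StreamAsync<Self>, Self::Error> {
        if self.remaining == 0 {
            Err("countdown exhausted")
        } else {
            let rest = Countdown { remaining: self.remaining - 1 };
            Ok(StreamAsync::Ready(self.remaining, rest))
        }
    }
}

/// Drives a stream until it fails, returning the items seen and the final error.
pub fn collect_all<S: FallibleStream>(mut stream: S) -> (Vec<S::Item>, S::Error) {
    let mut items = Vec::new();
    loop {
        match stream.poll() {
            Ok(StreamAsync::Ready(item, rest)) => {
                items.push(item);
                stream = rest;
            }
            // A real executor would park the task here instead of spinning.
            Ok(StreamAsync::NotReady(rest)) => stream = rest,
            Err(e) => return (items, e),
        }
    }
}
```

After the `Err` arm there is no `stream` value left to call `poll` on, which is the property the proposal is after.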

@alexcrichton
Member

@Kixunil yes that was considered long ago in the initial early design stages of this library, but unfortunately it suffers from a few problems.

  1. Usage of such an interface isn't always the most ergonomic. With so many types always moving there's lots of variable binding and inability to store values in structures.
  2. The Stream trait is no longer object safe, precluding type erasure and creating trait objects.
  3. Performance can decrease sometimes as moves aren't free and if everything is always moving it can cause unnecessary overhead.

@Kixunil
Contributor

Kixunil commented Jan 12, 2017

Good points. What exactly do you mean by "With so many types always moving there's lots of variable binding and inability to store values in structures."? I can't quite picture what you mean.

Also, isn't 3 optimized by the compiler? I thought that internally it avoids copying where possible.

Edit: Looking at the ASM of a simple example it seems like it doesn't...

@alexcrichton
Member

It basically means that every function consumes the receiver and then optionally returns it. That means that if you try to store partial state in a struct, for example, you'll have to deconstruct and reconstruct the struct frequently or have a lot of Option fields.
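The `Option` juggling described above looks roughly like this in practice. This is a simplified, synchronous sketch: `ByValueStream`, `Range` and `Summer` are hypothetical names invented for the example.

```rust
/// Hypothetical by-value stream: each step consumes the stream and
/// optionally hands it back together with an item.
pub trait ByValueStream: Sized {
    type Item;
    fn advance(self) -> Option<(Self::Item, Self)>;
}

/// Yields `cur`, `cur + 1`, ... up to but excluding `end`.
pub struct Range {
    pub cur: u32,
    pub end: u32,
}

impl ByValueStream for Range {
    type Item = u32;

    fn advance(self) -> Option<(u32, Self)> {
        if self.cur < self.end {
            Some((self.cur, Range { cur: self.cur + 1, end: self.end }))
        } else {
            None
        }
    }
}

/// A consumer that wants to keep the stream in a struct field between calls.
pub struct Summer<S> {
    // Must be an `Option` so the stream can be moved out of `&mut self`.
    stream: Option<S>,
    pub total: u32,
}

impl<S: ByValueStream<Item = u32>> Summer<S> {
    pub fn new(stream: S) -> Self {
        Summer { stream: Some(stream), total: 0 }
    }

    /// Processes one item; returns `false` once the stream has finished.
    pub fn step(&mut self) -> bool {
        // Take the stream out of the field before it can be advanced...
        let stream = match self.stream.take() {
            Some(stream) => stream,
            None => return false,
        };
        match stream.advance() {
            Some((item, rest)) => {
                self.total += item;
                // ...and remember to put it back afterwards.
                self.stream = Some(rest);
                true
            }
            None => false,
        }
    }
}
```

With `&mut self` polling, the field could simply be an `S` and `step` would not need the take-and-restore dance.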

@Kixunil
Contributor

Kixunil commented Jan 13, 2017

Thank you!

With all these problems, I prefer to keep the semantics as is. Maybe provide some other interface to specify whether an error is final (e.g. an is_err_final(&self) -> bool method).

@carllerche
Member Author

@Kixunil could you explain why you would like the semantics to stay as is? If the semantics are changed, you can get the original semantics by having a stream over Result.

@Kixunil
Contributor

Kixunil commented Jan 16, 2017

@carllerche I think a mutable reference expresses "the value will stay valid". If an error indicated end of stream, what should happen when poll is called again? Panic? Return the same error?

Keeping it this way has another advantage of not breaking existing code. :)

Also, one interesting example would be writing to file. The write can fail because of full disk but later it can succeed. On the other hand, I can imagine fatal error.

I'd prefer if the semantics could be defined by the implementor. Just a single fn is_err_final() may be fine. Or we may require Stream::Error: IsFatal too. Or create another associated type, ErrorSemantics, with one of Fatal, Restartable, or Mixed (Mixed would require Stream::Error: IsFatal). I'm not sure what the best approach is here.

I'd love it if the constraint could be encoded in the type system somehow. To give an example, I dislike the way io::{Read, Write} hard-code the error type, so there's no way to statically enforce that writing to Vec always succeeds. I'd love it if we could avoid creating a similar issue in the case of Futures.
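One possible shape for the `IsFatal` idea, as a sketch only: the trait, the `WriteError` type and the `drain` helper are all hypothetical, and a plain iterator of results stands in for a stream. It follows the file-writing example above, where a full disk is recoverable but a closed file is not.

```rust
/// Hypothetical trait letting the error type say whether the producer is finished.
pub trait IsFatal {
    fn is_fatal(&self) -> bool;
}

/// Hypothetical error type for a writer.
#[derive(Debug, PartialEq)]
pub enum WriteError {
    DiskFull, // may succeed later
    Closed,   // nothing more can ever be written
}

impl IsFatal for WriteError {
    fn is_fatal(&self) -> bool {
        match self {
            WriteError::DiskFull => false,
            WriteError::Closed => true,
        }
    }
}

/// Consumes events, skipping recoverable errors and stopping at the first fatal one.
pub fn drain<T, E: IsFatal>(
    events: impl IntoIterator<Item = Result<T, E>>,
) -> (Vec<T>, Option<E>) {
    let mut items = Vec::new();
    for event in events {
        match event {
            Ok(item) => items.push(item),
            Err(e) if e.is_fatal() => return (items, Some(e)),
            Err(_) => {} // recoverable: keep polling
        }
    }
    (items, None)
}
```

The consumer stays generic over the error type and never needs to know which concrete errors exist.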

@robey

robey commented Mar 16, 2017

I've played with streams in other languages, and found the idea (in this library) of "errors that don't terminate the stream" odd. Not bad, just odd. I agree with the comments above that you could get the same effect with a stream of Result, so the current behavior of making every item an implicit Result just complicates the common case.

@Kixunil
Contributor

Kixunil commented Mar 16, 2017

@robey we have these cases:

  • Stream can never fail
  • Stream can fail (no future operations are possible)
  • Stream can temporarily fail producing a value but it might be able to produce later. The decision whether to terminate it is up to the caller

Same with Sink.

How would you model all of these?

@robey

robey commented Mar 16, 2017

@Kixunil I think @alexcrichton's idea from here would let all three cases work: #206 (comment)

The first two cases would be the new behavior, and the third case would be handled by making the stream be explicitly of type Stream<Result<A, E>> -- a stream of things that can each be an item or an error.
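The three cases could be spelled out in types roughly as follows. This is a sketch under the proposed "error terminates the stream" semantics, with a simplified synchronous `poll`; `Stream` here is a stand-in trait written for the example, and `Scripted` and `drain` are hypothetical helpers.

```rust
use std::convert::Infallible;

/// Simplified stand-in: `Ok(Some(_))` is an item, `Ok(None)` is the end,
/// and `Err(_)` is terminal under the proposed semantics.
pub trait Stream {
    type Item;
    type Error;
    fn poll(&mut self) -> Result<Option<Self::Item>, Self::Error>;
}

/// Hypothetical stream that replays a fixed script of results.
pub struct Scripted<T, E> {
    events: std::vec::IntoIter<Result<T, E>>,
}

impl<T, E> Scripted<T, E> {
    pub fn new(events: Vec<Result<T, E>>) -> Self {
        Scripted { events: events.into_iter() }
    }
}

impl<T, E> Stream for Scripted<T, E> {
    type Item = T;
    type Error = E;

    fn poll(&mut self) -> Result<Option<T>, E> {
        // Option<Result<T, E>> -> Result<Option<T>, E>
        self.events.next().transpose()
    }
}

/// Consumer that honors the contract: it never polls again after an error.
pub fn drain<S: Stream>(mut stream: S) -> (Vec<S::Item>, Option<S::Error>) {
    let mut items = Vec::new();
    loop {
        match stream.poll() {
            Ok(Some(item)) => items.push(item),
            Ok(None) => return (items, None),
            Err(e) => return (items, Some(e)),
        }
    }
}

// Case 1: can never fail.
pub type NeverFails = Scripted<u32, Infallible>;
// Case 2: can fail, and failure is final.
pub type CanFail = Scripted<u32, String>;
// Case 3: individual items may be errors; the caller decides whether to go on.
pub type PerItemErrors = Scripted<Result<u32, String>, Infallible>;
```

Draining a `CanFail` built from `[Ok(1), Err("fatal"), Ok(2)]` stops after the `1`, while a `PerItemErrors` stream hands every per-item error to the caller as an ordinary item.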

@Kixunil
Contributor

Kixunil commented Mar 16, 2017

@robey That sounds reasonable. The problem I see there is someone could accidentally mistake one for the other but that is probably unavoidable anyway.

Anyway, I think my comment still holds: we should specify exactly what should happen when someone calls poll() after the stream itself has failed.

@living180

After implementing my own transport for use with tokio-proto, I agree that having an error terminate the stream makes the most sense, and that as @robey said, a non-fatal error can be handled with Stream<Result<A, E>>.

Regarding @Kixunil's question about poll() on a failed stream, the most robust way to handle this would be to have poll() take self instead of &mut self, as proposed in @Kixunil's comment. I know that @alexcrichton said that this was considered and ruled out, but that approach would make implementing a stream that can fail much nicer. As it stands right now, my implementation has to save any fatal error so that it can continue to return Ok(None) after an error occurs. The same would be true if the semantics were that calling poll() on a failed stream panics - it would still be necessary for me to maintain state to know that the panic should happen. My implementation would be much simpler if poll() could just consume the stream on completion/failure.

I understand the concern about the performance cost of the moves, but a very basic attempt seems to indicate that the necessary code changes wouldn't be that onerous. As an experiment I created a gist containing an implementation of ForEach::poll() when Stream::poll() takes self instead of &mut self and the necessary changes weren't terribly drastic. Perhaps other cases are less simple though.

@Kixunil
Contributor

Kixunil commented Apr 12, 2017

@living180 Thank you for considering my suggestion!

I'd like to revisit my suggestion again since I ran into this very recently. I'm implementing an application where correctness > speed (but it still needs to be fast enough).

This is related to the Future trait, though. Currently, Future takes &mut self and I'd like to have one future produce another (different!) future. However, they need to pass a non-Clone "bus" between them. So the only way to implement it is something like this:

fn poll(&mut self) -> Poll<...> {
    // Move the bus out so it can be handed to the next future on completion.
    match self.bus.take() {
        Some(mut bus) => match bus.poll() {
            Ok(Async::Ready(result)) => Ok(Async::Ready(MyNewFuture::new(result, bus))),
            Ok(Async::NotReady) => {
                // Not done yet: put the bus back for the next poll.
                self.bus = Some(bus);
                Ok(Async::NotReady)
            }
            Err(e) => Err(e),
        },
        None => panic!("poll() called twice"),
    }
}

This is weird, involves copying from/to Option<T> anyway (it probably can't be optimised away because of panic safety; similar problem as with std::mem::replace_with() - see its RFC) and also risks panics that could've been statically type checked. It would be much simpler to do with self instead of &mut self.

@alexcrichton
Member

@living180 note that using &mut self instead of self isn't purely for performance, the consumer ergonomics (e.g. being able to store in a struct field) and object safety (being able to create a trait object) are also quite important!

I do agree though that implementations would be easier with self; there are a number of combinators which have "empty" states, intended only as temporary placeholders, that just panic.

@3n-mb

3n-mb commented Oct 17, 2017

The above point is made around 13:20 in a C++ talk about observables. Judgment about an error is made downstream, at the most opportune and clean place. Again, the clean code theme applies here.

@carllerche
Member Author

carllerche commented Oct 19, 2017

For the record, I have changed my mind since I opened this issue and I believe that the current behavior is the best.

@3n-mb

3n-mb commented Oct 19, 2017

@carllerche why? However short your comment is, I cannot read your mind, and I'd love for you to share the experience that made you change your mind.

@3n-mb

3n-mb commented Oct 20, 2017

There can be a middle ground here!

  1. Rx style close-all on error, can be handled by Stream::Error.

  2. The less dangerous errors, on which stream merely "skips the beat" but doesn't close, can be accommodated via Stream::Item being a Result<T> instead of bare T.

@Aarowaim

A stream returning a Result<Item> should not terminate.

However when the resource underlying the stream returns an Err, the stream should be terminated. If the stream terminated, combinators on it can only return Err unless some action re-establishes the stream's live state (such as jumping the data pointer back to 0 for a stream made from a file resource).

I believe I lean towards handling Err outside of the stream, as it is the only clean way to escalate a problem to something that can fix it. A stream gets closed when its lifetime ends; it is not necessary to close when it becomes invalid and returns Err.

@Aarowaim

So to clarify, streams should be terminated (by the user), when the stream itself produces Err. The user can choose to allow object lifetimes to close the stream. They can also, if they know the cause, fix the problem and resume normal use of the stream.

@3n-mb

3n-mb commented Oct 21, 2017

streams should be terminated (by the user), when the stream itself produces Err.

Consider the following real-life example from using Rx.
I use a library, or module, written by someone else, of course. This library lets me introduce a section of my streams.

Now Err comes from that section. How do I know whether the 3rd party code will be ok with me skipping this error, treating it as a "skipped beat", instead of treating it as a "big error", for which I should drop the stream and restart its processing from scratch? How am I to know this?

😄 the answer is not "read 3rd party code documentation".

@3n-mb

3n-mb commented Oct 21, 2017

I thought about the form. How about the following formulation.
Let's have completion with error and a natural one. And let's have an event that is either ok, or is a lost-beat error.

  • If a combinator has a bad error, it does a completion with error.
  • If a combinator has an error condition, but it may continue working with new events, then it produces a lost-beat error, continuing its operation as normal.
  • When the following combinator gets a lost-beat error, it knows whether such a thing is tolerated by its own algorithm. A tolerated lost-beat error is passed downstream. Otherwise, completion with error is done.
  • When a lost-beat error reaches the end, the developer can make her own decision about it, as usual.
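The rules in the list above can be sketched with two pipeline stages, one that tolerates lost beats and one that does not. Everything here is hypothetical and synchronous: `Event`, `double` and `total` are names invented for the example, and `Vec` stands in for a stream.

```rust
/// An event is either an item or a lost beat (a non-terminal error).
#[derive(Debug, PartialEq, Clone)]
pub enum Event<T> {
    Item(T),
    LostBeat(String),
}

/// Tolerant stage: doubles items and passes lost beats downstream unchanged.
pub fn double(input: Vec<Event<u32>>) -> Vec<Event<u32>> {
    input
        .into_iter()
        .map(|event| match event {
            Event::Item(value) => Event::Item(value * 2),
            lost => lost,
        })
        .collect()
}

/// Intolerant stage: a total is meaningless with gaps, so a lost beat
/// is turned into completion with error.
pub fn total(input: Vec<Event<u32>>) -> Result<u32, String> {
    let mut sum = 0;
    for event in input {
        match event {
            Event::Item(value) => sum += value,
            Event::LostBeat(reason) => return Err(reason),
        }
    }
    Ok(sum)
}
```

Each stage decides for itself whether a lost beat is acceptable, so the author of a stage does not have to guess what its neighbours can tolerate.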

@Kixunil
Contributor

Kixunil commented Nov 1, 2017

@3n-mb Very good question! I'm a big believer in the type system and I believe we should have different types/traits for different behaviors. That way the compiler can check our code.

@dvtomas

dvtomas commented Dec 14, 2017

Hi,
just my $0.02. I'm new to Rust, but I used to work with RxScala and a little bit with another Scala library, Monix (better designed IMO). Both strive to be compatible with the Reactive Streams spec. I'm not really sure how similar futures streams are, or want to be, to the Reactive Streams spec, but the spec considers a reactive stream finished after it encounters an error: no more items are emitted after an error, so I'm used to this behavior.

I'm not sure if this is relevant to the discussion as I'm VERY new to Rust and futures streams. Still, I wanted to mention Reactive Streams in case somebody finds it useful.

@3n-mb

3n-mb commented Dec 21, 2017

Sounds like Reactive Streams deal with non-blocking backpressure.
This overflow notes that streams are pull-based, while observables (rx, or Reactive Extensions) are push-based.

To what extent are tokio streams pull-based, i.e. include backpressure control, versus observables (no inbuilt backpressure control)?
Or, where do tokio streams fit between reactive streams and observables?

@Kixunil
Contributor

Kixunil commented Jan 1, 2018

@3n-mb I think this is off-topic. Streams are pull-based and sinks are push-based (but they still have backpressure). There's also the unbounded channel, which has a push-based, non-blocking end.

@luben

luben commented Jan 15, 2018

@3n-mb the linked overview is about java-streams, not reactive streams. BTW the reactive streams are now in Java 9 as java.util.concurrent.Flow.

@3n-mb

3n-mb commented Jan 15, 2018

@luben I assume you are talking about this link, and it compares the two:

There are significant differences between Observable and Stream:
...

@luben Yes! It is even in Java!
Both reactive streams and observables are recognized by many language communities as being useful for folks like me, i.e. regular developers, users of core libraries.
Rust? 😢

@aturon
Member

aturon commented Feb 12, 2018

@cramertj was there any particular decision here?

@aturon
Member

aturon commented Mar 19, 2018

This has been resolved.

@aturon aturon closed this as completed Mar 19, 2018
@jonas-schievink

(the resolution was to not change the behaviour - a stream returning an error is not terminated and can yield further items or errors)
