} elsif ($self->{on_read}) {
last unless $len;
$self->{on_read}($self);
if (
$len == length $self->{rbuf} # if no data has been consumed
&& !@{ $self->{_queue} } # and the queue is still empty
&& $self->{on_read} # but we still have on_read
) {
# no further data will arrive
# so no progress can be made
$self->_error (&Errno::EPIPE, 1), last
if $self->{_eof};
last; # more data might arrive
}
} else {
# read side becomes idle
delete $self->{_rw};
last;
}
}
$self->{on_eof}($self)
if $self->{_eof} && $self->{on_eof};
# may need to restart read watcher
unless ($self->{_rw}) {
$self->start_read
if $self->{on_read} || @{ $self->{_queue} };
}
}
=item $handle->on_read ($cb)
This replaces the currently set C<on_read> callback, or clears it (when
the new callback is C<undef>). See the description of C<on_read> in the
constructor.
=cut
sub on_read {
   my $self = shift;
   my $cb   = shift;

   # Install the new callback; a false/undef $cb clears it.
   $self->{on_read} = $cb;

   # With a callback in place, try to drain the read buffer right away,
   # unless the _in_drain flag is set (presumably meaning a drain is already
   # in progress further up the call stack -- confirm against _drain_rbuf).
   $cb && !$self->{_in_drain}
      and $self->_drain_rbuf;
}
=item $handle->rbuf
Returns the read buffer (as a modifiable lvalue).
You can access the read buffer directly as the C<< ->{rbuf} >> member, if
you want.
NOTE: The read buffer should only be used or modified if the C<on_read>,
C<push_read> or C<unshift_read> methods are used. The other read methods
automatically manage the read buffer.
=cut
sub rbuf : lvalue {
   my ($self) = @_;

   # The hash slot itself must be the final expression (no explicit return)
   # so that callers can assign to or modify the buffer in place.
   $self->{rbuf}
}
=item $handle->push_read ($cb)
=item $handle->unshift_read ($cb)
Append the given callback to the end of the queue (C<push_read>) or
prepend it (C<unshift_read>).
The callback is called each time some additional read data arrives.
It must check whether enough data is in the read buffer already.
If not enough data is available, it must return the empty list or a false
value, in which case it will be called repeatedly until enough data is
available (or an error condition is detected).
If enough data was available, then the callback must remove all data it is
interested in (which can be none at all) and return a true value. After returning
true, it will be removed from the queue.
=cut
# Registry filled by register_read_type: maps a read-type name to the value
# registered for it.
our %RH;

# register_read_type $type, $handler
#
# Stores $handler in %RH under the key $type, replacing any earlier entry for
# the same name. The ($$) prototype is kept as it is part of how call sites
# are parsed.
sub register_read_type($$) {
   my ($type, $handler) = @_;

   $RH{$type} = $handler;
}
sub push_read {
my $self = shift;
my $cb = pop;
if (@_) {
my $type = shift;
=8= |