From e5091cc065b492cfaba9896cb488035e291555e6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petr=20=C5=A0abata?= Date: Thu, 2 Aug 2012 17:10:04 +0200 Subject: [PATCH] Add IO::SessionDat and IO::SessionSet from SOAP::Lite 0.714 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Petr Ĺ abata --- lib/IO/SessionData.pm | 230 +++++++++++++++++++++++++++++++++++++++++++++++++ lib/IO/SessionSet.pm | 163 ++++++++++++++++++++++++++++++++++ 2 files changed, 393 insertions(+), 0 deletions(-) create mode 100644 lib/IO/SessionData.pm create mode 100644 lib/IO/SessionSet.pm diff --git a/lib/IO/SessionData.pm b/lib/IO/SessionData.pm new file mode 100644 index 0000000..de85382 --- /dev/null +++ b/lib/IO/SessionData.pm @@ -0,0 +1,230 @@ +# ====================================================================== +# +# Copyright (C) 2000 Lincoln D. Stein +# Slightly modified by Paul Kulchenko to work on multiple platforms +# Formatting changed to match the layout layed out in Perl Best Practices +# (by Damian Conway) by Martin Kutter in 2008 +# +# ====================================================================== + +package IO::SessionData; + +use strict; +use Carp; +use IO::SessionSet; +use vars '$VERSION'; +$VERSION = 1.02; + +use constant BUFSIZE => 3000; + +BEGIN { + my @names = qw(EWOULDBLOCK EAGAIN EINPROGRESS); + my %WOULDBLOCK = + (eval {require Errno} + ? map { + Errno->can($_) + ? (Errno->can($_)->() => 1) + : (), + } @names + : () + ), + (eval {require POSIX} + ? map { + POSIX->can($_) && eval { POSIX->can($_)->() } + ? (POSIX->can($_)->() => 1) + : () + } @names + : () + ); + + sub WOULDBLOCK { $WOULDBLOCK{$_[0]+0} } +} + +# Class method: new() +# Create a new IO::SessionData object. Intended to be called from within +# IO::SessionSet, not directly. +sub new { + my $pack = shift; + my ($sset,$handle,$writeonly) = @_; + # make the handle nonblocking (but check for 'blocking' method first) + # thanks to Jos Clijmans + $handle->blocking(0) if $handle->can('blocking'); + my $self = bless { + outbuffer => '', + sset => $sset, + handle => $handle, + write_limit => BUFSIZE, + writeonly => $writeonly, + choker => undef, + choked => 0, + },$pack; + $self->readable(1) unless $writeonly; + return $self; +} + +# Object method: handle() +# Return the IO::Handle object corresponding to this IO::SessionData +sub handle { + return shift->{handle}; +} + +# Object method: sessions() +# Return the IO::SessionSet controlling this object. +sub sessions { + return shift->{sset}; +} + +# Object method: pending() +# returns number of bytes pending in the out buffer +sub pending { + return length shift->{outbuffer}; +} + +# Object method: write_limit([$bufsize]) +# Get or set the limit on the size of the write buffer. +# Write buffer will grow to this size plus whatever extra you write to it. +sub write_limit { + my $self = shift; + return defined $_[0] + ? $self->{write_limit} = $_[0] + : $self->{write_limit}; +} + +# set a callback to be called when the contents of the write buffer becomes larger +# than the set limit. +sub set_choke { + my $self = shift; + return defined $_[0] + ? $self->{choker} = $_[0] + : $self->{choker}; +} + +# Object method: write($scalar) +# $obj->write([$data]) -- append data to buffer and try to write to handle +# Returns number of bytes written, or 0E0 (zero but true) if data queued but not +# written. On other errors, returns undef. +sub write { + my $self = shift; + return unless my $handle = $self->handle; # no handle + return unless defined $self->{outbuffer}; # no buffer for queued data + + $self->{outbuffer} .= $_[0] if defined $_[0]; + + my $rc; + if ($self->pending) { # data in the out buffer to write + local $SIG{PIPE}='IGNORE'; + # added length() to make it work on Mac. Thanks to Robin Fuller + $rc = syswrite($handle,$self->{outbuffer},length($self->{outbuffer})); + + # able to write, so truncate out buffer apropriately + if ($rc) { + substr($self->{outbuffer},0,$rc) = ''; + } + elsif (WOULDBLOCK($!)) { # this is OK + $rc = '0E0'; + } + else { # some sort of write error, such as a PIPE error + return $self->bail_out($!); + } + } + else { + $rc = '0E0'; # nothing to do, but no error either + } + + $self->adjust_state; + + # Result code is the number of bytes successfully transmitted + return $rc; +} + +# Object method: read($scalar,$length [,$offset]) +# Just like sysread(), but returns the number of bytes read on success, +# 0EO ("0 but true") if the read would block, and undef on EOF and other failures. +sub read { + my $self = shift; + return unless my $handle = $self->handle; + my $rc = sysread($handle,$_[0],$_[1],$_[2]||0); + return $rc if defined $rc; + return '0E0' if WOULDBLOCK($!); + return; +} + +# Object method: close() +# Close the session and remove it from the monitored list. +sub close { + my $self = shift; + unless ($self->pending) { + $self->sessions->delete($self); + CORE::close($self->handle); + } + else { + $self->readable(0); + $self->{closing}++; # delayed close + } +} + +# Object method: adjust_state() +# Called periodically from within write() to control the +# status of the handle on the IO::SessionSet's IO::Select sets +sub adjust_state { + my $self = shift; + + # make writable if there's anything in the out buffer + $self->writable($self->pending > 0); + + # make readable if there's no write limit, or the amount in the out + # buffer is less than the write limit. + $self->choke($self->write_limit <= $self->pending) if $self->write_limit; + + # Try to close down the session if it is flagged + # as in the closing state. + $self->close if $self->{closing}; +} + +# choke gets called when the contents of the write buffer are larger +# than the limit. The default action is to inactivate the session for further +# reading until the situation is cleared. +sub choke { + my $self = shift; + my $do_choke = shift; + return if $self->{choked} == $do_choke; # no change in state + if (ref $self->set_choke eq 'CODE') { + $self->set_choke->($self,$do_choke); + } + else { + $self->readable(!$do_choke); + } + $self->{choked} = $do_choke; +} + +# Object method: readable($flag) +# Flag the associated IO::SessionSet that we want to do reading on the handle. +sub readable { + my $self = shift; + my $is_active = shift; + return if $self->{writeonly}; + $self->sessions->activate($self,'read',$is_active); +} + +# Object method: writable($flag) +# Flag the associated IO::SessionSet that we want to do writing on the handle. +sub writable { + my $self = shift; + my $is_active = shift; + $self->sessions->activate($self,'write',$is_active); +} + +# Object method: bail_out([$errcode]) +# Called when an error is encountered during writing (such as a PIPE). +# Default behavior is to flush all buffered outgoing data and to close +# the handle. +sub bail_out { + my $self = shift; + my $errcode = shift; # save errorno + delete $self->{outbuffer}; # drop buffered data + $self->close; + $! = $errcode; # restore errno + return; +} + +1; diff --git a/lib/IO/SessionSet.pm b/lib/IO/SessionSet.pm new file mode 100644 index 0000000..ae6e4fe --- /dev/null +++ b/lib/IO/SessionSet.pm @@ -0,0 +1,163 @@ +# ====================================================================== +# +# Copyright (C) 2000 Lincoln D. Stein +# Formatting changed to match the layout layed out in Perl Best Practices +# (by Damian Conway) by Martin Kutter in 2008 +# +# ====================================================================== + +package IO::SessionSet; + +use strict; +use Carp; +use IO::Select; +use IO::Handle; +use IO::SessionData; + +use vars '$DEBUG'; +$DEBUG = 0; + +# Class method new() +# Create a new Session set. +# If passed a listening socket, use that to +# accept new IO::SessionData objects automatically. +sub new { + my $pack = shift; + my $listen = shift; + my $self = bless { + sessions => {}, + readers => IO::Select->new(), + writers => IO::Select->new(), + }, $pack; + # if initialized with an IO::Handle object (or subclass) + # then we treat it as a listening socket. + if ( defined($listen) and $listen->can('accept') ) { + $self->{listen_socket} = $listen; + $self->{readers}->add($listen); + } + return $self; +} + +# Object method: sessions() +# Return list of all the sessions currently in the set. +sub sessions { + return values %{shift->{sessions}} +}; + +# Object method: add() +# Add a handle to the session set. Will automatically +# create a IO::SessionData wrapper around the handle. +sub add { + my $self = shift; + my ($handle,$writeonly) = @_; + warn "Adding a new session for $handle.\n" if $DEBUG; + return $self->{sessions}{$handle} = + $self->SessionDataClass->new($self,$handle,$writeonly); +} + +# Object method: delete() +# Remove a session from the session set. May pass either a handle or +# a corresponding IO::SessionData wrapper. +sub delete { + my $self = shift; + my $thing = shift; + my $handle = $self->to_handle($thing); + my $sess = $self->to_session($thing); + warn "Deleting session $sess handle $handle.\n" if $DEBUG; + delete $self->{sessions}{$handle}; + $self->{readers}->remove($handle); + $self->{writers}->remove($handle); +} + +# Object method: to_handle() +# Return a handle, given either a handle or a IO::SessionData object. +sub to_handle { + my $self = shift; + my $thing = shift; + return $thing->handle if $thing->isa('IO::SessionData'); + return $thing if defined (fileno $thing); + return; # undefined value +} + +# Object method: to_session +# Return a IO::SessionData object, given either a handle or the object itself. +sub to_session { + my $self = shift; + my $thing = shift; + return $thing if $thing->isa('IO::SessionData'); + return $self->{sessions}{$thing} if defined (fileno $thing); + return; # undefined value +} + +# Object method: activate() +# Called with parameters ($session,'read'|'write' [,$activate]) +# If called without the $activate argument, will return true +# if the indicated handle is on the read or write IO::Select set. +# May use either a session object or a handle as first argument. +sub activate { + my $self = shift; + my ($thing,$rw,$act) = @_; + croak 'Usage $obj->activate($session,"read"|"write" [,$activate])' + unless @_ >= 2; + my $handle = $self->to_handle($thing); + my $select = lc($rw) eq 'read' ? 'readers' : 'writers'; + my $prior = defined $self->{$select}->exists($handle); + if (defined $act && $act != $prior) { + $self->{$select}->add($handle) if $act; + $self->{$select}->remove($handle) unless $act; + warn $act ? 'Activating' : 'Inactivating', + " handle $handle for ", + $rw eq 'read' ? 'reading':'writing',".\n" if $DEBUG; + } + return $prior; +} + +# Object method: wait() +# Wait for I/O. Handles writes automatically. Returns a list of +# IO::SessionData objects ready for reading. +# If there is a listen socket, then will automatically do an accept() +# and return a new IO::SessionData object for that. +sub wait { + my $self = shift; + my $timeout = shift; + + # Call select() to get the list of sessions that are ready for + # reading/writing. + warn "IO::Select->select() returned error: $!" + unless my ($read,$write) = + IO::Select->select($self->{readers},$self->{writers},undef,$timeout); + + # handle queued writes automatically + foreach (@$write) { + my $session = $self->to_session($_); + warn "Writing pending data (",$session->pending+0," bytes) for $_.\n" + if $DEBUG; + my $rc = $session->write; + } + + # Return list of sessions that are ready for reading. + # If one of the ready handles is the listen socket, then + # create a new session. + # Otherwise return the ready handles as a list of IO::SessionData objects. + my @sessions; + foreach (@$read) { + if ($_ eq $self->{listen_socket}) { + my $newhandle = $_->accept; + warn "Accepting a new handle $newhandle.\n" if $DEBUG; + my $newsess = $self->add($newhandle) if $newhandle; + push @sessions,$newsess; + } + else { + push @sessions,$self->to_session($_); + } + } + return @sessions; +} + +# Class method: SessionDataClass +# Return the string containing the name of the session data +# wrapper class. Subclass and override to use a different +# session data class. +sub SessionDataClass { return 'IO::SessionData'; } + +1; -- 1.7.7.6