diff --git a/lib/Danga/Client.pm b/lib/Danga/Client.pm new file mode 100644 index 0000000..7b13477 --- /dev/null +++ b/lib/Danga/Client.pm @@ -0,0 +1,133 @@ +# $Id: Client.pm,v 1.8 2005/02/14 22:06:38 msergeant Exp $ + +package Danga::Client; +use base 'Danga::TimeoutSocket'; +use fields qw(line closing disable_read can_read_mode); +use Time::HiRes (); + +# 30 seconds max timeout! +sub max_idle_time { 30 } + +sub new { + my Danga::Client $self = shift; + $self = fields::new($self) unless ref $self; + $self->SUPER::new( @_ ); + + $self->reset_for_next_message; + return $self; +} + +sub reset_for_next_message { + my Danga::Client $self = shift; + $self->{line} = ''; + $self->{disable_read} = 0; + $self->{can_read_mode} = 0; + return $self; +} + +sub get_line { + my Danga::Client $self = shift; + if (!$self->have_line) { + $self->SetPostLoopCallback(sub { $self->have_line ? 0 : 1 }); + #warn("get_line PRE\n"); + $self->EventLoop(); + #warn("get_line POST\n"); + $self->watch_read(0); + } + return if $self->{closing}; + # now have a line. + $self->{alive_time} = time; + $self->{line} =~ s/^(.*?\n)//; + return $1; +} + +sub can_read { + my Danga::Client $self = shift; + my ($timeout) = @_; + my $end = Time::HiRes::time() + $timeout; + warn("Calling can-read\n"); + $self->{can_read_mode} = 1; + if (!length($self->{line})) { + my $old = $self->watch_read(); + $self->watch_read(1); + $self->SetPostLoopCallback(sub { (length($self->{line}) || + (Time::HiRes::time > $end)) ? 0 : 1 }); + #warn("get_line PRE\n"); + $self->EventLoop(); + #warn("get_line POST\n"); + $self->watch_read($old); + } + $self->{can_read_mode} = 0; + $self->SetPostLoopCallback(sub { $self->have_line ? 0 : 1 }); + return if $self->{closing}; + $self->{alive_time} = time; + warn("can_read returning for '$self->{line}'\n"); + return 1 if length($self->{line}); + return; +} + +sub have_line { + my Danga::Client $self = shift; + return 1 if $self->{closing}; + if ($self->{line} =~ /\n/) { + return 1; + } + return 0; +} + +sub event_read { + my Danga::Client $self = shift; + my $bref = $self->read(8192); + return $self->close($!) unless defined $bref; + # $self->watch_read(0); + $self->process_read_buf($bref); +} + +sub process_read_buf { + my Danga::Client $self = shift; + my $bref = shift; + $self->{line} .= $$bref; + return if $self->{can_read_mode}; + return if $::LineMode; + + while ($self->{line} =~ s/^(.*?\n)//) { + my $line = $1; + $self->{alive_time} = time; + my $resp = $self->process_line($line); + if ($::DEBUG > 1 and $resp) { print "$$:".($self+0)."S: $_\n" for split(/\n/, $resp) } + $self->write($resp) if $resp; + $self->watch_read(0) if $self->{disable_read}; + } +} + +sub disable_read { + my Danga::Client $self = shift; + $self->{disable_read}++; + $self->watch_read(0); +} + +sub enable_read { + my Danga::Client $self = shift; + $self->{disable_read}--; + if ($self->{disable_read} <= 0) { + $self->{disable_read} = 0; + $self->watch_read(1); + } +} + +sub process_line { + my Danga::Client $self = shift; + return ''; +} + +sub close { + my Danga::Client $self = shift; + $self->{closing} = 1; + print "closing @_\n" if $::DEBUG; + $self->SUPER::close(@_); +} + +sub event_err { my Danga::Client $self = shift; $self->close("Error") } +sub event_hup { my Danga::Client $self = shift; $self->close("Disconnect (HUP)") } + +1; diff --git a/lib/Danga/DNS.pm b/lib/Danga/DNS.pm new file mode 100644 index 0000000..e57a3a4 --- /dev/null +++ b/lib/Danga/DNS.pm @@ -0,0 +1,170 @@ +# $Id: DNS.pm,v 1.12 2005/02/14 22:06:08 msergeant Exp $ + +package Danga::DNS; + +# This is the query class - it is really just an encapsulation of the +# hosts you want to query, plus the callback. All the hard work is done +# in Danga::DNS::Resolver. + +use fields qw(client hosts num_hosts callback results start); +use strict; + +use Danga::DNS::Resolver; + +my $resolver; + +sub trace { + my $level = shift; + print ("[$$] dns lookup: @_") if $::DEBUG >= $level; +} + +sub new { + my Danga::DNS $self = shift; + my %options = @_; + + $resolver ||= Danga::DNS::Resolver->new(); + + my $client = $options{client}; + $client->disable_read if $client; + + $self = fields::new($self) unless ref $self; + + $self->{hosts} = $options{hosts} ? $options{hosts} : [ $options{host} ]; + $self->{num_hosts} = scalar(@{$self->{hosts}}) || "No hosts supplied"; + $self->{client} = $client; + $self->{callback} = $options{callback} || die "No callback given"; + $self->{results} = {}; + $self->{start} = time; + + if ($options{type}) { + if ($options{type} eq 'TXT') { + if (!$resolver->query_txt($self, @{$self->{hosts}})) { + $client->watch_read(1) if $client; + return; + } + } + elsif ($options{type} eq 'A') { + if (!$resolver->query($self, @{$self->{hosts}})) { + $client->watch_read(1) if $client; + return; + } + } + elsif ($options{type} eq 'PTR') { + if (!$resolver->query($self, @{$self->{hosts}})) { + $client->watch_read(1) if $client; + return; + } + } + elsif ($options{type} eq 'MX') { + if (!$resolver->query_mx($self, @{$self->{hosts}})) { + $client->watch_read(1) if $client; + return; + } + } + else { + die "Unsupported DNS query type: $options{type}"; + } + } + else { + if (!$resolver->query($self, @{$self->{hosts}})) { + $client->watch_read(1) if $client; + return; + } + } + + return $self; +} + +sub run_callback { + my Danga::DNS $self = shift; + my ($result, $query) = @_; + $self->{results}{$query} = $result; + trace(2, "got $query => $result\n"); + eval { + $self->{callback}->($result, $query); + }; + if ($@) { + warn($@); + } +} + +sub DESTROY { + my Danga::DNS $self = shift; + my $now = time; + foreach my $host (@{$self->{hosts}}) { + if (!$self->{results}{$host}) { + print "DNS timeout (presumably) looking for $host after " . ($now - $self->{start}) . " secs\n"; + $self->{callback}->("NXDOMAIN", $host); + } + } + $self->{client}->enable_read if $self->{client}; +} + +1; + +=head1 NAME + +Danga::DNS - a DNS lookup class for the Danga::Socket framework + +=head1 SYNOPSIS + + Danga::DNS->new(%options); + +=head1 DESCRIPTION + +This module performs asynchronous DNS lookups, making use of a single UDP +socket (unlike Net::DNS's bgsend/bgread combination), and blocking reading on +a client until the response comes back (this is useful for e.g. SMTP rDNS +lookups where you want the answer before you see the next SMTP command). + +Currently this module will only perform A or PTR lookups. A rDNS (PTR) lookup +will be performed if the host matches the regexp: C. + +The lookups time out after 15 seconds. + +=head1 API + +=head2 C<< Danga::DNS->new( %options ) >> + +Create a new DNS query. You do not need to store the resulting object as this +class is all done with callbacks. + +Example: + + Danga::DNS->new( + callback => sub { print "Got result: $_[0]\n" }, + host => 'google.com', + ); + +=over 4 + +=item B<[required]> C + +The callback to call when results come in. This should be a reference to a +subroutine. The callback receives two parameters - the result of the DNS lookup +and the host that was looked up. + +=item C + +A host name to lookup. Note that if the hostname is a dotted quad of numbers then +a reverse DNS (PTR) lookup is performend. + +=item C + +An array-ref list of hosts to lookup. + +B One of either C or C is B. + +=item C + +It is possible to specify a C object (or subclass) which you wish +to disable for reading until your DNS result returns. + +=item C + +You can specify one of: I<"A">, I<"PTR"> or I<"TXT"> here. Other types may be +supported in the future. + +=back + +=cut diff --git a/lib/Danga/DNS/Resolver.pm b/lib/Danga/DNS/Resolver.pm new file mode 100644 index 0000000..ded6e37 --- /dev/null +++ b/lib/Danga/DNS/Resolver.pm @@ -0,0 +1,322 @@ +# $Id: Resolver.pm,v 1.3 2005/02/14 22:06:08 msergeant Exp $ + +package Danga::DNS::Resolver; +use base qw(Danga::Socket); + +use fields qw(res dst id_to_asker id_to_query timeout cache cache_timeout); + +use Net::DNS; +use Socket; +use strict; + +our $last_cleanup = 0; + +sub trace { + my $level = shift; + print ("$::DEBUG/$level [$$] dns lookup: @_") if $::DEBUG >= $level; +} + +sub new { + my Danga::DNS::Resolver $self = shift; + + $self = fields::new($self) unless ref $self; + + my $res = Net::DNS::Resolver->new; + + my $sock = IO::Socket::INET->new( + Proto => 'udp', + LocalAddr => $res->{'srcaddr'}, + LocalPort => ($res->{'srcport'} || undef), + ) || die "Cannot create socket: $!"; + IO::Handle::blocking($sock, 0); + + trace(2, "Using nameserver $res->{nameservers}->[0]:$res->{port}\n"); + my $dst_sockaddr = sockaddr_in($res->{'port'}, inet_aton($res->{'nameservers'}->[0])); + #my $dst_sockaddr = sockaddr_in($res->{'port'}, inet_aton('127.0.0.1')); + #my $dst_sockaddr = sockaddr_in($res->{'port'}, inet_aton('10.2.1.20')); + + $self->{res} = $res; + $self->{dst} = $dst_sockaddr; + $self->{id_to_asker} = {}; + $self->{id_to_query} = {}; + $self->{timeout} = {}; + $self->{cache} = {}; + $self->{cache_timeout} = {}; + + $self->SUPER::new($sock); + + $self->watch_read(1); + + return $self; +} + +sub _query { + my Danga::DNS::Resolver $self = shift; + my ($asker, $host, $type, $now) = @_; + + if ($ENV{NODNS}) { + $asker->run_callback("NXDNS", $host); + return 1; + } + if (exists $self->{cache}{$type}{$host}) { + # print "CACHE HIT!\n"; + $asker->run_callback($self->{cache}{$type}{$host}, $host); + return 1; + } + + my $packet = $self->{res}->make_query_packet($host, $type); + my $packet_data = $packet->data; + + my $h = $packet->header; + my $id = $h->id; + + if (!$self->sock->send($packet_data, 0, $self->{dst})) { + return; + } + + trace(2, "Query: $host ($id)\n"); + + $self->{id_to_asker}->{$id} = $asker; + $self->{id_to_query}->{$id} = $host; + $self->{timeout}->{$id} = $now; + + return 1; +} + +sub query_txt { + my Danga::DNS::Resolver $self = shift; + my ($asker, @hosts) = @_; + + my $now = time(); + + trace(2, "[" . keys(%{$self->{id_to_asker}}) . "] trying to resolve TXT: @hosts\n"); + + foreach my $host (@hosts) { + $self->_query($asker, $host, 'TXT', $now) || return; + } + + # run cleanup every 5 seconds + if ($now - 5 > $last_cleanup) { + $last_cleanup = $now; + $self->_do_cleanup($now); + } + + #print "+Pending queries: " . keys(%{$self->{id_to_asker}}) . + # " / Cache Size: " . keys(%{$self->{cache}}) . "\n"; + + return 1; +} + +sub query_mx { + my Danga::DNS::Resolver $self = shift; + my ($asker, @hosts) = @_; + + my $now = time(); + + trace(2, "[" . keys(%{$self->{id_to_asker}}) . "] trying to resolve MX: @hosts\n"); + + foreach my $host (@hosts) { + $self->_query($asker, $host, 'MX', $now) || return; + } + + # run cleanup every 5 seconds + if ($now - 5 > $last_cleanup) { + $last_cleanup = $now; + $self->_do_cleanup($now); + } + + #print "+Pending queries: " . keys(%{$self->{id_to_asker}}) . + # " / Cache Size: " . keys(%{$self->{cache}}) . "\n"; + + return 1; +} + +sub query { + my Danga::DNS::Resolver $self = shift; + my ($asker, @hosts) = @_; + + my $now = time(); + + trace(2, "[" . keys(%{$self->{id_to_asker}}) . "] trying to resolve A/PTR: @hosts\n"); + + foreach my $host (@hosts) { + $self->_query($asker, $host, 'A', $now) || return; + } + + # run cleanup every 5 seconds + if ($now - 5 > $last_cleanup) { + $last_cleanup = $now; + $self->_do_cleanup($now); + } + + #print "+Pending queries: " . keys(%{$self->{id_to_asker}}) . + # " / Cache Size: " . keys(%{$self->{cache}}) . "\n"; + + return 1; +} + +sub ticker { + my Danga::DNS::Resolver $self = shift; + my $now = time; + # run cleanup every 5 seconds + if ($now - 5 > $last_cleanup) { + $last_cleanup = $now; + $self->_do_cleanup($now); + } +} + +sub _do_cleanup { + my Danga::DNS::Resolver $self = shift; + my $now = shift; + + my $idle = $self->max_idle_time; + + my @to_delete; + while (my ($id, $t) = each(%{$self->{timeout}})) { + if ($t < ($now - $idle)) { + push @to_delete, $id; + } + } + + foreach my $id (@to_delete) { + delete $self->{timeout}{$id}; + my $asker = delete $self->{id_to_asker}{$id}; + my $query = delete $self->{id_to_query}{$id}; + $asker->run_callback("NXDOMAIN", $query); + } + + foreach my $type ('A', 'TXT') { + @to_delete = (); + + while (my ($query, $t) = each(%{$self->{cache_timeout}{$type}})) { + if ($t < $now) { + push @to_delete, $query; + } + } + + foreach my $q (@to_delete) { + delete $self->{cache_timeout}{$type}{$q}; + delete $self->{cache}{$type}{$q}; + } + } +} + +# seconds max timeout! +sub max_idle_time { 30 } + +# Danga::DNS +sub event_err { shift->close("dns socket error") } +sub event_hup { shift->close("dns socket error") } + +sub event_read { + my Danga::DNS::Resolver $self = shift; + + while (my $packet = $self->{res}->bgread($self->sock)) { + my $err = $self->{res}->errorstring; + my $answers = 0; + my $header = $packet->header; + my $id = $header->id; + + my $asker = delete $self->{id_to_asker}->{$id}; + my $query = delete $self->{id_to_query}->{$id}; + delete $self->{timeout}{$id}; + + #print "-Pending queries: " . keys(%{$self->{id_to_asker}}) . + # " / Cache Size: " . keys(%{$self->{cache}}) . "\n"; + if (!$asker) { + trace(1, "No asker for id: $id\n"); + return; + } + + my $now = time(); + my @questions = $packet->question; + #print STDERR "response to ", $questions[0]->string, "\n"; + foreach my $rr ($packet->answer) { + # my $q = shift @questions; + if ($rr->type eq "PTR") { + my $rdns = $rr->ptrdname; + if ($query) { + # NB: Cached as an "A" lookup as there's no overlap and they + # go through the same query() function above + $self->{cache}{A}{$query} = $rdns; + $self->{cache_timeout}{A}{$query} = $now + 60; # should use $rr->ttl but that would cache for too long + } + $asker->run_callback($rdns, $query); + } + elsif ($rr->type eq "A") { + my $ip = $rr->address; + if ($query) { + $self->{cache}{A}{$query} = $ip; + $self->{cache_timeout}{A}{$query} = $now + 60; # should use $rr->ttl but that would cache for too long + } + $asker->run_callback($ip, $query); + } + elsif ($rr->type eq "TXT") { + my $txt = $rr->txtdata; + if ($query) { + $self->{cache}{TXT}{$query} = $txt; + $self->{cache_timeout}{TXT}{$query} = $now + 60; # should use $rr->ttl but that would cache for too long + } + $asker->run_callback($txt, $query); + } + else { + # came back, but not a PTR or A record + $asker->run_callback("unknown", $query); + } + $answers++; + } + if (!$answers) { + if ($err eq "NXDOMAIN") { + # trace("found => NXDOMAIN\n"); + $asker->run_callback("NXDOMAIN", $query); + } + elsif ($err eq "SERVFAIL") { + # try again??? + print "SERVFAIL looking for $query (Pending: " . keys(%{$self->{id_to_asker}}) . ")\n"; + #$self->query($asker, $query); + $asker->run_callback($err, $query); + #$self->{id_to_asker}->{$id} = $asker; + #$self->{id_to_query}->{$id} = $query; + #$self->{timeout}{$id} = time(); + + } + elsif($err) { + print("error: $err\n"); + $asker->run_callback($err, $query); + } + else { + # trace("no answers\n"); + $asker->run_callback("NXDOMAIN", $query); + } + } + } +} + +use Carp qw(confess); + +sub close { + my Danga::DNS::Resolver $self = shift; + + $self->SUPER::close(shift); + confess "Danga::DNS::Resolver socket should never be closed!"; +} + +1; + +=head1 NAME + +Danga::DNS::Resolver - an asynchronous DNS resolver class + +=head1 SYNOPSIS + + my $res = Danga::DNS::Resolver->new(); + + $res->query($obj, @hosts); # $obj implements $obj->run_callback() + +=head1 DESCRIPTION + +This is a low level DNS resolver class that works within the Danga::Socket +asynchronous I/O framework. Do not attempt to use this class standalone - use +the C class instead. + +=cut diff --git a/lib/Danga/Socket.pm b/lib/Danga/Socket.pm new file mode 100644 index 0000000..e94220f --- /dev/null +++ b/lib/Danga/Socket.pm @@ -0,0 +1,831 @@ +########################################################################### + +=head1 NAME + +Danga::Socket - Event-driven async IO class + +=head1 SYNOPSIS + + use base ('Danga::Socket'); + +=head1 DESCRIPTION + +This is an abstract base class which provides the basic framework for +event-driven asynchronous IO. + +=cut + +########################################################################### + +package Danga::Socket; +use strict; + +use vars qw{$VERSION}; +$VERSION = do { my @r = (q$Revision: 1.4 $ =~ /\d+/g); sprintf "%d."."%02d" x $#r, @r }; + +use fields qw(sock fd write_buf write_buf_offset write_buf_size + read_push_back + closed event_watch debug_level); + +use Errno qw(EINPROGRESS EWOULDBLOCK EISCONN + EPIPE EAGAIN EBADF ECONNRESET); + +use Socket qw(IPPROTO_TCP); +use Carp qw{croak confess}; + +use constant TCP_CORK => 3; # FIXME: not hard-coded (Linux-specific too) + +use constant DebugLevel => 0; + +# for epoll definitions: +our $HAVE_SYSCALL_PH = eval { require 'syscall.ph'; 1 } || eval { require 'sys/syscall.ph'; 1 }; +our $HAVE_KQUEUE = eval { require IO::KQueue; 1 }; + +# Explicitly define the poll constants, as either one set or the other won't be +# loaded. They're also badly implemented in IO::Epoll: +# The IO::Epoll module is buggy in that it doesn't export constants efficiently +# (at least as of 0.01), so doing constants ourselves saves 13% of the user CPU +# time +use constant EPOLLIN => 1; +use constant EPOLLOUT => 4; +use constant EPOLLERR => 8; +use constant EPOLLHUP => 16; +use constant EPOLL_CTL_ADD => 1; +use constant EPOLL_CTL_DEL => 2; +use constant EPOLL_CTL_MOD => 3; + +use constant POLLIN => 1; +use constant POLLOUT => 4; +use constant POLLERR => 8; +use constant POLLHUP => 16; +use constant POLLNVAL => 32; + +# keep track of active clients +our ( + $HaveEpoll, # Flag -- is epoll available? initially undefined. + $HaveKQueue, + %DescriptorMap, # fd (num) -> Danga::Socket object + %PushBackSet, # fd (num) -> Danga::Socket (fds with pushed back read data) + $Epoll, # Global epoll fd (for epoll mode only) + $KQueue, # Global kqueue fd (for kqueue mode only) + @ToClose, # sockets to close when event loop is done + %OtherFds, # A hash of "other" (non-Danga::Socket) file + # descriptors for the event loop to track. + $PostLoopCallback, # subref to call at the end of each loop, if defined + ); + +%OtherFds = (); + +##################################################################### +### C L A S S M E T H O D S +##################################################################### + +### (CLASS) METHOD: HaveEpoll() +### Returns a true value if this class will use IO::Epoll for async IO. +sub HaveEpoll { $HaveEpoll }; + +### (CLASS) METHOD: WatchedSockets() +### Returns the number of file descriptors which are registered with the global +### poll object. +sub WatchedSockets { + return scalar keys %DescriptorMap; +} +*watched_sockets = *WatchedSockets; + + +### (CLASS) METHOD: ToClose() +### Return the list of sockets that are awaiting close() at the end of the +### current event loop. +sub ToClose { return @ToClose; } + + +### (CLASS) METHOD: OtherFds( [%fdmap] ) +### Get/set the hash of file descriptors that need processing in parallel with +### the registered Danga::Socket objects. +sub OtherFds { + my $class = shift; + if ( @_ ) { %OtherFds = @_ } + return wantarray ? %OtherFds : \%OtherFds; +} + + +### (CLASS) METHOD: DescriptorMap() +### Get the hash of Danga::Socket objects keyed by the file descriptor they are +### wrapping. +sub DescriptorMap { + return wantarray ? %DescriptorMap : \%DescriptorMap; +} +*descriptor_map = *DescriptorMap; +*get_sock_ref = *DescriptorMap; + +sub init_poller +{ + return if defined $HaveEpoll || $HaveKQueue; + + if ($HAVE_KQUEUE) { + $KQueue = IO::KQueue->new(); + $HaveKQueue = $KQueue >= 0; + if ($HaveKQueue) { + *EventLoop = *KQueueEventLoop; + } + } + else { + $Epoll = eval { epoll_create(1024); }; + $HaveEpoll = $Epoll >= 0; + if ($HaveEpoll) { + *EventLoop = *EpollEventLoop; + } + } + + if (!$HaveEpoll && !$HaveKQueue) { + require IO::Poll; + *EventLoop = *PollEventLoop; + } +} + +### FUNCTION: EventLoop() +### Start processing IO events. +sub EventLoop { + my $class = shift; + + init_poller(); + + if ($HaveEpoll) { + EpollEventLoop($class); + } else { + PollEventLoop($class); + } +} + +### The kqueue-based event loop. Gets installed as EventLoop if IO::KQueue works +### okay. +sub KQueueEventLoop { + my $class = shift; + + foreach my $fd (keys %OtherFds) { + $KQueue->EV_SET($fd, IO::KQueue::EVFILT_READ(), IO::KQueue::EV_ADD()); + } + + while (1) { + my @ret = $KQueue->kevent(1000); + + if (!@ret) { + foreach my $fd ( keys %DescriptorMap ) { + my Danga::Socket $sock = $DescriptorMap{$fd}; + if ($sock->can('ticker')) { + $sock->ticker; + } + } + } + + my @objs; + + foreach my $kev (@ret) { + my ($fd, $filter, $flags, $fflags) = @$kev; + + my Danga::Socket $pob = $DescriptorMap{$fd}; + + # prioritise OtherFds first - likely to be accept() socks (?) + if (!$pob) { + if (my $code = $OtherFds{$fd}) { + $code->($filter); + } + next; + } + + push @objs, [$pob, $fd, $filter, $flags, $fflags]; + } + + # TODO - prioritize the objects + + foreach (@objs) { + my ($pob, $fd, $filter, $flags, $fflags) = @$_; + + DebugLevel >= 1 && $class->DebugMsg("Event: fd=%d (%s), flags=%d \@ %s\n", + $fd, ref($pob), $flags, time); + + $pob->event_read if $filter == IO::KQueue::EVFILT_READ() && !$pob->{closed}; + $pob->event_write if $filter == IO::KQueue::EVFILT_WRITE() && !$pob->{closed}; + if ($flags == IO::KQueue::EV_EOF() && !$pob->{closed}) { + if ($fflags) { + $pob->event_err; + } else { + $pob->event_hup; + } + } + } + + return unless PostEventLoop(); + } + + exit(0); +} + +### The epoll-based event loop. Gets installed as EventLoop if IO::Epoll loads +### okay. +sub EpollEventLoop { + my $class = shift; + + foreach my $fd ( keys %OtherFds ) { + epoll_ctl($Epoll, EPOLL_CTL_ADD, $fd, EPOLLIN); + } + + while (1) { + my @events; + my $i; + my $evcount; + # get up to 1000 events, 1000ms timeout + while ($evcount = epoll_wait($Epoll, 1000, 1000, \@events)) { + EVENT: + for ($i=0; $i<$evcount; $i++) { + my $ev = $events[$i]; + + # it's possible epoll_wait returned many events, including some at the end + # that ones in the front triggered unregister-interest actions. if we + # can't find the %sock entry, it's because we're no longer interested + # in that event. + my Danga::Socket $pob = $DescriptorMap{$ev->[0]}; + my $code; + my $state = $ev->[1]; + + # if we didn't find a Perlbal::Socket subclass for that fd, try other + # pseudo-registered (above) fds. + if (! $pob) { + if (my $code = $OtherFds{$ev->[0]}) { + $code->($state); + } + next; + } + + DebugLevel >= 1 && $class->DebugMsg("Event: fd=%d (%s), state=%d \@ %s\n", + $ev->[0], ref($pob), $ev->[1], time); + + $pob->event_read if $state & EPOLLIN && ! $pob->{closed}; + $pob->event_write if $state & EPOLLOUT && ! $pob->{closed}; + $pob->event_err if $state & EPOLLERR && ! $pob->{closed}; + $pob->event_hup if $state & EPOLLHUP && ! $pob->{closed}; + } + return unless PostEventLoop(); + + } + + foreach my $fd ( keys %DescriptorMap ) { + my Danga::Socket $sock = $DescriptorMap{$fd}; + if ($sock->can('ticker')) { + $sock->ticker; + } + } + + print STDERR "Event loop ending; restarting.\n"; + } + exit 0; +} + +sub PostEventLoop { + # fire read events for objects with pushed-back read data + my $loop = 1; + while ($loop) { + $loop = 0; + foreach my $fd (keys %PushBackSet) { + my Danga::Socket $pob = $PushBackSet{$fd}; + next unless (! $pob->{closed} && + $pob->{event_watch} & POLLIN); + $loop = 1; + $pob->event_read; + } + } + + # now we can close sockets that wanted to close during our event processing. + # (we didn't want to close them during the loop, as we didn't want fd numbers + # being reused and confused during the event loop) + $_->close while ($_ = shift @ToClose); + + # now we're at the very end, call callback if defined + if (defined $PostLoopCallback) { + return $PostLoopCallback->(\%DescriptorMap, \%OtherFds); + } + return 1; +} + +### The fallback IO::Poll-based event loop. Gets installed as EventLoop if +### IO::Epoll fails to load. +sub PollEventLoop { + my $class = shift; + + my Danga::Socket $pob; + + while (1) { + # the following sets up @poll as a series of ($poll,$event_mask) + # items, then uses IO::Poll::_poll, implemented in XS, which + # modifies the array in place with the even elements being + # replaced with the event masks that occured. + my @poll; + foreach my $fd ( keys %OtherFds ) { + push @poll, $fd, POLLIN; + } + foreach my $fd ( keys %DescriptorMap ) { + my Danga::Socket $sock = $DescriptorMap{$fd}; + push @poll, $fd, $sock->{event_watch}; + } + return 0 unless @poll; + + my $count = IO::Poll::_poll(1000, @poll); + if (!$count) { + foreach my $fd ( keys %DescriptorMap ) { + my Danga::Socket $sock = $DescriptorMap{$fd}; + if ($sock->can('ticker')) { + $sock->ticker; + } + } + next; + } + + # Fetch handles with read events + while (@poll) { + my ($fd, $state) = splice(@poll, 0, 2); + next unless $state; + + $pob = $DescriptorMap{$fd}; + + if ( !$pob && (my $code = $OtherFds{$fd}) ) { + $code->($state); + next; + } + + $pob->event_read if $state & POLLIN && ! $pob->{closed}; + $pob->event_write if $state & POLLOUT && ! $pob->{closed}; + $pob->event_err if $state & POLLERR && ! $pob->{closed}; + $pob->event_hup if $state & POLLHUP && ! $pob->{closed}; + } + + return unless PostEventLoop(); + } + + exit 0; +} + + +### (CLASS) METHOD: DebugMsg( $format, @args ) +### Print the debugging message specified by the C-style I and +### I +sub DebugMsg { + my ( $class, $fmt, @args ) = @_; + chomp $fmt; + printf STDERR ">>> $fmt\n", @args; +} + + +### METHOD: new( $socket ) +### Create a new Danga::Socket object for the given I which will react +### to events on it during the C. +sub new { + my Danga::Socket $self = shift; + $self = fields::new($self) unless ref $self; + + my $sock = shift; + + $self->{sock} = $sock; + my $fd = fileno($sock); + $self->{fd} = $fd; + $self->{write_buf} = []; + $self->{write_buf_offset} = 0; + $self->{write_buf_size} = 0; + $self->{closed} = 0; + $self->{read_push_back} = []; + + $self->{event_watch} = POLLERR|POLLHUP|POLLNVAL; + + init_poller(); + + if ($HaveEpoll) { + epoll_ctl($Epoll, EPOLL_CTL_ADD, $fd, $self->{event_watch}) + and die "couldn't add epoll watch for $fd\n"; + } + elsif ($HaveKQueue) { + # Add them to the queue but disabled for now + $KQueue->EV_SET($fd, IO::KQueue::EVFILT_READ(), + IO::KQueue::EV_ADD() | IO::KQueue::EV_DISABLE()); + $KQueue->EV_SET($fd, IO::KQueue::EVFILT_WRITE(), + IO::KQueue::EV_ADD() | IO::KQueue::EV_DISABLE()); + } + + $DescriptorMap{$fd} = $self; + return $self; +} + + + +##################################################################### +### I N S T A N C E M E T H O D S +##################################################################### + +### METHOD: tcp_cork( $boolean ) +### Turn TCP_CORK on or off depending on the value of I. +sub tcp_cork { + my Danga::Socket $self = shift; + my $val = shift; + + # FIXME: Linux-specific. + setsockopt($self->{sock}, IPPROTO_TCP, TCP_CORK, + pack("l", $val ? 1 : 0)) || die "setsockopt: $!"; +} + +### METHOD: close( [$reason] ) +### Close the socket. The I argument will be used in debugging messages. +sub close { + my Danga::Socket $self = shift; + my $reason = shift || ""; + + my $fd = $self->{fd}; + my $sock = $self->{sock}; + $self->{closed} = 1; + + # we need to flush our write buffer, as there may + # be self-referential closures (sub { $client->close }) + # preventing the object from being destroyed + $self->{write_buf} = []; + + if (DebugLevel) { + my ($pkg, $filename, $line) = caller; + print STDERR "Closing \#$fd due to $pkg/$filename/$line ($reason)\n"; + } + + if ($HaveEpoll) { + if (epoll_ctl($Epoll, EPOLL_CTL_DEL, $fd, $self->{event_watch}) == 0) { + DebugLevel >= 1 && $self->debugmsg("Client %d disconnected.\n", $fd); + } else { + DebugLevel >= 1 && $self->debugmsg("poll->remove failed on fd %d\n", $fd); + } + } + + delete $DescriptorMap{$fd}; + delete $PushBackSet{$fd}; + + # defer closing the actual socket until the event loop is done + # processing this round of events. (otherwise we might reuse fds) + push @ToClose, $sock; + + return 0; +} + + + +### METHOD: sock() +### Returns the underlying IO::Handle for the object. +sub sock { + my Danga::Socket $self = shift; + return $self->{sock}; +} + + +### METHOD: write( $data ) +### Write the specified data to the underlying handle. I may be scalar, +### scalar ref, code ref (to run when there), or undef just to kick-start. +### Returns 1 if writes all went through, or 0 if there are writes in queue. If +### it returns 1, caller should stop waiting for 'writable' events) +sub write { + my Danga::Socket $self; + my $data; + ($self, $data) = @_; + + # nobody should be writing to closed sockets, but caller code can + # do two writes within an event, have the first fail and + # disconnect the other side (whose destructor then closes the + # calling object, but it's still in a method), and then the + # now-dead object does its second write. that is this case. we + # just lie and say it worked. it'll be dead soon and won't be + # hurt by this lie. + return 1 if $self->{closed}; + + my $bref; + + # just queue data if there's already a wait + my $need_queue; + + if (defined $data) { + $bref = ref $data ? $data : \$data; + if ($self->{write_buf_size}) { + push @{$self->{write_buf}}, $bref; + $self->{write_buf_size} += ref $bref eq "SCALAR" ? length($$bref) : 1; + return 0; + } + + # this flag says we're bypassing the queue system, knowing we're the + # only outstanding write, and hoping we don't ever need to use it. + # if so later, though, we'll need to queue + $need_queue = 1; + } + + WRITE: + while (1) { + return 1 unless $bref ||= $self->{write_buf}[0]; + + my $len; + eval { + $len = length($$bref); # this will die if $bref is a code ref, caught below + }; + if ($@) { + if (ref $bref eq "CODE") { + unless ($need_queue) { + $self->{write_buf_size}--; # code refs are worth 1 + shift @{$self->{write_buf}}; + } + $bref->(); + undef $bref; + next WRITE; + } + die "Write error: $@ <$bref>"; + } + + my $to_write = $len - $self->{write_buf_offset}; + my $written = syswrite($self->{sock}, $$bref, $to_write, $self->{write_buf_offset}); + + if (! defined $written) { + if ($! == EPIPE) { + return $self->close("EPIPE"); + } elsif ($! == EAGAIN) { + # since connection has stuff to write, it should now be + # interested in pending writes: + if ($need_queue) { + push @{$self->{write_buf}}, $bref; + $self->{write_buf_size} += $len; + } + $self->watch_write(1); + return 0; + } elsif ($! == ECONNRESET) { + return $self->close("ECONNRESET"); + } + + DebugLevel >= 1 && $self->debugmsg("Closing connection ($self) due to write error: $!\n"); + + return $self->close("write_error"); + } elsif ($written != $to_write) { + DebugLevel >= 2 && $self->debugmsg("Wrote PARTIAL %d bytes to %d", + $written, $self->{fd}); + if ($need_queue) { + push @{$self->{write_buf}}, $bref; + $self->{write_buf_size} += $len; + } + # since connection has stuff to write, it should now be + # interested in pending writes: + $self->{write_buf_offset} += $written; + $self->{write_buf_size} -= $written; + $self->watch_write(1); + return 0; + } elsif ($written == $to_write) { + DebugLevel >= 2 && $self->debugmsg("Wrote ALL %d bytes to %d (nq=%d)", + $written, $self->{fd}, $need_queue); + $self->{write_buf_offset} = 0; + + # this was our only write, so we can return immediately + # since we avoided incrementing the buffer size or + # putting it in the buffer. we also know there + # can't be anything else to write. + return 1 if $need_queue; + + $self->{write_buf_size} -= $written; + shift @{$self->{write_buf}}; + undef $bref; + next WRITE; + } + } +} + +### METHOD: push_back_read( $buf ) +### Push back I (a scalar or scalarref) into the read stream +sub push_back_read { + my Danga::Socket $self = shift; + my $buf = shift; + push @{$self->{read_push_back}}, ref $buf ? $buf : \$buf; + $PushBackSet{$self->{fd}} = $self; +} + +### METHOD: read( $bytecount ) +### Read at most I bytes from the underlying handle; returns scalar +### ref on read, or undef on connection closed. +sub read { + my Danga::Socket $self = shift; + my $bytes = shift; + my $buf; + my $sock = $self->{sock}; + + if (@{$self->{read_push_back}}) { + $buf = shift @{$self->{read_push_back}}; + my $len = length($$buf); + if ($len <= $buf) { + unless (@{$self->{read_push_back}}) { + delete $PushBackSet{$self->{fd}}; + } + return $buf; + } else { + # if the pushed back read is too big, we have to split it + my $overflow = substr($$buf, $bytes); + $buf = substr($$buf, 0, $bytes); + unshift @{$self->{read_push_back}}, \$overflow, + return \$buf; + } + } + + my $res = sysread($sock, $buf, $bytes, 0); + DebugLevel >= 2 && $self->debugmsg("sysread = %d; \$! = %d", $res, $!); + + if (! $res && $! != EWOULDBLOCK) { + # catches 0=conn closed or undef=error + DebugLevel >= 2 && $self->debugmsg("Fd \#%d read hit the end of the road.", $self->{fd}); + return undef; + } + + return \$buf; +} + + +### (VIRTUAL) METHOD: event_read() +### Readable event handler. Concrete deriviatives of Danga::Socket should +### provide an implementation of this. The default implementation will die if +### called. +sub event_read { die "Base class event_read called for $_[0]\n"; } + + +### (VIRTUAL) METHOD: event_err() +### Error event handler. Concrete deriviatives of Danga::Socket should +### provide an implementation of this. The default implementation will die if +### called. +sub event_err { die "Base class event_err called for $_[0]\n"; } + + +### (VIRTUAL) METHOD: event_hup() +### 'Hangup' event handler. Concrete deriviatives of Danga::Socket should +### provide an implementation of this. The default implementation will die if +### called. +sub event_hup { die "Base class event_hup called for $_[0]\n"; } + + +### METHOD: event_write() +### Writable event handler. Concrete deriviatives of Danga::Socket may wish to +### provide an implementation of this. The default implementation calls +### C with an C. +sub event_write { + my $self = shift; + $self->write(undef); +} + + +### METHOD: watch_read( $boolean ) +### Turn 'readable' event notification on or off. +sub watch_read { + my Danga::Socket $self = shift; + return if $self->{closed}; + + my $val = shift; + my $event = $self->{event_watch}; + + $event &= ~POLLIN if ! $val; + $event |= POLLIN if $val; + + # If it changed, set it + if ($event != $self->{event_watch}) { + if ($HaveKQueue) { + $KQueue->EV_SET($self->{fd}, IO::KQueue::EVFILT_READ(), + $val ? IO::KQueue::EV_ENABLE() : IO::KQueue::EV_DISABLE()); + } + elsif ($HaveEpoll) { + epoll_ctl($Epoll, EPOLL_CTL_MOD, $self->{fd}, $event) + and print STDERR "couldn't modify epoll settings for $self->{fd} " . + "($self) from $self->{event_watch} -> $event\n"; + } + $self->{event_watch} = $event; + } +} + +### METHOD: watch_read( $boolean ) +### Turn 'writable' event notification on or off. +sub watch_write { + my Danga::Socket $self = shift; + return if $self->{closed}; + + my $val = shift; + my $event = $self->{event_watch}; + + $event &= ~POLLOUT if ! $val; + $event |= POLLOUT if $val; + + # If it changed, set it + if ($event != $self->{event_watch}) { + if ($HaveKQueue) { + $KQueue->EV_SET($self->{fd}, IO::KQueue::EVFILT_WRITE(), + $val ? IO::KQueue::EV_ENABLE() : IO::KQueue::EV_DISABLE()); + } + elsif ($HaveEpoll) { + epoll_ctl($Epoll, EPOLL_CTL_MOD, $self->{fd}, $event) + and print STDERR "couldn't modify epoll settings for $self->{fd} " . + "($self) from $self->{event_watch} -> $event\n"; + } + $self->{event_watch} = $event; + } +} + + +### METHOD: debugmsg( $format, @args ) +### Print the debugging message specified by the C-style I and +### I if the object's C is greater than or equal to the given +### I. +sub debugmsg { + my ( $self, $fmt, @args ) = @_; + confess "Not an object" unless ref $self; + + chomp $fmt; + printf STDERR ">>> $fmt\n", @args; +} + + +### METHOD: peer_ip_string() +### Returns the string describing the peer's IP +sub peer_ip_string { + my Danga::Socket $self = shift; + my $pn = getpeername($self->{sock}) or return undef; + my ($port, $iaddr) = Socket::sockaddr_in($pn); + return Socket::inet_ntoa($iaddr); +} + +### METHOD: peer_addr_string() +### Returns the string describing the peer for the socket which underlies this +### object in form "ip:port" +sub peer_addr_string { + my Danga::Socket $self = shift; + my $pn = getpeername($self->{sock}) or return undef; + my ($port, $iaddr) = Socket::sockaddr_in($pn); + return Socket::inet_ntoa($iaddr) . ":$port"; +} + +### METHOD: as_string() +### Returns a string describing this socket. +sub as_string { + my Danga::Socket $self = shift; + my $ret = ref($self) . ": " . ($self->{closed} ? "closed" : "open"); + my $peer = $self->peer_addr_string; + if ($peer) { + $ret .= " to " . $self->peer_addr_string; + } + return $ret; +} + +### CLASS METHOD: SetPostLoopCallback +### Sets post loop callback function. Pass a subref and it will be +### called every time the event loop finishes. Return 1 from the sub +### to make the loop continue, else it will exit. The function will +### be passed two parameters: \%DescriptorMap, \%OtherFds. +sub SetPostLoopCallback { + my ($class, $ref) = @_; + $PostLoopCallback = (defined $ref && ref $ref eq 'CODE') ? $ref : undef; +} + +##################################################################### +### U T I L I T Y F U N C T I O N S +##################################################################### + +our $SYS_epoll_create = eval { &SYS_epoll_create } || 254; # linux-ix86 default + +# epoll_create wrapper +# ARGS: (size) +sub epoll_create { + my $epfd = eval { syscall($SYS_epoll_create, $_[0]) }; + return -1 if $@; + return $epfd; +} + +# epoll_ctl wrapper +# ARGS: (epfd, op, fd, events) +our $SYS_epoll_ctl = eval { &SYS_epoll_ctl } || 255; # linux-ix86 default +sub epoll_ctl { + syscall($SYS_epoll_ctl, $_[0]+0, $_[1]+0, $_[2]+0, pack("LLL", $_[3], $_[2])); +} + +# epoll_wait wrapper +# ARGS: (epfd, maxevents, timeout, arrayref) +# arrayref: values modified to be [$fd, $event] +our $epoll_wait_events; +our $epoll_wait_size = 0; +our $SYS_epoll_wait = eval { &SYS_epoll_wait } || 256; # linux-ix86 default +sub epoll_wait { + # resize our static buffer if requested size is bigger than we've ever done + if ($_[1] > $epoll_wait_size) { + $epoll_wait_size = $_[1]; + $epoll_wait_events = pack("LLL") x $epoll_wait_size; + } + my $ct = syscall($SYS_epoll_wait, $_[0]+0, $epoll_wait_events, $_[1]+0, $_[2]+0); + for ($_ = 0; $_ < $ct; $_++) { + @{$_[3]->[$_]}[1,0] = unpack("LL", substr($epoll_wait_events, 12*$_, 8)); + } + return $ct; +} + + + +1; + + +# Local Variables: +# mode: perl +# c-basic-indent: 4 +# indent-tabs-mode: nil +# End: diff --git a/lib/Danga/TimeoutSocket.pm b/lib/Danga/TimeoutSocket.pm new file mode 100644 index 0000000..fe74cd9 --- /dev/null +++ b/lib/Danga/TimeoutSocket.pm @@ -0,0 +1,49 @@ +# $Id: TimeoutSocket.pm,v 1.2 2005/02/02 20:44:35 msergeant Exp $ + +package Danga::TimeoutSocket; + +use base 'Danga::Socket'; +use fields qw(alive_time create_time); + +our $last_cleanup = 0; + +sub new { + my Danga::TimeoutSocket $self = shift; + my $sock = shift; + $self = fields::new($self) unless ref($self); + $self->SUPER::new($sock); + + my $now = time; + $self->{alive_time} = $self->{create_time} = $now; + + if ($now - 15 > $last_cleanup) { + $last_cleanup = $now; + _do_cleanup($now); + } + + return $self; +} + +sub _do_cleanup { + my $now = shift; + my $sf = __PACKAGE__->get_sock_ref; + + my %max_age; # classname -> max age (0 means forever) + my @to_close; + while (my $k = each %$sf) { + my Danga::TimeoutSocket $v = $sf->{$k}; + my $ref = ref $v; + next unless $v->isa('Danga::TimeoutSocket'); + unless (defined $max_age{$ref}) { + $max_age{$ref} = $ref->max_idle_time || 0; + } + next unless $max_age{$ref}; + if ($v->{alive_time} < $now - $max_age{$ref}) { + push @to_close, $v; + } + } + + $_->close("Timeout") foreach @to_close; +} + +1;