From 3922235bcfcd2091f8c308408a348eb75d98c9d8 Mon Sep 17 00:00:00 2001 From: Matt Sergeant Date: Tue, 8 Mar 2005 19:59:45 +0000 Subject: [PATCH] Import Danga libraries. This is a bit evil but we'll just have to track them from the Danga project. This way we get something stable that we know works, plus nobody has to go and track down other libraries. Note that only Danga::Socket is (C) Danga. Everything else is original code and (C) Matt Sergeant. git-svn-id: https://svn.perl.org/qpsmtpd/branches/high_perf@387 958fd67b-6ff1-0310-b445-bb7760255be9 --- lib/Danga/Client.pm | 133 ++++++ lib/Danga/DNS.pm | 170 ++++++++ lib/Danga/DNS/Resolver.pm | 322 ++++++++++++++ lib/Danga/Socket.pm | 831 +++++++++++++++++++++++++++++++++++++ lib/Danga/TimeoutSocket.pm | 49 +++ 5 files changed, 1505 insertions(+) create mode 100644 lib/Danga/Client.pm create mode 100644 lib/Danga/DNS.pm create mode 100644 lib/Danga/DNS/Resolver.pm create mode 100644 lib/Danga/Socket.pm create mode 100644 lib/Danga/TimeoutSocket.pm diff --git a/lib/Danga/Client.pm b/lib/Danga/Client.pm new file mode 100644 index 0000000..7b13477 --- /dev/null +++ b/lib/Danga/Client.pm @@ -0,0 +1,133 @@ +# $Id: Client.pm,v 1.8 2005/02/14 22:06:38 msergeant Exp $ + +package Danga::Client; +use base 'Danga::TimeoutSocket'; +use fields qw(line closing disable_read can_read_mode); +use Time::HiRes (); + +# 30 seconds max timeout! +sub max_idle_time { 30 } + +sub new { + my Danga::Client $self = shift; + $self = fields::new($self) unless ref $self; + $self->SUPER::new( @_ ); + + $self->reset_for_next_message; + return $self; +} + +sub reset_for_next_message { + my Danga::Client $self = shift; + $self->{line} = ''; + $self->{disable_read} = 0; + $self->{can_read_mode} = 0; + return $self; +} + +sub get_line { + my Danga::Client $self = shift; + if (!$self->have_line) { + $self->SetPostLoopCallback(sub { $self->have_line ? 0 : 1 }); + #warn("get_line PRE\n"); + $self->EventLoop(); + #warn("get_line POST\n"); + $self->watch_read(0); + } + return if $self->{closing}; + # now have a line. + $self->{alive_time} = time; + $self->{line} =~ s/^(.*?\n)//; + return $1; +} + +sub can_read { + my Danga::Client $self = shift; + my ($timeout) = @_; + my $end = Time::HiRes::time() + $timeout; + warn("Calling can-read\n"); + $self->{can_read_mode} = 1; + if (!length($self->{line})) { + my $old = $self->watch_read(); + $self->watch_read(1); + $self->SetPostLoopCallback(sub { (length($self->{line}) || + (Time::HiRes::time > $end)) ? 0 : 1 }); + #warn("get_line PRE\n"); + $self->EventLoop(); + #warn("get_line POST\n"); + $self->watch_read($old); + } + $self->{can_read_mode} = 0; + $self->SetPostLoopCallback(sub { $self->have_line ? 0 : 1 }); + return if $self->{closing}; + $self->{alive_time} = time; + warn("can_read returning for '$self->{line}'\n"); + return 1 if length($self->{line}); + return; +} + +sub have_line { + my Danga::Client $self = shift; + return 1 if $self->{closing}; + if ($self->{line} =~ /\n/) { + return 1; + } + return 0; +} + +sub event_read { + my Danga::Client $self = shift; + my $bref = $self->read(8192); + return $self->close($!) unless defined $bref; + # $self->watch_read(0); + $self->process_read_buf($bref); +} + +sub process_read_buf { + my Danga::Client $self = shift; + my $bref = shift; + $self->{line} .= $$bref; + return if $self->{can_read_mode}; + return if $::LineMode; + + while ($self->{line} =~ s/^(.*?\n)//) { + my $line = $1; + $self->{alive_time} = time; + my $resp = $self->process_line($line); + if ($::DEBUG > 1 and $resp) { print "$$:".($self+0)."S: $_\n" for split(/\n/, $resp) } + $self->write($resp) if $resp; + $self->watch_read(0) if $self->{disable_read}; + } +} + +sub disable_read { + my Danga::Client $self = shift; + $self->{disable_read}++; + $self->watch_read(0); +} + +sub enable_read { + my Danga::Client $self = shift; + $self->{disable_read}--; + if ($self->{disable_read} <= 0) { + $self->{disable_read} = 0; + $self->watch_read(1); + } +} + +sub process_line { + my Danga::Client $self = shift; + return ''; +} + +sub close { + my Danga::Client $self = shift; + $self->{closing} = 1; + print "closing @_\n" if $::DEBUG; + $self->SUPER::close(@_); +} + +sub event_err { my Danga::Client $self = shift; $self->close("Error") } +sub event_hup { my Danga::Client $self = shift; $self->close("Disconnect (HUP)") } + +1; diff --git a/lib/Danga/DNS.pm b/lib/Danga/DNS.pm new file mode 100644 index 0000000..e57a3a4 --- /dev/null +++ b/lib/Danga/DNS.pm @@ -0,0 +1,170 @@ +# $Id: DNS.pm,v 1.12 2005/02/14 22:06:08 msergeant Exp $ + +package Danga::DNS; + +# This is the query class - it is really just an encapsulation of the +# hosts you want to query, plus the callback. All the hard work is done +# in Danga::DNS::Resolver. + +use fields qw(client hosts num_hosts callback results start); +use strict; + +use Danga::DNS::Resolver; + +my $resolver; + +sub trace { + my $level = shift; + print ("[$$] dns lookup: @_") if $::DEBUG >= $level; +} + +sub new { + my Danga::DNS $self = shift; + my %options = @_; + + $resolver ||= Danga::DNS::Resolver->new(); + + my $client = $options{client}; + $client->disable_read if $client; + + $self = fields::new($self) unless ref $self; + + $self->{hosts} = $options{hosts} ? $options{hosts} : [ $options{host} ]; + $self->{num_hosts} = scalar(@{$self->{hosts}}) || "No hosts supplied"; + $self->{client} = $client; + $self->{callback} = $options{callback} || die "No callback given"; + $self->{results} = {}; + $self->{start} = time; + + if ($options{type}) { + if ($options{type} eq 'TXT') { + if (!$resolver->query_txt($self, @{$self->{hosts}})) { + $client->watch_read(1) if $client; + return; + } + } + elsif ($options{type} eq 'A') { + if (!$resolver->query($self, @{$self->{hosts}})) { + $client->watch_read(1) if $client; + return; + } + } + elsif ($options{type} eq 'PTR') { + if (!$resolver->query($self, @{$self->{hosts}})) { + $client->watch_read(1) if $client; + return; + } + } + elsif ($options{type} eq 'MX') { + if (!$resolver->query_mx($self, @{$self->{hosts}})) { + $client->watch_read(1) if $client; + return; + } + } + else { + die "Unsupported DNS query type: $options{type}"; + } + } + else { + if (!$resolver->query($self, @{$self->{hosts}})) { + $client->watch_read(1) if $client; + return; + } + } + + return $self; +} + +sub run_callback { + my Danga::DNS $self = shift; + my ($result, $query) = @_; + $self->{results}{$query} = $result; + trace(2, "got $query => $result\n"); + eval { + $self->{callback}->($result, $query); + }; + if ($@) { + warn($@); + } +} + +sub DESTROY { + my Danga::DNS $self = shift; + my $now = time; + foreach my $host (@{$self->{hosts}}) { + if (!$self->{results}{$host}) { + print "DNS timeout (presumably) looking for $host after " . ($now - $self->{start}) . " secs\n"; + $self->{callback}->("NXDOMAIN", $host); + } + } + $self->{client}->enable_read if $self->{client}; +} + +1; + +=head1 NAME + +Danga::DNS - a DNS lookup class for the Danga::Socket framework + +=head1 SYNOPSIS + + Danga::DNS->new(%options); + +=head1 DESCRIPTION + +This module performs asynchronous DNS lookups, making use of a single UDP +socket (unlike Net::DNS's bgsend/bgread combination), and blocking reading on +a client until the response comes back (this is useful for e.g. SMTP rDNS +lookups where you want the answer before you see the next SMTP command). + +Currently this module will only perform A or PTR lookups. A rDNS (PTR) lookup +will be performed if the host matches the regexp: C. + +The lookups time out after 15 seconds. + +=head1 API + +=head2 C<< Danga::DNS->new( %options ) >> + +Create a new DNS query. You do not need to store the resulting object as this +class is all done with callbacks. + +Example: + + Danga::DNS->new( + callback => sub { print "Got result: $_[0]\n" }, + host => 'google.com', + ); + +=over 4 + +=item B<[required]> C + +The callback to call when results come in. This should be a reference to a +subroutine. The callback receives two parameters - the result of the DNS lookup +and the host that was looked up. + +=item C + +A host name to lookup. Note that if the hostname is a dotted quad of numbers then +a reverse DNS (PTR) lookup is performend. + +=item C + +An array-ref list of hosts to lookup. + +B One of either C or C is B. + +=item C + +It is possible to specify a C object (or subclass) which you wish +to disable for reading until your DNS result returns. + +=item C + +You can specify one of: I<"A">, I<"PTR"> or I<"TXT"> here. Other types may be +supported in the future. + +=back + +=cut diff --git a/lib/Danga/DNS/Resolver.pm b/lib/Danga/DNS/Resolver.pm new file mode 100644 index 0000000..ded6e37 --- /dev/null +++ b/lib/Danga/DNS/Resolver.pm @@ -0,0 +1,322 @@ +# $Id: Resolver.pm,v 1.3 2005/02/14 22:06:08 msergeant Exp $ + +package Danga::DNS::Resolver; +use base qw(Danga::Socket); + +use fields qw(res dst id_to_asker id_to_query timeout cache cache_timeout); + +use Net::DNS; +use Socket; +use strict; + +our $last_cleanup = 0; + +sub trace { + my $level = shift; + print ("$::DEBUG/$level [$$] dns lookup: @_") if $::DEBUG >= $level; +} + +sub new { + my Danga::DNS::Resolver $self = shift; + + $self = fields::new($self) unless ref $self; + + my $res = Net::DNS::Resolver->new; + + my $sock = IO::Socket::INET->new( + Proto => 'udp', + LocalAddr => $res->{'srcaddr'}, + LocalPort => ($res->{'srcport'} || undef), + ) || die "Cannot create socket: $!"; + IO::Handle::blocking($sock, 0); + + trace(2, "Using nameserver $res->{nameservers}->[0]:$res->{port}\n"); + my $dst_sockaddr = sockaddr_in($res->{'port'}, inet_aton($res->{'nameservers'}->[0])); + #my $dst_sockaddr = sockaddr_in($res->{'port'}, inet_aton('127.0.0.1')); + #my $dst_sockaddr = sockaddr_in($res->{'port'}, inet_aton('10.2.1.20')); + + $self->{res} = $res; + $self->{dst} = $dst_sockaddr; + $self->{id_to_asker} = {}; + $self->{id_to_query} = {}; + $self->{timeout} = {}; + $self->{cache} = {}; + $self->{cache_timeout} = {}; + + $self->SUPER::new($sock); + + $self->watch_read(1); + + return $self; +} + +sub _query { + my Danga::DNS::Resolver $self = shift; + my ($asker, $host, $type, $now) = @_; + + if ($ENV{NODNS}) { + $asker->run_callback("NXDNS", $host); + return 1; + } + if (exists $self->{cache}{$type}{$host}) { + # print "CACHE HIT!\n"; + $asker->run_callback($self->{cache}{$type}{$host}, $host); + return 1; + } + + my $packet = $self->{res}->make_query_packet($host, $type); + my $packet_data = $packet->data; + + my $h = $packet->header; + my $id = $h->id; + + if (!$self->sock->send($packet_data, 0, $self->{dst})) { + return; + } + + trace(2, "Query: $host ($id)\n"); + + $self->{id_to_asker}->{$id} = $asker; + $self->{id_to_query}->{$id} = $host; + $self->{timeout}->{$id} = $now; + + return 1; +} + +sub query_txt { + my Danga::DNS::Resolver $self = shift; + my ($asker, @hosts) = @_; + + my $now = time(); + + trace(2, "[" . keys(%{$self->{id_to_asker}}) . "] trying to resolve TXT: @hosts\n"); + + foreach my $host (@hosts) { + $self->_query($asker, $host, 'TXT', $now) || return; + } + + # run cleanup every 5 seconds + if ($now - 5 > $last_cleanup) { + $last_cleanup = $now; + $self->_do_cleanup($now); + } + + #print "+Pending queries: " . keys(%{$self->{id_to_asker}}) . + # " / Cache Size: " . keys(%{$self->{cache}}) . "\n"; + + return 1; +} + +sub query_mx { + my Danga::DNS::Resolver $self = shift; + my ($asker, @hosts) = @_; + + my $now = time(); + + trace(2, "[" . keys(%{$self->{id_to_asker}}) . "] trying to resolve MX: @hosts\n"); + + foreach my $host (@hosts) { + $self->_query($asker, $host, 'MX', $now) || return; + } + + # run cleanup every 5 seconds + if ($now - 5 > $last_cleanup) { + $last_cleanup = $now; + $self->_do_cleanup($now); + } + + #print "+Pending queries: " . keys(%{$self->{id_to_asker}}) . + # " / Cache Size: " . keys(%{$self->{cache}}) . "\n"; + + return 1; +} + +sub query { + my Danga::DNS::Resolver $self = shift; + my ($asker, @hosts) = @_; + + my $now = time(); + + trace(2, "[" . keys(%{$self->{id_to_asker}}) . "] trying to resolve A/PTR: @hosts\n"); + + foreach my $host (@hosts) { + $self->_query($asker, $host, 'A', $now) || return; + } + + # run cleanup every 5 seconds + if ($now - 5 > $last_cleanup) { + $last_cleanup = $now; + $self->_do_cleanup($now); + } + + #print "+Pending queries: " . keys(%{$self->{id_to_asker}}) . + # " / Cache Size: " . keys(%{$self->{cache}}) . "\n"; + + return 1; +} + +sub ticker { + my Danga::DNS::Resolver $self = shift; + my $now = time; + # run cleanup every 5 seconds + if ($now - 5 > $last_cleanup) { + $last_cleanup = $now; + $self->_do_cleanup($now); + } +} + +sub _do_cleanup { + my Danga::DNS::Resolver $self = shift; + my $now = shift; + + my $idle = $self->max_idle_time; + + my @to_delete; + while (my ($id, $t) = each(%{$self->{timeout}})) { + if ($t < ($now - $idle)) { + push @to_delete, $id; + } + } + + foreach my $id (@to_delete) { + delete $self->{timeout}{$id}; + my $asker = delete $self->{id_to_asker}{$id}; + my $query = delete $self->{id_to_query}{$id}; + $asker->run_callback("NXDOMAIN", $query); + } + + foreach my $type ('A', 'TXT') { + @to_delete = (); + + while (my ($query, $t) = each(%{$self->{cache_timeout}{$type}})) { + if ($t < $now) { + push @to_delete, $query; + } + } + + foreach my $q (@to_delete) { + delete $self->{cache_timeout}{$type}{$q}; + delete $self->{cache}{$type}{$q}; + } + } +} + +# seconds max timeout! +sub max_idle_time { 30 } + +# Danga::DNS +sub event_err { shift->close("dns socket error") } +sub event_hup { shift->close("dns socket error") } + +sub event_read { + my Danga::DNS::Resolver $self = shift; + + while (my $packet = $self->{res}->bgread($self->sock)) { + my $err = $self->{res}->errorstring; + my $answers = 0; + my $header = $packet->header; + my $id = $header->id; + + my $asker = delete $self->{id_to_asker}->{$id}; + my $query = delete $self->{id_to_query}->{$id}; + delete $self->{timeout}{$id}; + + #print "-Pending queries: " . keys(%{$self->{id_to_asker}}) . + # " / Cache Size: " . keys(%{$self->{cache}}) . "\n"; + if (!$asker) { + trace(1, "No asker for id: $id\n"); + return; + } + + my $now = time(); + my @questions = $packet->question; + #print STDERR "response to ", $questions[0]->string, "\n"; + foreach my $rr ($packet->answer) { + # my $q = shift @questions; + if ($rr->type eq "PTR") { + my $rdns = $rr->ptrdname; + if ($query) { + # NB: Cached as an "A" lookup as there's no overlap and they + # go through the same query() function above + $self->{cache}{A}{$query} = $rdns; + $self->{cache_timeout}{A}{$query} = $now + 60; # should use $rr->ttl but that would cache for too long + } + $asker->run_callback($rdns, $query); + } + elsif ($rr->type eq "A") { + my $ip = $rr->address; + if ($query) { + $self->{cache}{A}{$query} = $ip; + $self->{cache_timeout}{A}{$query} = $now + 60; # should use $rr->ttl but that would cache for too long + } + $asker->run_callback($ip, $query); + } + elsif ($rr->type eq "TXT") { + my $txt = $rr->txtdata; + if ($query) { + $self->{cache}{TXT}{$query} = $txt; + $self->{cache_timeout}{TXT}{$query} = $now + 60; # should use $rr->ttl but that would cache for too long + } + $asker->run_callback($txt, $query); + } + else { + # came back, but not a PTR or A record + $asker->run_callback("unknown", $query); + } + $answers++; + } + if (!$answers) { + if ($err eq "NXDOMAIN") { + # trace("found => NXDOMAIN\n"); + $asker->run_callback("NXDOMAIN", $query); + } + elsif ($err eq "SERVFAIL") { + # try again??? + print "SERVFAIL looking for $query (Pending: " . keys(%{$self->{id_to_asker}}) . ")\n"; + #$self->query($asker, $query); + $asker->run_callback($err, $query); + #$self->{id_to_asker}->{$id} = $asker; + #$self->{id_to_query}->{$id} = $query; + #$self->{timeout}{$id} = time(); + + } + elsif($err) { + print("error: $err\n"); + $asker->run_callback($err, $query); + } + else { + # trace("no answers\n"); + $asker->run_callback("NXDOMAIN", $query); + } + } + } +} + +use Carp qw(confess); + +sub close { + my Danga::DNS::Resolver $self = shift; + + $self->SUPER::close(shift); + confess "Danga::DNS::Resolver socket should never be closed!"; +} + +1; + +=head1 NAME + +Danga::DNS::Resolver - an asynchronous DNS resolver class + +=head1 SYNOPSIS + + my $res = Danga::DNS::Resolver->new(); + + $res->query($obj, @hosts); # $obj implements $obj->run_callback() + +=head1 DESCRIPTION + +This is a low level DNS resolver class that works within the Danga::Socket +asynchronous I/O framework. Do not attempt to use this class standalone - use +the C class instead. + +=cut diff --git a/lib/Danga/Socket.pm b/lib/Danga/Socket.pm new file mode 100644 index 0000000..e94220f --- /dev/null +++ b/lib/Danga/Socket.pm @@ -0,0 +1,831 @@ +########################################################################### + +=head1 NAME + +Danga::Socket - Event-driven async IO class + +=head1 SYNOPSIS + + use base ('Danga::Socket'); + +=head1 DESCRIPTION + +This is an abstract base class which provides the basic framework for +event-driven asynchronous IO. + +=cut + +########################################################################### + +package Danga::Socket; +use strict; + +use vars qw{$VERSION}; +$VERSION = do { my @r = (q$Revision: 1.4 $ =~ /\d+/g); sprintf "%d."."%02d" x $#r, @r }; + +use fields qw(sock fd write_buf write_buf_offset write_buf_size + read_push_back + closed event_watch debug_level); + +use Errno qw(EINPROGRESS EWOULDBLOCK EISCONN + EPIPE EAGAIN EBADF ECONNRESET); + +use Socket qw(IPPROTO_TCP); +use Carp qw{croak confess}; + +use constant TCP_CORK => 3; # FIXME: not hard-coded (Linux-specific too) + +use constant DebugLevel => 0; + +# for epoll definitions: +our $HAVE_SYSCALL_PH = eval { require 'syscall.ph'; 1 } || eval { require 'sys/syscall.ph'; 1 }; +our $HAVE_KQUEUE = eval { require IO::KQueue; 1 }; + +# Explicitly define the poll constants, as either one set or the other won't be +# loaded. They're also badly implemented in IO::Epoll: +# The IO::Epoll module is buggy in that it doesn't export constants efficiently +# (at least as of 0.01), so doing constants ourselves saves 13% of the user CPU +# time +use constant EPOLLIN => 1; +use constant EPOLLOUT => 4; +use constant EPOLLERR => 8; +use constant EPOLLHUP => 16; +use constant EPOLL_CTL_ADD => 1; +use constant EPOLL_CTL_DEL => 2; +use constant EPOLL_CTL_MOD => 3; + +use constant POLLIN => 1; +use constant POLLOUT => 4; +use constant POLLERR => 8; +use constant POLLHUP => 16; +use constant POLLNVAL => 32; + +# keep track of active clients +our ( + $HaveEpoll, # Flag -- is epoll available? initially undefined. + $HaveKQueue, + %DescriptorMap, # fd (num) -> Danga::Socket object + %PushBackSet, # fd (num) -> Danga::Socket (fds with pushed back read data) + $Epoll, # Global epoll fd (for epoll mode only) + $KQueue, # Global kqueue fd (for kqueue mode only) + @ToClose, # sockets to close when event loop is done + %OtherFds, # A hash of "other" (non-Danga::Socket) file + # descriptors for the event loop to track. + $PostLoopCallback, # subref to call at the end of each loop, if defined + ); + +%OtherFds = (); + +##################################################################### +### C L A S S M E T H O D S +##################################################################### + +### (CLASS) METHOD: HaveEpoll() +### Returns a true value if this class will use IO::Epoll for async IO. +sub HaveEpoll { $HaveEpoll }; + +### (CLASS) METHOD: WatchedSockets() +### Returns the number of file descriptors which are registered with the global +### poll object. +sub WatchedSockets { + return scalar keys %DescriptorMap; +} +*watched_sockets = *WatchedSockets; + + +### (CLASS) METHOD: ToClose() +### Return the list of sockets that are awaiting close() at the end of the +### current event loop. +sub ToClose { return @ToClose; } + + +### (CLASS) METHOD: OtherFds( [%fdmap] ) +### Get/set the hash of file descriptors that need processing in parallel with +### the registered Danga::Socket objects. +sub OtherFds { + my $class = shift; + if ( @_ ) { %OtherFds = @_ } + return wantarray ? %OtherFds : \%OtherFds; +} + + +### (CLASS) METHOD: DescriptorMap() +### Get the hash of Danga::Socket objects keyed by the file descriptor they are +### wrapping. +sub DescriptorMap { + return wantarray ? %DescriptorMap : \%DescriptorMap; +} +*descriptor_map = *DescriptorMap; +*get_sock_ref = *DescriptorMap; + +sub init_poller +{ + return if defined $HaveEpoll || $HaveKQueue; + + if ($HAVE_KQUEUE) { + $KQueue = IO::KQueue->new(); + $HaveKQueue = $KQueue >= 0; + if ($HaveKQueue) { + *EventLoop = *KQueueEventLoop; + } + } + else { + $Epoll = eval { epoll_create(1024); }; + $HaveEpoll = $Epoll >= 0; + if ($HaveEpoll) { + *EventLoop = *EpollEventLoop; + } + } + + if (!$HaveEpoll && !$HaveKQueue) { + require IO::Poll; + *EventLoop = *PollEventLoop; + } +} + +### FUNCTION: EventLoop() +### Start processing IO events. +sub EventLoop { + my $class = shift; + + init_poller(); + + if ($HaveEpoll) { + EpollEventLoop($class); + } else { + PollEventLoop($class); + } +} + +### The kqueue-based event loop. Gets installed as EventLoop if IO::KQueue works +### okay. +sub KQueueEventLoop { + my $class = shift; + + foreach my $fd (keys %OtherFds) { + $KQueue->EV_SET($fd, IO::KQueue::EVFILT_READ(), IO::KQueue::EV_ADD()); + } + + while (1) { + my @ret = $KQueue->kevent(1000); + + if (!@ret) { + foreach my $fd ( keys %DescriptorMap ) { + my Danga::Socket $sock = $DescriptorMap{$fd}; + if ($sock->can('ticker')) { + $sock->ticker; + } + } + } + + my @objs; + + foreach my $kev (@ret) { + my ($fd, $filter, $flags, $fflags) = @$kev; + + my Danga::Socket $pob = $DescriptorMap{$fd}; + + # prioritise OtherFds first - likely to be accept() socks (?) + if (!$pob) { + if (my $code = $OtherFds{$fd}) { + $code->($filter); + } + next; + } + + push @objs, [$pob, $fd, $filter, $flags, $fflags]; + } + + # TODO - prioritize the objects + + foreach (@objs) { + my ($pob, $fd, $filter, $flags, $fflags) = @$_; + + DebugLevel >= 1 && $class->DebugMsg("Event: fd=%d (%s), flags=%d \@ %s\n", + $fd, ref($pob), $flags, time); + + $pob->event_read if $filter == IO::KQueue::EVFILT_READ() && !$pob->{closed}; + $pob->event_write if $filter == IO::KQueue::EVFILT_WRITE() && !$pob->{closed}; + if ($flags == IO::KQueue::EV_EOF() && !$pob->{closed}) { + if ($fflags) { + $pob->event_err; + } else { + $pob->event_hup; + } + } + } + + return unless PostEventLoop(); + } + + exit(0); +} + +### The epoll-based event loop. Gets installed as EventLoop if IO::Epoll loads +### okay. +sub EpollEventLoop { + my $class = shift; + + foreach my $fd ( keys %OtherFds ) { + epoll_ctl($Epoll, EPOLL_CTL_ADD, $fd, EPOLLIN); + } + + while (1) { + my @events; + my $i; + my $evcount; + # get up to 1000 events, 1000ms timeout + while ($evcount = epoll_wait($Epoll, 1000, 1000, \@events)) { + EVENT: + for ($i=0; $i<$evcount; $i++) { + my $ev = $events[$i]; + + # it's possible epoll_wait returned many events, including some at the end + # that ones in the front triggered unregister-interest actions. if we + # can't find the %sock entry, it's because we're no longer interested + # in that event. + my Danga::Socket $pob = $DescriptorMap{$ev->[0]}; + my $code; + my $state = $ev->[1]; + + # if we didn't find a Perlbal::Socket subclass for that fd, try other + # pseudo-registered (above) fds. + if (! $pob) { + if (my $code = $OtherFds{$ev->[0]}) { + $code->($state); + } + next; + } + + DebugLevel >= 1 && $class->DebugMsg("Event: fd=%d (%s), state=%d \@ %s\n", + $ev->[0], ref($pob), $ev->[1], time); + + $pob->event_read if $state & EPOLLIN && ! $pob->{closed}; + $pob->event_write if $state & EPOLLOUT && ! $pob->{closed}; + $pob->event_err if $state & EPOLLERR && ! $pob->{closed}; + $pob->event_hup if $state & EPOLLHUP && ! $pob->{closed}; + } + return unless PostEventLoop(); + + } + + foreach my $fd ( keys %DescriptorMap ) { + my Danga::Socket $sock = $DescriptorMap{$fd}; + if ($sock->can('ticker')) { + $sock->ticker; + } + } + + print STDERR "Event loop ending; restarting.\n"; + } + exit 0; +} + +sub PostEventLoop { + # fire read events for objects with pushed-back read data + my $loop = 1; + while ($loop) { + $loop = 0; + foreach my $fd (keys %PushBackSet) { + my Danga::Socket $pob = $PushBackSet{$fd}; + next unless (! $pob->{closed} && + $pob->{event_watch} & POLLIN); + $loop = 1; + $pob->event_read; + } + } + + # now we can close sockets that wanted to close during our event processing. + # (we didn't want to close them during the loop, as we didn't want fd numbers + # being reused and confused during the event loop) + $_->close while ($_ = shift @ToClose); + + # now we're at the very end, call callback if defined + if (defined $PostLoopCallback) { + return $PostLoopCallback->(\%DescriptorMap, \%OtherFds); + } + return 1; +} + +### The fallback IO::Poll-based event loop. Gets installed as EventLoop if +### IO::Epoll fails to load. +sub PollEventLoop { + my $class = shift; + + my Danga::Socket $pob; + + while (1) { + # the following sets up @poll as a series of ($poll,$event_mask) + # items, then uses IO::Poll::_poll, implemented in XS, which + # modifies the array in place with the even elements being + # replaced with the event masks that occured. + my @poll; + foreach my $fd ( keys %OtherFds ) { + push @poll, $fd, POLLIN; + } + foreach my $fd ( keys %DescriptorMap ) { + my Danga::Socket $sock = $DescriptorMap{$fd}; + push @poll, $fd, $sock->{event_watch}; + } + return 0 unless @poll; + + my $count = IO::Poll::_poll(1000, @poll); + if (!$count) { + foreach my $fd ( keys %DescriptorMap ) { + my Danga::Socket $sock = $DescriptorMap{$fd}; + if ($sock->can('ticker')) { + $sock->ticker; + } + } + next; + } + + # Fetch handles with read events + while (@poll) { + my ($fd, $state) = splice(@poll, 0, 2); + next unless $state; + + $pob = $DescriptorMap{$fd}; + + if ( !$pob && (my $code = $OtherFds{$fd}) ) { + $code->($state); + next; + } + + $pob->event_read if $state & POLLIN && ! $pob->{closed}; + $pob->event_write if $state & POLLOUT && ! $pob->{closed}; + $pob->event_err if $state & POLLERR && ! $pob->{closed}; + $pob->event_hup if $state & POLLHUP && ! $pob->{closed}; + } + + return unless PostEventLoop(); + } + + exit 0; +} + + +### (CLASS) METHOD: DebugMsg( $format, @args ) +### Print the debugging message specified by the C-style I and +### I +sub DebugMsg { + my ( $class, $fmt, @args ) = @_; + chomp $fmt; + printf STDERR ">>> $fmt\n", @args; +} + + +### METHOD: new( $socket ) +### Create a new Danga::Socket object for the given I which will react +### to events on it during the C. +sub new { + my Danga::Socket $self = shift; + $self = fields::new($self) unless ref $self; + + my $sock = shift; + + $self->{sock} = $sock; + my $fd = fileno($sock); + $self->{fd} = $fd; + $self->{write_buf} = []; + $self->{write_buf_offset} = 0; + $self->{write_buf_size} = 0; + $self->{closed} = 0; + $self->{read_push_back} = []; + + $self->{event_watch} = POLLERR|POLLHUP|POLLNVAL; + + init_poller(); + + if ($HaveEpoll) { + epoll_ctl($Epoll, EPOLL_CTL_ADD, $fd, $self->{event_watch}) + and die "couldn't add epoll watch for $fd\n"; + } + elsif ($HaveKQueue) { + # Add them to the queue but disabled for now + $KQueue->EV_SET($fd, IO::KQueue::EVFILT_READ(), + IO::KQueue::EV_ADD() | IO::KQueue::EV_DISABLE()); + $KQueue->EV_SET($fd, IO::KQueue::EVFILT_WRITE(), + IO::KQueue::EV_ADD() | IO::KQueue::EV_DISABLE()); + } + + $DescriptorMap{$fd} = $self; + return $self; +} + + + +##################################################################### +### I N S T A N C E M E T H O D S +##################################################################### + +### METHOD: tcp_cork( $boolean ) +### Turn TCP_CORK on or off depending on the value of I. +sub tcp_cork { + my Danga::Socket $self = shift; + my $val = shift; + + # FIXME: Linux-specific. + setsockopt($self->{sock}, IPPROTO_TCP, TCP_CORK, + pack("l", $val ? 1 : 0)) || die "setsockopt: $!"; +} + +### METHOD: close( [$reason] ) +### Close the socket. The I argument will be used in debugging messages. +sub close { + my Danga::Socket $self = shift; + my $reason = shift || ""; + + my $fd = $self->{fd}; + my $sock = $self->{sock}; + $self->{closed} = 1; + + # we need to flush our write buffer, as there may + # be self-referential closures (sub { $client->close }) + # preventing the object from being destroyed + $self->{write_buf} = []; + + if (DebugLevel) { + my ($pkg, $filename, $line) = caller; + print STDERR "Closing \#$fd due to $pkg/$filename/$line ($reason)\n"; + } + + if ($HaveEpoll) { + if (epoll_ctl($Epoll, EPOLL_CTL_DEL, $fd, $self->{event_watch}) == 0) { + DebugLevel >= 1 && $self->debugmsg("Client %d disconnected.\n", $fd); + } else { + DebugLevel >= 1 && $self->debugmsg("poll->remove failed on fd %d\n", $fd); + } + } + + delete $DescriptorMap{$fd}; + delete $PushBackSet{$fd}; + + # defer closing the actual socket until the event loop is done + # processing this round of events. (otherwise we might reuse fds) + push @ToClose, $sock; + + return 0; +} + + + +### METHOD: sock() +### Returns the underlying IO::Handle for the object. +sub sock { + my Danga::Socket $self = shift; + return $self->{sock}; +} + + +### METHOD: write( $data ) +### Write the specified data to the underlying handle. I may be scalar, +### scalar ref, code ref (to run when there), or undef just to kick-start. +### Returns 1 if writes all went through, or 0 if there are writes in queue. If +### it returns 1, caller should stop waiting for 'writable' events) +sub write { + my Danga::Socket $self; + my $data; + ($self, $data) = @_; + + # nobody should be writing to closed sockets, but caller code can + # do two writes within an event, have the first fail and + # disconnect the other side (whose destructor then closes the + # calling object, but it's still in a method), and then the + # now-dead object does its second write. that is this case. we + # just lie and say it worked. it'll be dead soon and won't be + # hurt by this lie. + return 1 if $self->{closed}; + + my $bref; + + # just queue data if there's already a wait + my $need_queue; + + if (defined $data) { + $bref = ref $data ? $data : \$data; + if ($self->{write_buf_size}) { + push @{$self->{write_buf}}, $bref; + $self->{write_buf_size} += ref $bref eq "SCALAR" ? length($$bref) : 1; + return 0; + } + + # this flag says we're bypassing the queue system, knowing we're the + # only outstanding write, and hoping we don't ever need to use it. + # if so later, though, we'll need to queue + $need_queue = 1; + } + + WRITE: + while (1) { + return 1 unless $bref ||= $self->{write_buf}[0]; + + my $len; + eval { + $len = length($$bref); # this will die if $bref is a code ref, caught below + }; + if ($@) { + if (ref $bref eq "CODE") { + unless ($need_queue) { + $self->{write_buf_size}--; # code refs are worth 1 + shift @{$self->{write_buf}}; + } + $bref->(); + undef $bref; + next WRITE; + } + die "Write error: $@ <$bref>"; + } + + my $to_write = $len - $self->{write_buf_offset}; + my $written = syswrite($self->{sock}, $$bref, $to_write, $self->{write_buf_offset}); + + if (! defined $written) { + if ($! == EPIPE) { + return $self->close("EPIPE"); + } elsif ($! == EAGAIN) { + # since connection has stuff to write, it should now be + # interested in pending writes: + if ($need_queue) { + push @{$self->{write_buf}}, $bref; + $self->{write_buf_size} += $len; + } + $self->watch_write(1); + return 0; + } elsif ($! == ECONNRESET) { + return $self->close("ECONNRESET"); + } + + DebugLevel >= 1 && $self->debugmsg("Closing connection ($self) due to write error: $!\n"); + + return $self->close("write_error"); + } elsif ($written != $to_write) { + DebugLevel >= 2 && $self->debugmsg("Wrote PARTIAL %d bytes to %d", + $written, $self->{fd}); + if ($need_queue) { + push @{$self->{write_buf}}, $bref; + $self->{write_buf_size} += $len; + } + # since connection has stuff to write, it should now be + # interested in pending writes: + $self->{write_buf_offset} += $written; + $self->{write_buf_size} -= $written; + $self->watch_write(1); + return 0; + } elsif ($written == $to_write) { + DebugLevel >= 2 && $self->debugmsg("Wrote ALL %d bytes to %d (nq=%d)", + $written, $self->{fd}, $need_queue); + $self->{write_buf_offset} = 0; + + # this was our only write, so we can return immediately + # since we avoided incrementing the buffer size or + # putting it in the buffer. we also know there + # can't be anything else to write. + return 1 if $need_queue; + + $self->{write_buf_size} -= $written; + shift @{$self->{write_buf}}; + undef $bref; + next WRITE; + } + } +} + +### METHOD: push_back_read( $buf ) +### Push back I (a scalar or scalarref) into the read stream +sub push_back_read { + my Danga::Socket $self = shift; + my $buf = shift; + push @{$self->{read_push_back}}, ref $buf ? $buf : \$buf; + $PushBackSet{$self->{fd}} = $self; +} + +### METHOD: read( $bytecount ) +### Read at most I bytes from the underlying handle; returns scalar +### ref on read, or undef on connection closed. +sub read { + my Danga::Socket $self = shift; + my $bytes = shift; + my $buf; + my $sock = $self->{sock}; + + if (@{$self->{read_push_back}}) { + $buf = shift @{$self->{read_push_back}}; + my $len = length($$buf); + if ($len <= $buf) { + unless (@{$self->{read_push_back}}) { + delete $PushBackSet{$self->{fd}}; + } + return $buf; + } else { + # if the pushed back read is too big, we have to split it + my $overflow = substr($$buf, $bytes); + $buf = substr($$buf, 0, $bytes); + unshift @{$self->{read_push_back}}, \$overflow, + return \$buf; + } + } + + my $res = sysread($sock, $buf, $bytes, 0); + DebugLevel >= 2 && $self->debugmsg("sysread = %d; \$! = %d", $res, $!); + + if (! $res && $! != EWOULDBLOCK) { + # catches 0=conn closed or undef=error + DebugLevel >= 2 && $self->debugmsg("Fd \#%d read hit the end of the road.", $self->{fd}); + return undef; + } + + return \$buf; +} + + +### (VIRTUAL) METHOD: event_read() +### Readable event handler. Concrete deriviatives of Danga::Socket should +### provide an implementation of this. The default implementation will die if +### called. +sub event_read { die "Base class event_read called for $_[0]\n"; } + + +### (VIRTUAL) METHOD: event_err() +### Error event handler. Concrete deriviatives of Danga::Socket should +### provide an implementation of this. The default implementation will die if +### called. +sub event_err { die "Base class event_err called for $_[0]\n"; } + + +### (VIRTUAL) METHOD: event_hup() +### 'Hangup' event handler. Concrete deriviatives of Danga::Socket should +### provide an implementation of this. The default implementation will die if +### called. +sub event_hup { die "Base class event_hup called for $_[0]\n"; } + + +### METHOD: event_write() +### Writable event handler. Concrete deriviatives of Danga::Socket may wish to +### provide an implementation of this. The default implementation calls +### C with an C. +sub event_write { + my $self = shift; + $self->write(undef); +} + + +### METHOD: watch_read( $boolean ) +### Turn 'readable' event notification on or off. +sub watch_read { + my Danga::Socket $self = shift; + return if $self->{closed}; + + my $val = shift; + my $event = $self->{event_watch}; + + $event &= ~POLLIN if ! $val; + $event |= POLLIN if $val; + + # If it changed, set it + if ($event != $self->{event_watch}) { + if ($HaveKQueue) { + $KQueue->EV_SET($self->{fd}, IO::KQueue::EVFILT_READ(), + $val ? IO::KQueue::EV_ENABLE() : IO::KQueue::EV_DISABLE()); + } + elsif ($HaveEpoll) { + epoll_ctl($Epoll, EPOLL_CTL_MOD, $self->{fd}, $event) + and print STDERR "couldn't modify epoll settings for $self->{fd} " . + "($self) from $self->{event_watch} -> $event\n"; + } + $self->{event_watch} = $event; + } +} + +### METHOD: watch_read( $boolean ) +### Turn 'writable' event notification on or off. +sub watch_write { + my Danga::Socket $self = shift; + return if $self->{closed}; + + my $val = shift; + my $event = $self->{event_watch}; + + $event &= ~POLLOUT if ! $val; + $event |= POLLOUT if $val; + + # If it changed, set it + if ($event != $self->{event_watch}) { + if ($HaveKQueue) { + $KQueue->EV_SET($self->{fd}, IO::KQueue::EVFILT_WRITE(), + $val ? IO::KQueue::EV_ENABLE() : IO::KQueue::EV_DISABLE()); + } + elsif ($HaveEpoll) { + epoll_ctl($Epoll, EPOLL_CTL_MOD, $self->{fd}, $event) + and print STDERR "couldn't modify epoll settings for $self->{fd} " . + "($self) from $self->{event_watch} -> $event\n"; + } + $self->{event_watch} = $event; + } +} + + +### METHOD: debugmsg( $format, @args ) +### Print the debugging message specified by the C-style I and +### I if the object's C is greater than or equal to the given +### I. +sub debugmsg { + my ( $self, $fmt, @args ) = @_; + confess "Not an object" unless ref $self; + + chomp $fmt; + printf STDERR ">>> $fmt\n", @args; +} + + +### METHOD: peer_ip_string() +### Returns the string describing the peer's IP +sub peer_ip_string { + my Danga::Socket $self = shift; + my $pn = getpeername($self->{sock}) or return undef; + my ($port, $iaddr) = Socket::sockaddr_in($pn); + return Socket::inet_ntoa($iaddr); +} + +### METHOD: peer_addr_string() +### Returns the string describing the peer for the socket which underlies this +### object in form "ip:port" +sub peer_addr_string { + my Danga::Socket $self = shift; + my $pn = getpeername($self->{sock}) or return undef; + my ($port, $iaddr) = Socket::sockaddr_in($pn); + return Socket::inet_ntoa($iaddr) . ":$port"; +} + +### METHOD: as_string() +### Returns a string describing this socket. +sub as_string { + my Danga::Socket $self = shift; + my $ret = ref($self) . ": " . ($self->{closed} ? "closed" : "open"); + my $peer = $self->peer_addr_string; + if ($peer) { + $ret .= " to " . $self->peer_addr_string; + } + return $ret; +} + +### CLASS METHOD: SetPostLoopCallback +### Sets post loop callback function. Pass a subref and it will be +### called every time the event loop finishes. Return 1 from the sub +### to make the loop continue, else it will exit. The function will +### be passed two parameters: \%DescriptorMap, \%OtherFds. +sub SetPostLoopCallback { + my ($class, $ref) = @_; + $PostLoopCallback = (defined $ref && ref $ref eq 'CODE') ? $ref : undef; +} + +##################################################################### +### U T I L I T Y F U N C T I O N S +##################################################################### + +our $SYS_epoll_create = eval { &SYS_epoll_create } || 254; # linux-ix86 default + +# epoll_create wrapper +# ARGS: (size) +sub epoll_create { + my $epfd = eval { syscall($SYS_epoll_create, $_[0]) }; + return -1 if $@; + return $epfd; +} + +# epoll_ctl wrapper +# ARGS: (epfd, op, fd, events) +our $SYS_epoll_ctl = eval { &SYS_epoll_ctl } || 255; # linux-ix86 default +sub epoll_ctl { + syscall($SYS_epoll_ctl, $_[0]+0, $_[1]+0, $_[2]+0, pack("LLL", $_[3], $_[2])); +} + +# epoll_wait wrapper +# ARGS: (epfd, maxevents, timeout, arrayref) +# arrayref: values modified to be [$fd, $event] +our $epoll_wait_events; +our $epoll_wait_size = 0; +our $SYS_epoll_wait = eval { &SYS_epoll_wait } || 256; # linux-ix86 default +sub epoll_wait { + # resize our static buffer if requested size is bigger than we've ever done + if ($_[1] > $epoll_wait_size) { + $epoll_wait_size = $_[1]; + $epoll_wait_events = pack("LLL") x $epoll_wait_size; + } + my $ct = syscall($SYS_epoll_wait, $_[0]+0, $epoll_wait_events, $_[1]+0, $_[2]+0); + for ($_ = 0; $_ < $ct; $_++) { + @{$_[3]->[$_]}[1,0] = unpack("LL", substr($epoll_wait_events, 12*$_, 8)); + } + return $ct; +} + + + +1; + + +# Local Variables: +# mode: perl +# c-basic-indent: 4 +# indent-tabs-mode: nil +# End: diff --git a/lib/Danga/TimeoutSocket.pm b/lib/Danga/TimeoutSocket.pm new file mode 100644 index 0000000..fe74cd9 --- /dev/null +++ b/lib/Danga/TimeoutSocket.pm @@ -0,0 +1,49 @@ +# $Id: TimeoutSocket.pm,v 1.2 2005/02/02 20:44:35 msergeant Exp $ + +package Danga::TimeoutSocket; + +use base 'Danga::Socket'; +use fields qw(alive_time create_time); + +our $last_cleanup = 0; + +sub new { + my Danga::TimeoutSocket $self = shift; + my $sock = shift; + $self = fields::new($self) unless ref($self); + $self->SUPER::new($sock); + + my $now = time; + $self->{alive_time} = $self->{create_time} = $now; + + if ($now - 15 > $last_cleanup) { + $last_cleanup = $now; + _do_cleanup($now); + } + + return $self; +} + +sub _do_cleanup { + my $now = shift; + my $sf = __PACKAGE__->get_sock_ref; + + my %max_age; # classname -> max age (0 means forever) + my @to_close; + while (my $k = each %$sf) { + my Danga::TimeoutSocket $v = $sf->{$k}; + my $ref = ref $v; + next unless $v->isa('Danga::TimeoutSocket'); + unless (defined $max_age{$ref}) { + $max_age{$ref} = $ref->max_idle_time || 0; + } + next unless $max_age{$ref}; + if ($v->{alive_time} < $now - $max_age{$ref}) { + push @to_close, $v; + } + } + + $_->close("Timeout") foreach @to_close; +} + +1;