High perf branch merge and fixes


git-svn-id: https://svn.perl.org/qpsmtpd/trunk@497 958fd67b-6ff1-0310-b445-bb7760255be9
This commit is contained in:
Matt Sergeant 2005-07-11 19:11:11 +00:00
parent 9683016276
commit e407e8b470
5 changed files with 1585 additions and 0 deletions

147
lib/Danga/Client.pm Normal file
View File

@ -0,0 +1,147 @@
# $Id: Client.pm,v 1.8 2005/02/14 22:06:38 msergeant Exp $
package Danga::Client;
use base 'Danga::TimeoutSocket';
use fields qw(line closing disable_read can_read_mode);
use Time::HiRes ();
# 30 seconds max timeout!
sub max_idle_time { 30 }
sub max_connect_time { 1200 }
sub new {
my Danga::Client $self = shift;
$self = fields::new($self) unless ref $self;
$self->SUPER::new( @_ );
$self->reset_for_next_message;
return $self;
}
sub reset_for_next_message {
my Danga::Client $self = shift;
$self->{line} = '';
$self->{disable_read} = 0;
$self->{can_read_mode} = 0;
return $self;
}
sub get_line {
my Danga::Client $self = shift;
if (!$self->have_line) {
$self->SetPostLoopCallback(sub { $self->have_line ? 0 : 1 });
#warn("get_line PRE\n");
$self->EventLoop();
#warn("get_line POST\n");
$self->disable_read();
}
return if $self->{closing};
# now have a line.
$self->{alive_time} = time;
$self->{line} =~ s/^(.*?\n)//;
return $1;
}
sub can_read {
my Danga::Client $self = shift;
my ($timeout) = @_;
my $end = Time::HiRes::time() + $timeout;
# warn("Calling can-read\n");
$self->{can_read_mode} = 1;
if (!length($self->{line})) {
$self->disable_read();
# loop because any callback, not just ours, can make EventLoop return
while( !(length($self->{line}) || (Time::HiRes::time > $end)) ) {
$self->SetPostLoopCallback(sub { (length($self->{line}) ||
(Time::HiRes::time > $end)) ? 0 : 1 });
#warn("get_line PRE\n");
$self->EventLoop();
#warn("get_line POST\n");
}
$self->enable_read();
}
$self->{can_read_mode} = 0;
$self->SetPostLoopCallback(undef);
return if $self->{closing};
$self->{alive_time} = time;
# warn("can_read returning for '$self->{line}'\n");
return 1 if length($self->{line});
return;
}
sub have_line {
my Danga::Client $self = shift;
return 1 if $self->{closing};
if ($self->{line} =~ /\n/) {
return 1;
}
return 0;
}
sub event_read {
my Danga::Client $self = shift;
my $bref = $self->read(8192);
return $self->close($!) unless defined $bref;
# $self->watch_read(0);
$self->process_read_buf($bref);
}
sub process_read_buf {
my Danga::Client $self = shift;
my $bref = shift;
$self->{line} .= $$bref;
return if ! $self->readable();
return if $::LineMode;
while ($self->{line} =~ s/^(.*?\n)//) {
my $line = $1;
$self->{alive_time} = time;
my $resp = $self->process_line($line);
if ($::DEBUG > 1 and $resp) { print "$$:".($self+0)."S: $_\n" for split(/\n/, $resp) }
$self->write($resp) if $resp;
$self->watch_read(0) if $self->{disable_read};
last if ! $self->readable();
}
if($self->have_line) {
$self->shift_back_read($self->{line});
$self->{line} = '';
}
}
sub readable {
my Danga::Client $self = shift;
return 0 if $self->{disable_read} > 0;
return 1;
}
sub disable_read {
my Danga::Client $self = shift;
$self->{disable_read}++;
$self->watch_read(0);
}
sub enable_read {
my Danga::Client $self = shift;
$self->{disable_read}--;
if ($self->{disable_read} <= 0) {
$self->{disable_read} = 0;
$self->watch_read(1);
}
}
sub process_line {
my Danga::Client $self = shift;
return '';
}
sub close {
my Danga::Client $self = shift;
$self->{closing} = 1;
print "closing @_\n" if $::DEBUG;
$self->SUPER::close(@_);
}
sub event_err { my Danga::Client $self = shift; $self->close("Error") }
sub event_hup { my Danga::Client $self = shift; $self->close("Disconnect (HUP)") }
1;

170
lib/Danga/DNS.pm Normal file
View File

@ -0,0 +1,170 @@
# $Id: DNS.pm,v 1.12 2005/02/14 22:06:08 msergeant Exp $
package Danga::DNS;
# This is the query class - it is really just an encapsulation of the
# hosts you want to query, plus the callback. All the hard work is done
# in Danga::DNS::Resolver.
use fields qw(client hosts num_hosts callback results start);
use strict;
use Danga::DNS::Resolver;
my $resolver;
sub trace {
my $level = shift;
print STDERR ("[$$] dns lookup: @_") if $::DEBUG >= $level;
}
sub new {
my Danga::DNS $self = shift;
my %options = @_;
$resolver ||= Danga::DNS::Resolver->new();
my $client = $options{client};
$client->disable_read if $client;
$self = fields::new($self) unless ref $self;
$self->{hosts} = $options{hosts} ? $options{hosts} : [ $options{host} ];
$self->{num_hosts} = scalar(@{$self->{hosts}}) || "No hosts supplied";
$self->{client} = $client;
$self->{callback} = $options{callback} || die "No callback given";
$self->{results} = {};
$self->{start} = time;
if ($options{type}) {
if ($options{type} eq 'TXT') {
if (!$resolver->query_txt($self, @{$self->{hosts}})) {
$client->enable_read() if $client;
return;
}
}
elsif ($options{type} eq 'A') {
if (!$resolver->query($self, @{$self->{hosts}})) {
$client->enable_read() if $client;
return;
}
}
elsif ($options{type} eq 'PTR') {
if (!$resolver->query($self, @{$self->{hosts}})) {
$client->enable_read() if $client;
return;
}
}
elsif ($options{type} eq 'MX') {
if (!$resolver->query_mx($self, @{$self->{hosts}})) {
$client->enable_read() if $client;
return;
}
}
else {
die "Unsupported DNS query type: $options{type}";
}
}
else {
if (!$resolver->query($self, @{$self->{hosts}})) {
$client->enable_read() if $client;
return;
}
}
return $self;
}
sub run_callback {
my Danga::DNS $self = shift;
my ($result, $query) = @_;
$self->{results}{$query} = $result;
trace(2, "got $query => $result\n");
eval {
$self->{callback}->($result, $query);
};
if ($@) {
warn($@);
}
}
sub DESTROY {
my Danga::DNS $self = shift;
my $now = time;
foreach my $host (@{$self->{hosts}}) {
if (!exists($self->{results}{$host})) {
print STDERR "DNS timeout (presumably) looking for $host after " . ($now - $self->{start}) . " secs\n";
$self->{callback}->("NXDOMAIN", $host);
}
}
$self->{client}->enable_read if $self->{client};
}
1;
=head1 NAME
Danga::DNS - a DNS lookup class for the Danga::Socket framework
=head1 SYNOPSIS
Danga::DNS->new(%options);
=head1 DESCRIPTION
This module performs asynchronous DNS lookups, making use of a single UDP
socket (unlike Net::DNS's bgsend/bgread combination), and blocking reading on
a client until the response comes back (this is useful for e.g. SMTP rDNS
lookups where you want the answer before you see the next SMTP command).
Currently this module will only perform A or PTR lookups. A rDNS (PTR) lookup
will be performed if the host matches the regexp: C</^\d+\.\d+\.\d+.\d+$/>.
The lookups time out after 15 seconds.
=head1 API
=head2 C<< Danga::DNS->new( %options ) >>
Create a new DNS query. You do not need to store the resulting object as this
class is all done with callbacks.
Example:
Danga::DNS->new(
callback => sub { print "Got result: $_[0]\n" },
host => 'google.com',
);
=over 4
=item B<[required]> C<callback>
The callback to call when results come in. This should be a reference to a
subroutine. The callback receives two parameters - the result of the DNS lookup
and the host that was looked up.
=item C<host>
A host name to lookup. Note that if the hostname is a dotted quad of numbers then
a reverse DNS (PTR) lookup is performend.
=item C<hosts>
An array-ref list of hosts to lookup.
B<NOTE:> One of either C<host> or C<hosts> is B<required>.
=item C<client>
It is possible to specify a C<Danga::Client> object (or subclass) which you wish
to disable for reading until your DNS result returns.
=item C<type>
You can specify one of: I<"A">, I<"PTR"> or I<"TXT"> here. Other types may be
supported in the future.
=back
=cut

307
lib/Danga/DNS/Resolver.pm Normal file
View File

@ -0,0 +1,307 @@
# $Id: Resolver.pm,v 1.3 2005/02/14 22:06:08 msergeant Exp $
package Danga::DNS::Resolver;
use base qw(Danga::Socket);
use fields qw(res dst id_to_asker id_to_query timeout cache cache_timeout);
use Net::DNS;
use Socket;
use strict;
our $last_cleanup = 0;
sub trace {
my $level = shift;
print ("$::DEBUG/$level [$$] dns lookup: @_") if $::DEBUG >= $level;
}
sub new {
my Danga::DNS::Resolver $self = shift;
$self = fields::new($self) unless ref $self;
my $res = Net::DNS::Resolver->new;
my $sock = IO::Socket::INET->new(
Proto => 'udp',
LocalAddr => $res->{'srcaddr'},
LocalPort => ($res->{'srcport'} || undef),
) || die "Cannot create socket: $!";
IO::Handle::blocking($sock, 0);
trace(2, "Using nameserver $res->{nameservers}->[0]:$res->{port}\n");
my $dst_sockaddr = sockaddr_in($res->{'port'}, inet_aton($res->{'nameservers'}->[0]));
#my $dst_sockaddr = sockaddr_in($res->{'port'}, inet_aton('127.0.0.1'));
#my $dst_sockaddr = sockaddr_in($res->{'port'}, inet_aton('10.2.1.20'));
$self->{res} = $res;
$self->{dst} = $dst_sockaddr;
$self->{id_to_asker} = {};
$self->{id_to_query} = {};
$self->{timeout} = {};
$self->{cache} = {};
$self->{cache_timeout} = {};
$self->SUPER::new($sock);
$self->watch_read(1);
$self->AddTimer(5, sub { $self->_do_cleanup });
return $self;
}
sub pending {
my Danga::DNS::Resolver $self = shift;
return keys(%{$self->{id_to_asker}});
}
sub _query {
my Danga::DNS::Resolver $self = shift;
my ($asker, $host, $type, $now) = @_;
if ($ENV{NODNS}) {
$asker->run_callback("NXDNS", $host);
return 1;
}
if (exists $self->{cache}{$type}{$host}) {
# print "CACHE HIT!\n";
$asker->run_callback($self->{cache}{$type}{$host}, $host);
return 1;
}
my $packet = $self->{res}->make_query_packet($host, $type);
my $packet_data = $packet->data;
my $h = $packet->header;
my $id = $h->id;
if (!$self->sock->send($packet_data, 0, $self->{dst})) {
return;
}
trace(2, "Query: $host ($id)\n");
$self->{id_to_asker}->{$id} = $asker;
$self->{id_to_query}->{$id} = $host;
$self->{timeout}->{$id} = $now;
return 1;
}
sub query_txt {
my Danga::DNS::Resolver $self = shift;
my ($asker, @hosts) = @_;
my $now = time();
trace(2, "[" . keys(%{$self->{id_to_asker}}) . "] trying to resolve TXT: @hosts\n");
foreach my $host (@hosts) {
$self->_query($asker, $host, 'TXT', $now) || return;
}
#print "+Pending queries: " . keys(%{$self->{id_to_asker}}) .
# " / Cache Size: " . keys(%{$self->{cache}}) . "\n";
return 1;
}
sub query_mx {
my Danga::DNS::Resolver $self = shift;
my ($asker, @hosts) = @_;
my $now = time();
trace(2, "[" . keys(%{$self->{id_to_asker}}) . "] trying to resolve MX: @hosts\n");
foreach my $host (@hosts) {
$self->_query($asker, $host, 'MX', $now) || return;
}
#print "+Pending queries: " . keys(%{$self->{id_to_asker}}) .
# " / Cache Size: " . keys(%{$self->{cache}}) . "\n";
return 1;
}
sub query {
my Danga::DNS::Resolver $self = shift;
my ($asker, @hosts) = @_;
my $now = time();
trace(2, "[" . keys(%{$self->{id_to_asker}}) . "] trying to resolve A/PTR: @hosts\n");
foreach my $host (@hosts) {
$self->_query($asker, $host, 'A', $now) || return;
}
#print "+Pending queries: " . keys(%{$self->{id_to_asker}}) .
# " / Cache Size: " . keys(%{$self->{cache}}) . "\n";
return 1;
}
sub _do_cleanup {
my Danga::DNS::Resolver $self = shift;
my $now = time;
$self->AddTimer(5, sub { $self->_do_cleanup });
my $idle = $self->max_idle_time;
my @to_delete;
while (my ($id, $t) = each(%{$self->{timeout}})) {
if ($t < ($now - $idle)) {
push @to_delete, $id;
}
}
foreach my $id (@to_delete) {
delete $self->{timeout}{$id};
my $asker = delete $self->{id_to_asker}{$id};
my $query = delete $self->{id_to_query}{$id};
$asker->run_callback("NXDOMAIN", $query);
}
foreach my $type ('A', 'TXT') {
@to_delete = ();
while (my ($query, $t) = each(%{$self->{cache_timeout}{$type}})) {
if ($t < $now) {
push @to_delete, $query;
}
}
foreach my $q (@to_delete) {
delete $self->{cache_timeout}{$type}{$q};
delete $self->{cache}{$type}{$q};
}
}
}
# seconds max timeout!
sub max_idle_time { 30 }
# Danga::DNS
sub event_err { shift->close("dns socket error") }
sub event_hup { shift->close("dns socket error") }
sub event_read {
my Danga::DNS::Resolver $self = shift;
while (my $packet = $self->{res}->bgread($self->sock)) {
my $err = $self->{res}->errorstring;
my $answers = 0;
my $header = $packet->header;
my $id = $header->id;
my $asker = delete $self->{id_to_asker}->{$id};
my $query = delete $self->{id_to_query}->{$id};
delete $self->{timeout}{$id};
#print "-Pending queries: " . keys(%{$self->{id_to_asker}}) .
# " / Cache Size: " . keys(%{$self->{cache}}) . "\n";
if (!$asker) {
trace(1, "No asker for id: $id\n");
return;
}
my $now = time();
my @questions = $packet->question;
#print STDERR "response to ", $questions[0]->string, "\n";
foreach my $rr ($packet->answer) {
# my $q = shift @questions;
if ($rr->type eq "PTR") {
my $rdns = $rr->ptrdname;
if ($query) {
# NB: Cached as an "A" lookup as there's no overlap and they
# go through the same query() function above
$self->{cache}{A}{$query} = $rdns;
$self->{cache_timeout}{A}{$query} = $now + 60; # should use $rr->ttl but that would cache for too long
}
$asker->run_callback($rdns, $query);
}
elsif ($rr->type eq "A") {
my $ip = $rr->address;
if ($query) {
$self->{cache}{A}{$query} = $ip;
$self->{cache_timeout}{A}{$query} = $now + 60; # should use $rr->ttl but that would cache for too long
}
$asker->run_callback($ip, $query);
}
elsif ($rr->type eq "TXT") {
my $txt = $rr->txtdata;
if ($query) {
$self->{cache}{TXT}{$query} = $txt;
$self->{cache_timeout}{TXT}{$query} = $now + 60; # should use $rr->ttl but that would cache for too long
}
$asker->run_callback($txt, $query);
}
else {
# came back, but not a PTR or A record
$asker->run_callback("unknown", $query);
}
$answers++;
}
if (!$answers) {
if ($err eq "NXDOMAIN") {
# trace("found => NXDOMAIN\n");
$asker->run_callback("NXDOMAIN", $query);
}
elsif ($err eq "SERVFAIL") {
# try again???
print "SERVFAIL looking for $query (Pending: " . keys(%{$self->{id_to_asker}}) . ")\n";
#$self->query($asker, $query);
$asker->run_callback($err, $query);
#$self->{id_to_asker}->{$id} = $asker;
#$self->{id_to_query}->{$id} = $query;
#$self->{timeout}{$id} = time();
}
elsif ($err eq "NOERROR") {
$asker->run_callback($err, $query);
}
elsif($err) {
print("error: $err\n");
$asker->run_callback($err, $query);
}
else {
# trace("no answers\n");
$asker->run_callback("NXDOMAIN", $query);
}
}
}
}
use Carp qw(confess);
sub close {
my Danga::DNS::Resolver $self = shift;
$self->SUPER::close(shift);
# confess "Danga::DNS::Resolver socket should never be closed!";
}
1;
=head1 NAME
Danga::DNS::Resolver - an asynchronous DNS resolver class
=head1 SYNOPSIS
my $res = Danga::DNS::Resolver->new();
$res->query($obj, @hosts); # $obj implements $obj->run_callback()
=head1 DESCRIPTION
This is a low level DNS resolver class that works within the Danga::Socket
asynchronous I/O framework. Do not attempt to use this class standalone - use
the C<Danga::DNS> class instead.
=cut

899
lib/Danga/Socket.pm Normal file
View File

@ -0,0 +1,899 @@
###########################################################################
=head1 NAME
Danga::Socket - Event-driven async IO class
=head1 SYNOPSIS
use base ('Danga::Socket');
=head1 DESCRIPTION
This is an abstract base class which provides the basic framework for
event-driven asynchronous IO.
=cut
###########################################################################
package Danga::Socket;
use strict;
use vars qw{$VERSION};
$VERSION = do { my @r = (q$Revision: 1.4 $ =~ /\d+/g); sprintf "%d."."%02d" x $#r, @r };
use fields qw(sock fd write_buf write_buf_offset write_buf_size
read_push_back post_loop_callback
peer_ip
closed event_watch debug_level);
use Errno qw(EINPROGRESS EWOULDBLOCK EISCONN
EPIPE EAGAIN EBADF ECONNRESET);
use Socket qw(IPPROTO_TCP);
use Carp qw{croak confess};
use constant TCP_CORK => 3; # FIXME: not hard-coded (Linux-specific too)
use constant DebugLevel => 0;
# for epoll definitions:
our $HAVE_SYSCALL_PH = eval { require 'syscall.ph'; 1 } || eval { require 'sys/syscall.ph'; 1 };
our $HAVE_KQUEUE = eval { require IO::KQueue; 1 };
# Explicitly define the poll constants, as either one set or the other won't be
# loaded. They're also badly implemented in IO::Epoll:
# The IO::Epoll module is buggy in that it doesn't export constants efficiently
# (at least as of 0.01), so doing constants ourselves saves 13% of the user CPU
# time
use constant EPOLLIN => 1;
use constant EPOLLOUT => 4;
use constant EPOLLERR => 8;
use constant EPOLLHUP => 16;
use constant EPOLL_CTL_ADD => 1;
use constant EPOLL_CTL_DEL => 2;
use constant EPOLL_CTL_MOD => 3;
use constant POLLIN => 1;
use constant POLLOUT => 4;
use constant POLLERR => 8;
use constant POLLHUP => 16;
use constant POLLNVAL => 32;
# keep track of active clients
our (
$HaveEpoll, # Flag -- is epoll available? initially undefined.
$HaveKQueue,
%DescriptorMap, # fd (num) -> Danga::Socket object
%PushBackSet, # fd (num) -> Danga::Socket (fds with pushed back read data)
$Epoll, # Global epoll fd (for epoll mode only)
$KQueue, # Global kqueue fd (for kqueue mode only)
@ToClose, # sockets to close when event loop is done
%OtherFds, # A hash of "other" (non-Danga::Socket) file
# descriptors for the event loop to track.
$PostLoopCallback, # subref to call at the end of each loop, if defined
%PLCMap, # fd (num) -> PostLoopCallback
@Timers, # timers
);
%OtherFds = ();
#####################################################################
### C L A S S M E T H O D S
#####################################################################
### (CLASS) METHOD: HaveEpoll()
### Returns a true value if this class will use IO::Epoll for async IO.
sub HaveEpoll { $HaveEpoll };
### (CLASS) METHOD: WatchedSockets()
### Returns the number of file descriptors which are registered with the global
### poll object.
sub WatchedSockets {
return scalar keys %DescriptorMap;
}
*watched_sockets = *WatchedSockets;
### (CLASS) METHOD: ToClose()
### Return the list of sockets that are awaiting close() at the end of the
### current event loop.
sub ToClose { return @ToClose; }
### (CLASS) METHOD: OtherFds( [%fdmap] )
### Get/set the hash of file descriptors that need processing in parallel with
### the registered Danga::Socket objects.
sub OtherFds {
my $class = shift;
if ( @_ ) { %OtherFds = @_ }
return wantarray ? %OtherFds : \%OtherFds;
}
sub AddTimer {
my $class = shift;
my ($secs, $coderef) = @_;
my $timeout = time + $secs;
if (!@Timers || ($timeout >= $Timers[-1][0])) {
push @Timers, [$timeout, $coderef];
return;
}
# Now where do we insert...
for (my $i = 0; $i < @Timers; $i++) {
if ($Timers[$i][0] > $timeout) {
splice(@Timers, $i, 0, [$timeout, $coderef]);
return;
}
}
die "Shouldn't get here spank matt.";
}
### (CLASS) METHOD: DescriptorMap()
### Get the hash of Danga::Socket objects keyed by the file descriptor they are
### wrapping.
sub DescriptorMap {
return wantarray ? %DescriptorMap : \%DescriptorMap;
}
*descriptor_map = *DescriptorMap;
*get_sock_ref = *DescriptorMap;
sub init_poller
{
return if defined $HaveEpoll || $HaveKQueue;
if ($HAVE_KQUEUE) {
$KQueue = IO::KQueue->new();
$HaveKQueue = $KQueue >= 0;
if ($HaveKQueue) {
*EventLoop = *KQueueEventLoop;
}
}
else {
$Epoll = eval { epoll_create(1024); };
$HaveEpoll = $Epoll >= 0;
if ($HaveEpoll) {
*EventLoop = *EpollEventLoop;
}
}
if (!$HaveEpoll && !$HaveKQueue) {
require IO::Poll;
*EventLoop = *PollEventLoop;
}
}
### FUNCTION: EventLoop()
### Start processing IO events.
sub EventLoop {
my $class = shift;
init_poller();
if ($HaveEpoll) {
EpollEventLoop($class);
} else {
PollEventLoop($class);
}
}
### The kqueue-based event loop. Gets installed as EventLoop if IO::KQueue works
### okay.
sub KQueueEventLoop {
my $class = shift;
foreach my $fd (keys %OtherFds) {
$KQueue->EV_SET($fd, IO::KQueue::EVFILT_READ(), IO::KQueue::EV_ADD());
}
while (1) {
my $now = time;
# Run expired timers
while (@Timers && $Timers[0][0] <= $now) {
my $to_run = shift(@Timers);
$to_run->[1]->($now);
}
# Get next timeout
my $timeout = @Timers ? ($Timers[0][0] - $now) : 1;
# print STDERR "kevent($timeout)\n";
my @ret = $KQueue->kevent($timeout * 1000);
foreach my $kev (@ret) {
my ($fd, $filter, $flags, $fflags) = @$kev;
my Danga::Socket $pob = $DescriptorMap{$fd};
# prioritise OtherFds first - likely to be accept() socks (?)
if (!$pob) {
if (my $code = $OtherFds{$fd}) {
$code->($filter);
}
else {
print STDERR "kevent() returned fd $fd for which we have no mapping. removing.\n";
POSIX::close($fd); # close deletes the kevent entry
}
next;
}
DebugLevel >= 1 && $class->DebugMsg("Event: fd=%d (%s), flags=%d \@ %s\n",
$fd, ref($pob), $flags, time);
$pob->event_read if $filter == IO::KQueue::EVFILT_READ() && !$pob->{closed};
$pob->event_write if $filter == IO::KQueue::EVFILT_WRITE() && !$pob->{closed};
if ($flags == IO::KQueue::EV_EOF() && !$pob->{closed}) {
if ($fflags) {
$pob->event_err;
} else {
$pob->event_hup;
}
}
}
return unless PostEventLoop();
}
exit(0);
}
### The epoll-based event loop. Gets installed as EventLoop if IO::Epoll loads
### okay.
sub EpollEventLoop {
my $class = shift;
foreach my $fd ( keys %OtherFds ) {
epoll_ctl($Epoll, EPOLL_CTL_ADD, $fd, EPOLLIN);
}
while (1) {
my $now = time;
# Run expired timers
while (@Timers && $Timers[0][0] <= $now) {
my $to_run = shift(@Timers);
$to_run->[1]->($now);
}
# Get next timeout
my $timeout = @Timers ? ($Timers[0][0] - $now) : 1;
my @events;
my $i;
my $evcount = epoll_wait($Epoll, 1000, $timeout * 1000, \@events);
EVENT:
for ($i=0; $i<$evcount; $i++) {
my $ev = $events[$i];
# it's possible epoll_wait returned many events, including some at the end
# that ones in the front triggered unregister-interest actions. if we
# can't find the %sock entry, it's because we're no longer interested
# in that event.
my Danga::Socket $pob = $DescriptorMap{$ev->[0]};
my $code;
my $state = $ev->[1];
# if we didn't find a Perlbal::Socket subclass for that fd, try other
# pseudo-registered (above) fds.
if (! $pob) {
if (my $code = $OtherFds{$ev->[0]}) {
$code->($state);
}
else {
my $fd = $ev->[0];
print STDERR "epoll() returned fd $fd w/ state $state for which we have no mapping. removing.\n";
POSIX::close($fd);
epoll_ctl($Epoll, EPOLL_CTL_DEL, $fd, 0);
}
next;
}
DebugLevel >= 1 && $class->DebugMsg("Event: fd=%d (%s), state=%d \@ %s\n",
$ev->[0], ref($pob), $ev->[1], time);
$pob->event_read if $state & EPOLLIN && ! $pob->{closed};
$pob->event_write if $state & EPOLLOUT && ! $pob->{closed};
$pob->event_err if $state & EPOLLERR && ! $pob->{closed};
$pob->event_hup if $state & EPOLLHUP && ! $pob->{closed};
}
return unless PostEventLoop();
}
exit 0;
}
### The fallback IO::Poll-based event loop. Gets installed as EventLoop if
### IO::Epoll fails to load.
sub PollEventLoop {
my $class = shift;
my Danga::Socket $pob;
while (1) {
my $now = time;
# Run expired timers
while (@Timers && $Timers[0][0] <= $now) {
my $to_run = shift(@Timers);
$to_run->[1]->($now);
}
# Get next timeout
my $timeout = @Timers ? ($Timers[0][0] - $now) : 1;
# the following sets up @poll as a series of ($poll,$event_mask)
# items, then uses IO::Poll::_poll, implemented in XS, which
# modifies the array in place with the even elements being
# replaced with the event masks that occured.
my @poll;
foreach my $fd ( keys %OtherFds ) {
push @poll, $fd, POLLIN;
}
foreach my $fd ( keys %DescriptorMap ) {
my Danga::Socket $sock = $DescriptorMap{$fd};
push @poll, $fd, $sock->{event_watch};
}
return 0 unless @poll;
# print STDERR "Poll for $timeout secs\n";
my $count = IO::Poll::_poll($timeout * 1000, @poll);
# Fetch handles with read events
while (@poll) {
my ($fd, $state) = splice(@poll, 0, 2);
next unless $state;
$pob = $DescriptorMap{$fd};
if ( !$pob && (my $code = $OtherFds{$fd}) ) {
$code->($state);
next;
}
$pob->event_read if $state & POLLIN && ! $pob->{closed};
$pob->event_write if $state & POLLOUT && ! $pob->{closed};
$pob->event_err if $state & POLLERR && ! $pob->{closed};
$pob->event_hup if $state & POLLHUP && ! $pob->{closed};
}
return unless PostEventLoop();
}
exit 0;
}
## PostEventLoop is called at the end of the event loop to process things
# like close() calls.
sub PostEventLoop {
# fire read events for objects with pushed-back read data
my $loop = 1;
while ($loop) {
$loop = 0;
foreach my $fd (keys %PushBackSet) {
my Danga::Socket $pob = $PushBackSet{$fd};
next unless (! $pob->{closed} &&
$pob->{event_watch} & POLLIN);
$loop = 1;
$pob->event_read;
}
}
# now we can close sockets that wanted to close during our event processing.
# (we didn't want to close them during the loop, as we didn't want fd numbers
# being reused and confused during the event loop)
foreach my $f (@ToClose) {
close($f);
}
@ToClose = ();
# now we're at the very end, call per-connection callbacks if defined
my $ret = 1; # use $ret so's to not starve some FDs; return 0 if any PLCs return 0
for my $plc (values %PLCMap) {
$ret &&= $plc->(\%DescriptorMap, \%OtherFds);
}
# now we're at the very end, call global callback if defined
if (defined $PostLoopCallback) {
$ret &&= $PostLoopCallback->(\%DescriptorMap, \%OtherFds);
}
return $ret;
}
### (CLASS) METHOD: DebugMsg( $format, @args )
### Print the debugging message specified by the C<sprintf>-style I<format> and
### I<args>
sub DebugMsg {
my ( $class, $fmt, @args ) = @_;
chomp $fmt;
printf STDERR ">>> $fmt\n", @args;
}
### METHOD: new( $socket )
### Create a new Danga::Socket object for the given I<socket> which will react
### to events on it during the C<wait_loop>.
sub new {
my Danga::Socket $self = shift;
$self = fields::new($self) unless ref $self;
my $sock = shift;
$self->{sock} = $sock;
my $fd = fileno($sock);
$self->{fd} = $fd;
$self->{write_buf} = [];
$self->{write_buf_offset} = 0;
$self->{write_buf_size} = 0;
$self->{closed} = 0;
$self->{read_push_back} = [];
$self->{post_loop_callback} = undef;
$self->{event_watch} = POLLERR|POLLHUP|POLLNVAL;
init_poller();
if ($HaveEpoll) {
epoll_ctl($Epoll, EPOLL_CTL_ADD, $fd, $self->{event_watch})
and die "couldn't add epoll watch for $fd\n";
}
elsif ($HaveKQueue) {
# Add them to the queue but disabled for now
$KQueue->EV_SET($fd, IO::KQueue::EVFILT_READ(),
IO::KQueue::EV_ADD() | IO::KQueue::EV_DISABLE());
$KQueue->EV_SET($fd, IO::KQueue::EVFILT_WRITE(),
IO::KQueue::EV_ADD() | IO::KQueue::EV_DISABLE());
}
$DescriptorMap{$fd} = $self;
return $self;
}
#####################################################################
### I N S T A N C E M E T H O D S
#####################################################################
### METHOD: tcp_cork( $boolean )
### Turn TCP_CORK on or off depending on the value of I<boolean>.
sub tcp_cork {
my Danga::Socket $self = shift;
my $val = shift;
# FIXME: Linux-specific.
setsockopt($self->{sock}, IPPROTO_TCP, TCP_CORK,
pack("l", $val ? 1 : 0)) || die "setsockopt: $!";
}
### METHOD: close( [$reason] )
### Close the socket. The I<reason> argument will be used in debugging messages.
sub close {
my Danga::Socket $self = shift;
my $reason = shift || "";
my $fd = $self->{fd};
my $sock = $self->{sock};
$self->{closed} = 1;
# we need to flush our write buffer, as there may
# be self-referential closures (sub { $client->close })
# preventing the object from being destroyed
$self->{write_buf} = [];
if (DebugLevel) {
my ($pkg, $filename, $line) = caller;
print STDERR "Closing \#$fd due to $pkg/$filename/$line ($reason)\n";
}
if ($HaveEpoll) {
if (epoll_ctl($Epoll, EPOLL_CTL_DEL, $fd, $self->{event_watch}) == 0) {
DebugLevel >= 1 && $self->debugmsg("Client %d disconnected.\n", $fd);
} else {
DebugLevel >= 1 && $self->debugmsg("poll->remove failed on fd %d\n", $fd);
}
}
delete $PLCMap{$fd};
delete $DescriptorMap{$fd};
delete $PushBackSet{$fd};
# defer closing the actual socket until the event loop is done
# processing this round of events. (otherwise we might reuse fds)
push @ToClose, $sock;
return 0;
}
### METHOD: sock()
### Returns the underlying IO::Handle for the object.
sub sock {
my Danga::Socket $self = shift;
return $self->{sock};
}
### METHOD: write( $data )
### Write the specified data to the underlying handle. I<data> may be scalar,
### scalar ref, code ref (to run when there), or undef just to kick-start.
### Returns 1 if writes all went through, or 0 if there are writes in queue. If
### it returns 1, caller should stop waiting for 'writable' events)
sub write {
my Danga::Socket $self;
my $data;
($self, $data) = @_;
# nobody should be writing to closed sockets, but caller code can
# do two writes within an event, have the first fail and
# disconnect the other side (whose destructor then closes the
# calling object, but it's still in a method), and then the
# now-dead object does its second write. that is this case. we
# just lie and say it worked. it'll be dead soon and won't be
# hurt by this lie.
return 1 if $self->{closed};
my $bref;
# just queue data if there's already a wait
my $need_queue;
if (defined $data) {
$bref = ref $data ? $data : \$data;
if ($self->{write_buf_size}) {
push @{$self->{write_buf}}, $bref;
$self->{write_buf_size} += ref $bref eq "SCALAR" ? length($$bref) : 1;
return 0;
}
# this flag says we're bypassing the queue system, knowing we're the
# only outstanding write, and hoping we don't ever need to use it.
# if so later, though, we'll need to queue
$need_queue = 1;
}
WRITE:
while (1) {
return 1 unless $bref ||= $self->{write_buf}[0];
my $len;
eval {
$len = length($$bref); # this will die if $bref is a code ref, caught below
};
if ($@) {
if (ref $bref eq "CODE") {
unless ($need_queue) {
$self->{write_buf_size}--; # code refs are worth 1
shift @{$self->{write_buf}};
}
$bref->();
undef $bref;
next WRITE;
}
die "Write error: $@ <$bref>";
}
my $to_write = $len - $self->{write_buf_offset};
my $written = syswrite($self->{sock}, $$bref, $to_write, $self->{write_buf_offset});
if (! defined $written) {
if ($! == EPIPE) {
return $self->close("EPIPE");
} elsif ($! == EAGAIN) {
# since connection has stuff to write, it should now be
# interested in pending writes:
if ($need_queue) {
push @{$self->{write_buf}}, $bref;
$self->{write_buf_size} += $len;
}
$self->watch_write(1);
return 0;
} elsif ($! == ECONNRESET) {
return $self->close("ECONNRESET");
}
DebugLevel >= 1 && $self->debugmsg("Closing connection ($self) due to write error: $!\n");
return $self->close("write_error");
} elsif ($written != $to_write) {
DebugLevel >= 2 && $self->debugmsg("Wrote PARTIAL %d bytes to %d",
$written, $self->{fd});
if ($need_queue) {
push @{$self->{write_buf}}, $bref;
$self->{write_buf_size} += $len;
}
# since connection has stuff to write, it should now be
# interested in pending writes:
$self->{write_buf_offset} += $written;
$self->{write_buf_size} -= $written;
$self->watch_write(1);
return 0;
} elsif ($written == $to_write) {
DebugLevel >= 2 && $self->debugmsg("Wrote ALL %d bytes to %d (nq=%d)",
$written, $self->{fd}, $need_queue);
$self->{write_buf_offset} = 0;
# this was our only write, so we can return immediately
# since we avoided incrementing the buffer size or
# putting it in the buffer. we also know there
# can't be anything else to write.
return 1 if $need_queue;
$self->{write_buf_size} -= $written;
shift @{$self->{write_buf}};
undef $bref;
next WRITE;
}
}
}
### METHOD: push_back_read( $buf )
### Push back I<buf> (a scalar or scalarref) into the read stream
sub push_back_read {
my Danga::Socket $self = shift;
my $buf = shift;
push @{$self->{read_push_back}}, ref $buf ? $buf : \$buf;
$PushBackSet{$self->{fd}} = $self;
}
### METHOD: shift_back_read( $buf )
### Shift back I<buf> (a scalar or scalarref) into the read stream
### Use this instead of push_back_read() when you need to unread
### something you just read.
sub shift_back_read {
my Danga::Socket $self = shift;
my $buf = shift;
unshift @{$self->{read_push_back}}, ref $buf ? $buf : \$buf;
$PushBackSet{$self->{fd}} = $self;
}
### METHOD: read( $bytecount )
### Read at most I<bytecount> bytes from the underlying handle; returns scalar
### ref on read, or undef on connection closed.
sub read {
my Danga::Socket $self = shift;
my $bytes = shift;
my $buf;
my $sock = $self->{sock};
if (@{$self->{read_push_back}}) {
$buf = shift @{$self->{read_push_back}};
my $len = length($$buf);
if ($len <= $buf) {
unless (@{$self->{read_push_back}}) {
delete $PushBackSet{$self->{fd}};
}
return $buf;
} else {
# if the pushed back read is too big, we have to split it
my $overflow = substr($$buf, $bytes);
$buf = substr($$buf, 0, $bytes);
unshift @{$self->{read_push_back}}, \$overflow,
return \$buf;
}
}
my $res = sysread($sock, $buf, $bytes, 0);
DebugLevel >= 2 && $self->debugmsg("sysread = %d; \$! = %d", $res, $!);
if (! $res && $! != EWOULDBLOCK) {
# catches 0=conn closed or undef=error
DebugLevel >= 2 && $self->debugmsg("Fd \#%d read hit the end of the road.", $self->{fd});
return undef;
}
return \$buf;
}
### (VIRTUAL) METHOD: event_read()
### Readable event handler. Concrete deriviatives of Danga::Socket should
### provide an implementation of this. The default implementation will die if
### called.
sub event_read { die "Base class event_read called for $_[0]\n"; }
### (VIRTUAL) METHOD: event_err()
### Error event handler. Concrete deriviatives of Danga::Socket should
### provide an implementation of this. The default implementation will die if
### called.
sub event_err { die "Base class event_err called for $_[0]\n"; }
### (VIRTUAL) METHOD: event_hup()
### 'Hangup' event handler. Concrete deriviatives of Danga::Socket should
### provide an implementation of this. The default implementation will die if
### called.
sub event_hup { die "Base class event_hup called for $_[0]\n"; }
### METHOD: event_write()
### Writable event handler. Concrete deriviatives of Danga::Socket may wish to
### provide an implementation of this. The default implementation calls
### C<write()> with an C<undef>.
sub event_write {
my $self = shift;
$self->write(undef);
}
### METHOD: watch_read( $boolean )
### Turn 'readable' event notification on or off.
sub watch_read {
my Danga::Socket $self = shift;
return if $self->{closed};
my $val = shift;
my $event = $self->{event_watch};
$event &= ~POLLIN if ! $val;
$event |= POLLIN if $val;
# If it changed, set it
if ($event != $self->{event_watch}) {
if ($HaveKQueue) {
$KQueue->EV_SET($self->{fd}, IO::KQueue::EVFILT_READ(),
$val ? IO::KQueue::EV_ENABLE() : IO::KQueue::EV_DISABLE());
}
elsif ($HaveEpoll) {
epoll_ctl($Epoll, EPOLL_CTL_MOD, $self->{fd}, $event)
and print STDERR "couldn't modify epoll settings for $self->{fd} " .
"($self) from $self->{event_watch} -> $event\n";
}
$self->{event_watch} = $event;
}
}
### METHOD: watch_read( $boolean )
### Turn 'writable' event notification on or off.
sub watch_write {
my Danga::Socket $self = shift;
return if $self->{closed};
my $val = shift;
my $event = $self->{event_watch};
$event &= ~POLLOUT if ! $val;
$event |= POLLOUT if $val;
# If it changed, set it
if ($event != $self->{event_watch}) {
if ($HaveKQueue) {
$KQueue->EV_SET($self->{fd}, IO::KQueue::EVFILT_WRITE(),
$val ? IO::KQueue::EV_ENABLE() : IO::KQueue::EV_DISABLE());
}
elsif ($HaveEpoll) {
epoll_ctl($Epoll, EPOLL_CTL_MOD, $self->{fd}, $event)
and print STDERR "couldn't modify epoll settings for $self->{fd} " .
"($self) from $self->{event_watch} -> $event\n";
}
$self->{event_watch} = $event;
}
}
### METHOD: debugmsg( $format, @args )
### Print the debugging message specified by the C<sprintf>-style I<format> and
### I<args> if the object's C<debug_level> is greater than or equal to the given
### I<level>.
sub debugmsg {
my ( $self, $fmt, @args ) = @_;
confess "Not an object" unless ref $self;
chomp $fmt;
printf STDERR ">>> $fmt\n", @args;
}
### METHOD: peer_ip_string()
### Returns the string describing the peer's IP
sub peer_ip_string {
my Danga::Socket $self = shift;
return $self->{peer_ip} if defined $self->{peer_ip};
my $pn = getpeername($self->{sock}) or return undef;
my ($port, $iaddr) = Socket::sockaddr_in($pn);
my $r = Socket::inet_ntoa($iaddr);
$self->{peer_ip} = $r;
return $r;
}
### METHOD: peer_addr_string()
### Returns the string describing the peer for the socket which underlies this
### object in form "ip:port"
sub peer_addr_string {
my Danga::Socket $self = shift;
my $pn = getpeername($self->{sock}) or return undef;
my ($port, $iaddr) = Socket::sockaddr_in($pn);
return Socket::inet_ntoa($iaddr) . ":$port";
}
### METHOD: as_string()
### Returns a string describing this socket.
sub as_string {
my Danga::Socket $self = shift;
my $ret = ref($self) . ": " . ($self->{closed} ? "closed" : "open");
my $peer = $self->peer_addr_string;
if ($peer) {
$ret .= " to " . $self->peer_addr_string;
}
return $ret;
}
### CLASS METHOD: SetPostLoopCallback
### Sets post loop callback function. Pass a subref and it will be
### called every time the event loop finishes. Return 1 from the sub
### to make the loop continue, else it will exit. The function will
### be passed two parameters: \%DescriptorMap, \%OtherFds.
sub SetPostLoopCallback {
my ($class, $ref) = @_;
if(ref $class) {
my Danga::Socket $self = $class;
if( defined $ref && ref $ref eq 'CODE' ) {
$PLCMap{$self->{fd}} = $ref;
}
else {
delete $PLCMap{$self->{fd}};
}
}
else {
$PostLoopCallback = (defined $ref && ref $ref eq 'CODE') ? $ref : undef;
}
}
sub DESTROY {
my Danga::Socket $self = shift;
$self->close() if !$self->{closed};
}
#####################################################################
### U T I L I T Y F U N C T I O N S
#####################################################################
our $SYS_epoll_create = eval { &SYS_epoll_create } || 254; # linux-ix86 default
# epoll_create wrapper
# ARGS: (size)
sub epoll_create {
my $epfd = eval { syscall($SYS_epoll_create, $_[0]) };
return -1 if $@;
return $epfd;
}
# epoll_ctl wrapper
# ARGS: (epfd, op, fd, events)
our $SYS_epoll_ctl = eval { &SYS_epoll_ctl } || 255; # linux-ix86 default
sub epoll_ctl {
syscall($SYS_epoll_ctl, $_[0]+0, $_[1]+0, $_[2]+0, pack("LLL", $_[3], $_[2]));
}
# epoll_wait wrapper
# ARGS: (epfd, maxevents, timeout, arrayref)
# arrayref: values modified to be [$fd, $event]
our $epoll_wait_events;
our $epoll_wait_size = 0;
our $SYS_epoll_wait = eval { &SYS_epoll_wait } || 256; # linux-ix86 default
sub epoll_wait {
# resize our static buffer if requested size is bigger than we've ever done
if ($_[1] > $epoll_wait_size) {
$epoll_wait_size = $_[1];
$epoll_wait_events = pack("LLL") x $epoll_wait_size;
}
my $ct = syscall($SYS_epoll_wait, $_[0]+0, $epoll_wait_events, $_[1]+0, $_[2]+0);
for ($_ = 0; $_ < $ct; $_++) {
@{$_[3]->[$_]}[1,0] = unpack("LL", substr($epoll_wait_events, 12*$_, 8));
}
return $ct;
}
1;
# Local Variables:
# mode: perl
# c-basic-indent: 4
# indent-tabs-mode: nil
# End:

View File

@ -0,0 +1,62 @@
# $Id: TimeoutSocket.pm,v 1.2 2005/02/02 20:44:35 msergeant Exp $
package Danga::TimeoutSocket;
use base 'Danga::Socket';
use fields qw(alive_time create_time);
our $last_cleanup = 0;
Danga::Socket->AddTimer(15, \&_do_cleanup);
sub new {
my Danga::TimeoutSocket $self = shift;
my $sock = shift;
$self = fields::new($self) unless ref($self);
$self->SUPER::new($sock);
my $now = time;
$self->{alive_time} = $self->{create_time} = $now;
return $self;
}
# overload these in a subclass
sub max_idle_time { 0 }
sub max_connect_time { 0 }
sub _do_cleanup {
my $now = time;
Danga::Socket->AddTimer(15, \&_do_cleanup);
my $sf = __PACKAGE__->get_sock_ref;
my %max_age; # classname -> max age (0 means forever)
my %max_connect; # classname -> max connect time
my @to_close;
while (my $k = each %$sf) {
my Danga::TimeoutSocket $v = $sf->{$k};
my $ref = ref $v;
next unless $v->isa('Danga::TimeoutSocket');
unless (defined $max_age{$ref}) {
$max_age{$ref} = $ref->max_idle_time || 0;
$max_connect{$ref} = $ref->max_connect_time || 0;
}
if (my $t = $max_connect{$ref}) {
if ($v->{create_time} < $now - $t) {
push @to_close, $v;
next;
}
}
if (my $t = $max_age{$ref}) {
if ($v->{alive_time} < $now - $t) {
push @to_close, $v;
}
}
}
$_->close("Timeout") foreach @to_close;
}
1;