Large number of patches from Brian Grossman to fix a number of bugs

Implement connection timeout


git-svn-id: https://svn.perl.org/qpsmtpd/branches/high_perf@413 958fd67b-6ff1-0310-b445-bb7760255be9
This commit is contained in:
Matt Sergeant 2005-05-09 13:43:40 +00:00
parent 12d9fa8311
commit 8dad7435e5
9 changed files with 101 additions and 32 deletions

View File

@ -7,6 +7,7 @@ use Time::HiRes ();
# 30 seconds max timeout! # 30 seconds max timeout!
sub max_idle_time { 30 } sub max_idle_time { 30 }
sub max_connect_time { 1200 }
sub new { sub new {
my Danga::Client $self = shift; my Danga::Client $self = shift;
@ -45,7 +46,7 @@ sub can_read {
my Danga::Client $self = shift; my Danga::Client $self = shift;
my ($timeout) = @_; my ($timeout) = @_;
my $end = Time::HiRes::time() + $timeout; my $end = Time::HiRes::time() + $timeout;
warn("Calling can-read\n"); # warn("Calling can-read\n");
$self->{can_read_mode} = 1; $self->{can_read_mode} = 1;
if (!length($self->{line})) { if (!length($self->{line})) {
my $old = $self->watch_read(); my $old = $self->watch_read();
@ -61,7 +62,7 @@ sub can_read {
$self->SetPostLoopCallback(sub { $self->have_line ? 0 : 1 }); $self->SetPostLoopCallback(sub { $self->have_line ? 0 : 1 });
return if $self->{closing}; return if $self->{closing};
$self->{alive_time} = time; $self->{alive_time} = time;
warn("can_read returning for '$self->{line}'\n"); # warn("can_read returning for '$self->{line}'\n");
return 1 if length($self->{line}); return 1 if length($self->{line});
return; return;
} }

View File

@ -286,6 +286,9 @@ sub event_read {
#$self->{timeout}{$id} = time(); #$self->{timeout}{$id} = time();
} }
elsif ($err eq "NOERROR") {
$asker->run_callback($err, $query);
}
elsif($err) { elsif($err) {
print("error: $err\n"); print("error: $err\n");
$asker->run_callback($err, $query); $asker->run_callback($err, $query);

View File

@ -24,7 +24,7 @@ use vars qw{$VERSION};
$VERSION = do { my @r = (q$Revision: 1.4 $ =~ /\d+/g); sprintf "%d."."%02d" x $#r, @r }; $VERSION = do { my @r = (q$Revision: 1.4 $ =~ /\d+/g); sprintf "%d."."%02d" x $#r, @r };
use fields qw(sock fd write_buf write_buf_offset write_buf_size use fields qw(sock fd write_buf write_buf_offset write_buf_size
read_push_back read_push_back post_loop_callback
closed event_watch debug_level); closed event_watch debug_level);
use Errno qw(EINPROGRESS EWOULDBLOCK EISCONN use Errno qw(EINPROGRESS EWOULDBLOCK EISCONN
@ -307,9 +307,21 @@ sub PostEventLoop {
# now we can close sockets that wanted to close during our event processing. # now we can close sockets that wanted to close during our event processing.
# (we didn't want to close them during the loop, as we didn't want fd numbers # (we didn't want to close them during the loop, as we didn't want fd numbers
# being reused and confused during the event loop) # being reused and confused during the event loop)
$_->close while ($_ = shift @ToClose); while(my $j = shift @ToClose) {
$j->[1]->close();
$j->[0]->{closing} = 0;
}
# now we're at the very end, call callback if defined
# now we're at the very end, call per-connection callbacks if defined
for my $fd (%DescriptorMap) {
my $pob = $DescriptorMap{$fd};
if( defined $pob->{post_loop_callback} ) {
return unless $pob->{post_loop_callback}->(\%DescriptorMap, \%OtherFds);
}
}
# now we're at the very end, call global callback if defined
if (defined $PostLoopCallback) { if (defined $PostLoopCallback) {
return $PostLoopCallback->(\%DescriptorMap, \%OtherFds); return $PostLoopCallback->(\%DescriptorMap, \%OtherFds);
} }
@ -401,6 +413,7 @@ sub new {
$self->{write_buf_size} = 0; $self->{write_buf_size} = 0;
$self->{closed} = 0; $self->{closed} = 0;
$self->{read_push_back} = []; $self->{read_push_back} = [];
$self->{post_loop_callback} = undef;
$self->{event_watch} = POLLERR|POLLHUP|POLLNVAL; $self->{event_watch} = POLLERR|POLLHUP|POLLNVAL;
@ -472,7 +485,7 @@ sub close {
# defer closing the actual socket until the event loop is done # defer closing the actual socket until the event loop is done
# processing this round of events. (otherwise we might reuse fds) # processing this round of events. (otherwise we might reuse fds)
push @ToClose, $sock; push @ToClose, [$self,$sock];
return 0; return 0;
} }
@ -785,7 +798,18 @@ sub as_string {
### be passed two parameters: \%DescriptorMap, \%OtherFds. ### be passed two parameters: \%DescriptorMap, \%OtherFds.
sub SetPostLoopCallback { sub SetPostLoopCallback {
my ($class, $ref) = @_; my ($class, $ref) = @_;
if(ref $class) {
my Danga::Socket $self = $class;
if( defined $ref && ref $ref eq 'CODE' ) {
$self->{PostLoopCallback} = $ref;
}
else {
delete $self->{PostLoopCallback};
}
}
else {
$PostLoopCallback = (defined $ref && ref $ref eq 'CODE') ? $ref : undef; $PostLoopCallback = (defined $ref && ref $ref eq 'CODE') ? $ref : undef;
}
} }
##################################################################### #####################################################################

View File

@ -24,11 +24,27 @@ sub new {
return $self; return $self;
} }
sub ticker {
my Danga::TimeoutSocket $self = shift;
my $now = time;
if ($now - 15 > $last_cleanup) {
$last_cleanup = $now;
_do_cleanup($now);
}
}
# overload these in a subclass
sub max_idle_time { 0 }
sub max_connect_time { 0 }
sub _do_cleanup { sub _do_cleanup {
my $now = shift; my $now = shift;
my $sf = __PACKAGE__->get_sock_ref; my $sf = __PACKAGE__->get_sock_ref;
my %max_age; # classname -> max age (0 means forever) my %max_age; # classname -> max age (0 means forever)
my %max_connect; # classname -> max connect time
my @to_close; my @to_close;
while (my $k = each %$sf) { while (my $k = each %$sf) {
my Danga::TimeoutSocket $v = $sf->{$k}; my Danga::TimeoutSocket $v = $sf->{$k};
@ -36,10 +52,18 @@ sub _do_cleanup {
next unless $v->isa('Danga::TimeoutSocket'); next unless $v->isa('Danga::TimeoutSocket');
unless (defined $max_age{$ref}) { unless (defined $max_age{$ref}) {
$max_age{$ref} = $ref->max_idle_time || 0; $max_age{$ref} = $ref->max_idle_time || 0;
$max_connect{$ref} = $ref->max_connect_time || 0;
} }
next unless $max_age{$ref}; if (my $t = $max_connect{$ref}) {
if ($v->{alive_time} < $now - $max_age{$ref}) { if ($v->{create_time} < $now - $t) {
push @to_close, $v; push @to_close, $v;
next;
}
}
if (my $t = $max_age{$ref}) {
if ($v->{alive_time} < $now - $t) {
push @to_close, $v;
}
} }
} }

View File

@ -15,6 +15,7 @@ use fields qw(
_transaction _transaction
_test_mode _test_mode
_extras _extras
other_fds
); );
my $PROMPT = "Enter command: "; my $PROMPT = "Enter command: ";
@ -130,6 +131,16 @@ sub cmd_pause {
return "PAUSED"; return "PAUSED";
} }
sub cmd_continue {
my $self = shift;
my $other_fds = $self->{other_fds};
$self->OtherFds( %$other_fds );
%$other_fds = ();
return "UNPAUSED";
}
sub cmd_status { sub cmd_status {
my $self = shift; my $self = shift;
@ -173,7 +184,7 @@ sub cmd_status {
} }
} }
$output .= "Curr Connections: $current_connections\n". $output .= "Curr Connections: $current_connections / $::MAXconn\n".
"Curr DNS Queries: $current_dns"; "Curr DNS Queries: $current_dns";
return $output; return $output;
@ -206,7 +217,7 @@ sub cmd_list {
} }
} }
foreach my $item (@all) { foreach my $item (@all) {
$list .= sprintf("%x : %s [%s] Connected %0.2fs\n", @$item); $list .= sprintf("%x : %s [%s] Connected %0.2fs\n", map { defined()?$_:'' } @$item);
} }
return $list; return $list;

View File

@ -108,7 +108,7 @@ sub compile {
} }
close F; close F;
my $line = "\n#line 1 $file\n"; my $line = "\n#line 0 $file\n";
if ($test_mode) { if ($test_mode) {
if (open(F, "t/plugin_tests/$plugin")) { if (open(F, "t/plugin_tests/$plugin")) {

View File

@ -32,6 +32,9 @@ use Socket qw(inet_aton AF_INET CRLF);
use Time::HiRes qw(time); use Time::HiRes qw(time);
use strict; use strict;
sub max_idle_time { 60 }
sub max_connect_time { 1200 }
sub input_sock { sub input_sock {
my $self = shift; my $self = shift;
@_ and $self->{input_sock} = shift; @_ and $self->{input_sock} = shift;
@ -91,7 +94,7 @@ sub process_line {
if ($::DEBUG > 1) { print "$$:".($self+0)."C($self->{mode}): $line"; } if ($::DEBUG > 1) { print "$$:".($self+0)."C($self->{mode}): $line"; }
local $SIG{ALRM} = sub { local $SIG{ALRM} = sub {
my ($pkg, $file, $line) = caller(); my ($pkg, $file, $line) = caller();
die "ALARM: $pkg, $file, $line"; die "ALARM: ($self->{mode}) $pkg, $file, $line";
}; };
my $prev = alarm(2); # must process a command in < 2 seconds my $prev = alarm(2); # must process a command in < 2 seconds
eval { $self->_process_line($line) }; eval { $self->_process_line($line) };
@ -169,6 +172,7 @@ sub start_conversation {
my ($ip, $port) = split(':', $self->peer_addr_string); my ($ip, $port) = split(':', $self->peer_addr_string);
$conn->remote_ip($ip); $conn->remote_ip($ip);
$conn->remote_port($port); $conn->remote_port($port);
$conn->remote_info("[$ip]");
Danga::DNS->new( Danga::DNS->new(
client => $self, client => $self,
# NB: Setting remote_info to the same as remote_host # NB: Setting remote_info to the same as remote_host

View File

@ -44,8 +44,6 @@ and terminating the SMTP connection.
=cut =cut
use Time::HiRes ();
use warnings; use warnings;
use strict; use strict;
@ -70,25 +68,19 @@ sub register {
sub connect_handler { sub connect_handler {
my ($self, $transaction) = @_; my ($self, $transaction) = @_;
my $qp = $self->qp;
my $end = Time::HiRes::time + $self->{_args}->{'wait'} ;
my $time;
for( $time = Time::HiRes::time; $time < $end && !length($qp->{line}) ; $time = Time::HiRes::time ) {
$qp->can_read($end-$time);
}
my $earlytalker = 0;
$earlytalker = 1 if $time < $end ;
if ($earlytalker) { if ($self->qp->can_read($self->{_args}->{'wait'})) {
$self->log(LOGNOTICE, 'remote host started talking before we said hello'); $self->log(LOGNOTICE, 'remote host started talking before we said hello');
if ($self->{_args}->{'defer-reject'}) { if ($self->{_args}->{'defer-reject'}) {
$self->connection->notes('earlytalker', 1); $self->connection->notes('earlytalker', 1);
} else { }
else {
my $msg = 'Connecting host started transmitting before SMTP greeting'; my $msg = 'Connecting host started transmitting before SMTP greeting';
return (DENY,$msg) if $self->{_args}->{'action'} eq 'deny'; return (DENY,$msg) if $self->{_args}->{'action'} eq 'deny';
return (DENYSOFT,$msg) if $self->{_args}->{'action'} eq 'denysoft'; return (DENYSOFT,$msg) if $self->{_args}->{'action'} eq 'denysoft';
} }
} else { }
else {
$self->log(LOGINFO, 'remote host said nothing spontaneous, proceeding'); $self->log(LOGINFO, 'remote host said nothing spontaneous, proceeding');
} }
return DECLINED; return DECLINED;

12
qpsmtpd
View File

@ -199,6 +199,7 @@ sub run_as_inetd {
} }
sub run_as_server { sub run_as_server {
local $::MAXconn = $MAXCONN;
# establish SERVER socket, bind and listen. # establish SERVER socket, bind and listen.
$SERVER = IO::Socket::INET->new(LocalPort => $PORT, $SERVER = IO::Socket::INET->new(LocalPort => $PORT,
LocalAddr => $LOCALADDR, LocalAddr => $LOCALADDR,
@ -290,11 +291,19 @@ sub config_handler {
# Accept a new connection # Accept a new connection
sub accept_handler { sub accept_handler {
my $running = scalar keys %childstatus; my $running;
if( $LineMode ) {
$running = scalar keys %childstatus;
}
else {
my $descriptors = Danga::Client->DescriptorMap;
$running = scalar keys %$descriptors;
}
while ($running >= $MAXCONN) { while ($running >= $MAXCONN) {
::log(LOGINFO,"Too many connections: $running >= $MAXCONN."); ::log(LOGINFO,"Too many connections: $running >= $MAXCONN.");
return; return;
} }
++$running if $LineMode; # count self
my $csock = $SERVER->accept(); my $csock = $SERVER->accept();
if (!$csock) { if (!$csock) {
@ -341,6 +350,7 @@ sub accept_handler {
$client->close; $client->close;
return; return;
} }
::log(LOGINFO, "accepted connection $running/$MAXCONN ($num_conn/$MAXCONNIP) from $rem_ip");
} }
my $rc = $client->start_conversation; my $rc = $client->start_conversation;