From 8dad7435e57b3d05d7a9b86040d39b925768fdcb Mon Sep 17 00:00:00 2001 From: Matt Sergeant Date: Mon, 9 May 2005 13:43:40 +0000 Subject: [PATCH] Large number of patches from Brian Grossman to fix a number of bugs Implement connection timeout git-svn-id: https://svn.perl.org/qpsmtpd/branches/high_perf@413 958fd67b-6ff1-0310-b445-bb7760255be9 --- lib/Danga/Client.pm | 7 ++++--- lib/Danga/DNS/Resolver.pm | 3 +++ lib/Danga/Socket.pm | 34 +++++++++++++++++++++++++++++----- lib/Danga/TimeoutSocket.pm | 32 ++++++++++++++++++++++++++++---- lib/Qpsmtpd/ConfigServer.pm | 15 +++++++++++++-- lib/Qpsmtpd/Plugin.pm | 2 +- lib/Qpsmtpd/PollServer.pm | 6 +++++- plugins/check_earlytalker | 22 +++++++--------------- qpsmtpd | 12 +++++++++++- 9 files changed, 101 insertions(+), 32 deletions(-) diff --git a/lib/Danga/Client.pm b/lib/Danga/Client.pm index 7b13477..5fb002a 100644 --- a/lib/Danga/Client.pm +++ b/lib/Danga/Client.pm @@ -6,7 +6,8 @@ use fields qw(line closing disable_read can_read_mode); use Time::HiRes (); # 30 seconds max timeout! -sub max_idle_time { 30 } +sub max_idle_time { 30 } +sub max_connect_time { 1200 } sub new { my Danga::Client $self = shift; @@ -45,7 +46,7 @@ sub can_read { my Danga::Client $self = shift; my ($timeout) = @_; my $end = Time::HiRes::time() + $timeout; - warn("Calling can-read\n"); + # warn("Calling can-read\n"); $self->{can_read_mode} = 1; if (!length($self->{line})) { my $old = $self->watch_read(); @@ -61,7 +62,7 @@ sub can_read { $self->SetPostLoopCallback(sub { $self->have_line ? 0 : 1 }); return if $self->{closing}; $self->{alive_time} = time; - warn("can_read returning for '$self->{line}'\n"); + # warn("can_read returning for '$self->{line}'\n"); return 1 if length($self->{line}); return; } diff --git a/lib/Danga/DNS/Resolver.pm b/lib/Danga/DNS/Resolver.pm index 9d7a9f5..80dec78 100644 --- a/lib/Danga/DNS/Resolver.pm +++ b/lib/Danga/DNS/Resolver.pm @@ -286,6 +286,9 @@ sub event_read { #$self->{timeout}{$id} = time(); } + elsif ($err eq "NOERROR") { + $asker->run_callback($err, $query); + } elsif($err) { print("error: $err\n"); $asker->run_callback($err, $query); diff --git a/lib/Danga/Socket.pm b/lib/Danga/Socket.pm index 1f9a0fa..bb4de76 100644 --- a/lib/Danga/Socket.pm +++ b/lib/Danga/Socket.pm @@ -24,7 +24,7 @@ use vars qw{$VERSION}; $VERSION = do { my @r = (q$Revision: 1.4 $ =~ /\d+/g); sprintf "%d."."%02d" x $#r, @r }; use fields qw(sock fd write_buf write_buf_offset write_buf_size - read_push_back + read_push_back post_loop_callback closed event_watch debug_level); use Errno qw(EINPROGRESS EWOULDBLOCK EISCONN @@ -307,9 +307,21 @@ sub PostEventLoop { # now we can close sockets that wanted to close during our event processing. # (we didn't want to close them during the loop, as we didn't want fd numbers # being reused and confused during the event loop) - $_->close while ($_ = shift @ToClose); + while(my $j = shift @ToClose) { + $j->[1]->close(); + $j->[0]->{closing} = 0; + } - # now we're at the very end, call callback if defined + + # now we're at the very end, call per-connection callbacks if defined + for my $fd (%DescriptorMap) { + my $pob = $DescriptorMap{$fd}; + if( defined $pob->{post_loop_callback} ) { + return unless $pob->{post_loop_callback}->(\%DescriptorMap, \%OtherFds); + } + } + + # now we're at the very end, call global callback if defined if (defined $PostLoopCallback) { return $PostLoopCallback->(\%DescriptorMap, \%OtherFds); } @@ -401,6 +413,7 @@ sub new { $self->{write_buf_size} = 0; $self->{closed} = 0; $self->{read_push_back} = []; + $self->{post_loop_callback} = undef; $self->{event_watch} = POLLERR|POLLHUP|POLLNVAL; @@ -472,7 +485,7 @@ sub close { # defer closing the actual socket until the event loop is done # processing this round of events. (otherwise we might reuse fds) - push @ToClose, $sock; + push @ToClose, [$self,$sock]; return 0; } @@ -785,7 +798,18 @@ sub as_string { ### be passed two parameters: \%DescriptorMap, \%OtherFds. sub SetPostLoopCallback { my ($class, $ref) = @_; - $PostLoopCallback = (defined $ref && ref $ref eq 'CODE') ? $ref : undef; + if(ref $class) { + my Danga::Socket $self = $class; + if( defined $ref && ref $ref eq 'CODE' ) { + $self->{PostLoopCallback} = $ref; + } + else { + delete $self->{PostLoopCallback}; + } + } + else { + $PostLoopCallback = (defined $ref && ref $ref eq 'CODE') ? $ref : undef; + } } ##################################################################### diff --git a/lib/Danga/TimeoutSocket.pm b/lib/Danga/TimeoutSocket.pm index fe74cd9..c9468d2 100644 --- a/lib/Danga/TimeoutSocket.pm +++ b/lib/Danga/TimeoutSocket.pm @@ -24,22 +24,46 @@ sub new { return $self; } +sub ticker { + my Danga::TimeoutSocket $self = shift; + + my $now = time; + + if ($now - 15 > $last_cleanup) { + $last_cleanup = $now; + _do_cleanup($now); + } +} + +# overload these in a subclass +sub max_idle_time { 0 } +sub max_connect_time { 0 } + sub _do_cleanup { my $now = shift; my $sf = __PACKAGE__->get_sock_ref; my %max_age; # classname -> max age (0 means forever) + my %max_connect; # classname -> max connect time my @to_close; while (my $k = each %$sf) { my Danga::TimeoutSocket $v = $sf->{$k}; my $ref = ref $v; next unless $v->isa('Danga::TimeoutSocket'); unless (defined $max_age{$ref}) { - $max_age{$ref} = $ref->max_idle_time || 0; + $max_age{$ref} = $ref->max_idle_time || 0; + $max_connect{$ref} = $ref->max_connect_time || 0; } - next unless $max_age{$ref}; - if ($v->{alive_time} < $now - $max_age{$ref}) { - push @to_close, $v; + if (my $t = $max_connect{$ref}) { + if ($v->{create_time} < $now - $t) { + push @to_close, $v; + next; + } + } + if (my $t = $max_age{$ref}) { + if ($v->{alive_time} < $now - $t) { + push @to_close, $v; + } } } diff --git a/lib/Qpsmtpd/ConfigServer.pm b/lib/Qpsmtpd/ConfigServer.pm index ff5e2b8..fd2c8a7 100644 --- a/lib/Qpsmtpd/ConfigServer.pm +++ b/lib/Qpsmtpd/ConfigServer.pm @@ -15,6 +15,7 @@ use fields qw( _transaction _test_mode _extras + other_fds ); my $PROMPT = "Enter command: "; @@ -130,6 +131,16 @@ sub cmd_pause { return "PAUSED"; } +sub cmd_continue { + my $self = shift; + + my $other_fds = $self->{other_fds}; + + $self->OtherFds( %$other_fds ); + %$other_fds = (); + return "UNPAUSED"; +} + sub cmd_status { my $self = shift; @@ -173,7 +184,7 @@ sub cmd_status { } } - $output .= "Curr Connections: $current_connections\n". + $output .= "Curr Connections: $current_connections / $::MAXconn\n". "Curr DNS Queries: $current_dns"; return $output; @@ -206,7 +217,7 @@ sub cmd_list { } } foreach my $item (@all) { - $list .= sprintf("%x : %s [%s] Connected %0.2fs\n", @$item); + $list .= sprintf("%x : %s [%s] Connected %0.2fs\n", map { defined()?$_:'' } @$item); } return $list; diff --git a/lib/Qpsmtpd/Plugin.pm b/lib/Qpsmtpd/Plugin.pm index 25836a4..c5fefae 100644 --- a/lib/Qpsmtpd/Plugin.pm +++ b/lib/Qpsmtpd/Plugin.pm @@ -108,7 +108,7 @@ sub compile { } close F; - my $line = "\n#line 1 $file\n"; + my $line = "\n#line 0 $file\n"; if ($test_mode) { if (open(F, "t/plugin_tests/$plugin")) { diff --git a/lib/Qpsmtpd/PollServer.pm b/lib/Qpsmtpd/PollServer.pm index 991d5f0..5e14362 100644 --- a/lib/Qpsmtpd/PollServer.pm +++ b/lib/Qpsmtpd/PollServer.pm @@ -32,6 +32,9 @@ use Socket qw(inet_aton AF_INET CRLF); use Time::HiRes qw(time); use strict; +sub max_idle_time { 60 } +sub max_connect_time { 1200 } + sub input_sock { my $self = shift; @_ and $self->{input_sock} = shift; @@ -91,7 +94,7 @@ sub process_line { if ($::DEBUG > 1) { print "$$:".($self+0)."C($self->{mode}): $line"; } local $SIG{ALRM} = sub { my ($pkg, $file, $line) = caller(); - die "ALARM: $pkg, $file, $line"; + die "ALARM: ($self->{mode}) $pkg, $file, $line"; }; my $prev = alarm(2); # must process a command in < 2 seconds eval { $self->_process_line($line) }; @@ -169,6 +172,7 @@ sub start_conversation { my ($ip, $port) = split(':', $self->peer_addr_string); $conn->remote_ip($ip); $conn->remote_port($port); + $conn->remote_info("[$ip]"); Danga::DNS->new( client => $self, # NB: Setting remote_info to the same as remote_host diff --git a/plugins/check_earlytalker b/plugins/check_earlytalker index 27f5d9c..7256e88 100644 --- a/plugins/check_earlytalker +++ b/plugins/check_earlytalker @@ -44,8 +44,6 @@ and terminating the SMTP connection. =cut -use Time::HiRes (); - use warnings; use strict; @@ -70,25 +68,19 @@ sub register { sub connect_handler { my ($self, $transaction) = @_; - my $qp = $self->qp; - my $end = Time::HiRes::time + $self->{_args}->{'wait'} ; - my $time; - for( $time = Time::HiRes::time; $time < $end && !length($qp->{line}) ; $time = Time::HiRes::time ) { - $qp->can_read($end-$time); - } - my $earlytalker = 0; - $earlytalker = 1 if $time < $end ; - - if ($earlytalker) { + + if ($self->qp->can_read($self->{_args}->{'wait'})) { $self->log(LOGNOTICE, 'remote host started talking before we said hello'); if ($self->{_args}->{'defer-reject'}) { - $self->connection->notes('earlytalker', 1); - } else { + $self->connection->notes('earlytalker', 1); + } + else { my $msg = 'Connecting host started transmitting before SMTP greeting'; return (DENY,$msg) if $self->{_args}->{'action'} eq 'deny'; return (DENYSOFT,$msg) if $self->{_args}->{'action'} eq 'denysoft'; } - } else { + } + else { $self->log(LOGINFO, 'remote host said nothing spontaneous, proceeding'); } return DECLINED; diff --git a/qpsmtpd b/qpsmtpd index 928948e..96883ae 100755 --- a/qpsmtpd +++ b/qpsmtpd @@ -199,6 +199,7 @@ sub run_as_inetd { } sub run_as_server { + local $::MAXconn = $MAXCONN; # establish SERVER socket, bind and listen. $SERVER = IO::Socket::INET->new(LocalPort => $PORT, LocalAddr => $LOCALADDR, @@ -290,11 +291,19 @@ sub config_handler { # Accept a new connection sub accept_handler { - my $running = scalar keys %childstatus; + my $running; + if( $LineMode ) { + $running = scalar keys %childstatus; + } + else { + my $descriptors = Danga::Client->DescriptorMap; + $running = scalar keys %$descriptors; + } while ($running >= $MAXCONN) { ::log(LOGINFO,"Too many connections: $running >= $MAXCONN."); return; } + ++$running if $LineMode; # count self my $csock = $SERVER->accept(); if (!$csock) { @@ -341,6 +350,7 @@ sub accept_handler { $client->close; return; } + ::log(LOGINFO, "accepted connection $running/$MAXCONN ($num_conn/$MAXCONNIP) from $rem_ip"); } my $rc = $client->start_conversation;