Timer support added to Danga::Socket

check_earlytalker updated to use timers
Few other code cleanups to make sure check-earlytalker is fully working


git-svn-id: https://svn.perl.org/qpsmtpd/branches/high_perf@441 958fd67b-6ff1-0310-b445-bb7760255be9
This commit is contained in:
Matt Sergeant 2005-06-21 20:02:14 +00:00
parent a4517bdfa4
commit cb047d9aa9
6 changed files with 104 additions and 33 deletions

View File

@ -33,7 +33,7 @@ sub get_line {
#warn("get_line PRE\n");
$self->EventLoop();
#warn("get_line POST\n");
$self->watch_read(0);
$self->disable_read();
}
return if $self->{closing};
# now have a line.
@ -49,8 +49,7 @@ sub can_read {
# warn("Calling can-read\n");
$self->{can_read_mode} = 1;
if (!length($self->{line})) {
my $old = $self->watch_read();
$self->watch_read(1);
$self->disable_read();
# loop because any callback, not just ours, can make EventLoop return
while( !(length($self->{line}) || (Time::HiRes::time > $end)) ) {
$self->SetPostLoopCallback(sub { (length($self->{line}) ||
@ -58,8 +57,8 @@ sub can_read {
#warn("get_line PRE\n");
$self->EventLoop();
#warn("get_line POST\n");
}
$self->watch_read($old);
}
$self->enable_read();
}
$self->{can_read_mode} = 0;
$self->SetPostLoopCallback(undef);

View File

@ -39,25 +39,25 @@ sub new {
if ($options{type}) {
if ($options{type} eq 'TXT') {
if (!$resolver->query_txt($self, @{$self->{hosts}})) {
$client->watch_read(1) if $client;
$client->enable_read() if $client;
return;
}
}
elsif ($options{type} eq 'A') {
if (!$resolver->query($self, @{$self->{hosts}})) {
$client->watch_read(1) if $client;
$client->enable_read() if $client;
return;
}
}
elsif ($options{type} eq 'PTR') {
if (!$resolver->query($self, @{$self->{hosts}})) {
$client->watch_read(1) if $client;
$client->enable_read() if $client;
return;
}
}
elsif ($options{type} eq 'MX') {
if (!$resolver->query_mx($self, @{$self->{hosts}})) {
$client->watch_read(1) if $client;
$client->enable_read() if $client;
return;
}
}
@ -67,7 +67,7 @@ sub new {
}
else {
if (!$resolver->query($self, @{$self->{hosts}})) {
$client->watch_read(1) if $client;
$client->enable_read() if $client;
return;
}
}

View File

@ -74,6 +74,7 @@ our (
# descriptors for the event loop to track.
$PostLoopCallback, # subref to call at the end of each loop, if defined
%PLCMap, # fd (num) -> PostLoopCallback
@Timers, # timers
);
%OtherFds = ();
@ -110,6 +111,30 @@ sub OtherFds {
return wantarray ? %OtherFds : \%OtherFds;
}
sub AddTimer {
my $class = shift;
my ($secs, $coderef) = @_;
my $timeout = time + $secs;
use Data::Dumper; $Data::Dumper::Indent=1;
if (!@Timers || ($timeout > $Timers[-1][0])) {
push @Timers, [$timeout, $coderef];
print STDERR Dumper(\@Timers);
return;
}
# Now where do we insert...
for (my $i = 0; $i < @Timers; $i++) {
if ($Timers[$i][0] > $timeout) {
splice(@Timers, $i, 0, [$timeout, $coderef]);
print STDERR Dumper(\@Timers);
return;
}
}
die "Shouldn't get here spank matt.";
}
### (CLASS) METHOD: DescriptorMap()
### Get the hash of Danga::Socket objects keyed by the file descriptor they are
@ -169,7 +194,16 @@ sub KQueueEventLoop {
}
while (1) {
my @ret = $KQueue->kevent(1000);
my $now = time;
# Run expired timers
while (@Timers && $Timers[0][0] <= $now) {
my $to_run = shift(@Timers);
$to_run->[1]->($now);
}
# Get next timeout
my $timeout = @Timers ? ($Timers[0][0] - $now) : 1;
my @ret = $KQueue->kevent($timeout * 1000);
if (!@ret) {
foreach my $fd ( keys %DescriptorMap ) {
@ -233,11 +267,21 @@ sub EpollEventLoop {
}
while (1) {
my $now = time;
# Run expired timers
while (@Timers && $Timers[0][0] <= $now) {
my $to_run = shift(@Timers);
$to_run->[1]->($now);
}
# Get next timeout
my $timeout = @Timers ? ($Timers[0][0] - $now) : 1;
my @events;
my $i;
my $evcount;
# get up to 1000 events, 1000ms timeout
while ($evcount = epoll_wait($Epoll, 1000, 1000, \@events)) {
while ($evcount = epoll_wait($Epoll, 1000, $timeout * 1000, \@events)) {
my @objs;
EVENT:
for ($i=0; $i<$evcount; $i++) {
@ -300,6 +344,16 @@ sub PollEventLoop {
my Danga::Socket $pob;
while (1) {
my $now = time;
# Run expired timers
while (@Timers && $Timers[0][0] <= $now) {
my $to_run = shift(@Timers);
$to_run->[1]->($now);
}
# Get next timeout
my $timeout = @Timers ? ($Timers[0][0] - $now) : 1;
# the following sets up @poll as a series of ($poll,$event_mask)
# items, then uses IO::Poll::_poll, implemented in XS, which
# modifies the array in place with the even elements being
@ -314,7 +368,7 @@ sub PollEventLoop {
}
return 0 unless @poll;
my $count = IO::Poll::_poll(1000, @poll);
my $count = IO::Poll::_poll($timeout * 1000, @poll);
if (!$count) {
foreach my $fd ( keys %DescriptorMap ) {
my Danga::Socket $sock = $DescriptorMap{$fd};
@ -481,6 +535,7 @@ sub close {
}
}
delete $PLCMap{$fd};
delete $DescriptorMap{$fd};
delete $PushBackSet{$fd};

View File

@ -135,10 +135,7 @@ sub _process_line {
if ($self->{mode} eq 'connect') {
$self->{mode} = 'cmd';
my $rc = $self->start_conversation;
if ($rc != DONE) {
$self->close;
return;
}
return;
}
elsif ($self->{mode} eq 'cmd') {
$line =~ s/\r?\n//;

View File

@ -102,10 +102,12 @@ sub connect_respond {
my ($self, $rc, $msg) = @_;
if ($rc == DENY) {
$self->respond(550, ($msg || 'Connection from you denied, bye bye.'));
$self->disconnect;
return $rc;
}
elsif ($rc == DENYSOFT) {
$self->respond(450, ($msg || 'Connection from you temporarily denied, bye bye.'));
$self->disconnect;
return $rc;
}
elsif ($rc == DONE) {

View File

@ -44,9 +44,6 @@ and terminating the SMTP connection.
=cut
use warnings;
use strict;
sub register {
my ($self, $qp, @args) = @_;
@ -61,29 +58,49 @@ sub register {
@args,
};
$self->register_hook('connect', 'connect_handler');
$self->register_hook('connect', 'connect_post_handler');
$self->register_hook('mail', 'mail_handler')
if $self->{_args}->{'defer-reject'};
warn("check_earlytalker registered\n");
1;
}
sub connect_handler {
my ($self, $transaction) = @_;
if ($self->qp->can_read($self->{_args}->{'wait'})) {
$self->log(LOGNOTICE, 'remote host started talking before we said hello');
if ($self->{_args}->{'defer-reject'}) {
$self->connection->notes('earlytalker', 1);
}
else {
my $msg = 'Connecting host started transmitting before SMTP greeting';
return (DENY,$msg) if $self->{_args}->{'action'} eq 'deny';
return (DENYSOFT,$msg) if $self->{_args}->{'action'} eq 'denysoft';
warn("check early talker");
my $qp = $self->qp;
my $conn = $qp->connection;
$qp->AddTimer($self->{_args}{'wait'}, sub { read_now($qp, $conn) });
$qp->disable_read();
return CONTINUATION;
}
sub read_now {
my ($qp, $conn) = @_;
warn("read now");
$qp->enable_read();
if (my $data = $qp->read(1024)) {
if (length($$data)) {
$qp->log(LOGNOTICE, 'remote host started talking before we said hello');
$qp->push_back_read($data);
$conn->notes('earlytalker', 1);
}
}
else {
$self->log(LOGINFO, 'remote host said nothing spontaneous, proceeding');
}
return DECLINED;
$qp->finish_continuation;
}
sub connect_post_handler {
my ($self, $transaction) = @_;
my $conn = $self->qp->connection;
return DECLINED unless $conn->notes('earlytalker');
return DECLINED if $self->{'defer-reject'};
my $msg = 'Connecting host started transmitting before SMTP greeting';
return (DENY,$msg) if $self->{_args}->{'action'} eq 'deny';
return (DENYSOFT,$msg) if $self->{_args}->{'action'} eq 'denysoft';
return DECLINED; # assume action eq 'log'
}
sub mail_handler {
@ -91,6 +108,7 @@ sub mail_handler {
my $msg = 'Connecting host started transmitting before SMTP greeting';
return DECLINED unless $self->connection->notes('earlytalker');
my $msg = 'Connecting host started transmitting before SMTP greeting';
return (DENY,$msg) if $self->{_args}->{'action'} eq 'deny';
return (DENYSOFT,$msg) if $self->{_args}->{'action'} eq 'denysoft';
return DECLINED;