diff --git a/lib/Danga/Client.pm b/lib/Danga/Client.pm index f85ef99..2c37dc4 100644 --- a/lib/Danga/Client.pm +++ b/lib/Danga/Client.pm @@ -33,7 +33,7 @@ sub get_line { #warn("get_line PRE\n"); $self->EventLoop(); #warn("get_line POST\n"); - $self->watch_read(0); + $self->disable_read(); } return if $self->{closing}; # now have a line. @@ -49,8 +49,7 @@ sub can_read { # warn("Calling can-read\n"); $self->{can_read_mode} = 1; if (!length($self->{line})) { - my $old = $self->watch_read(); - $self->watch_read(1); + $self->disable_read(); # loop because any callback, not just ours, can make EventLoop return while( !(length($self->{line}) || (Time::HiRes::time > $end)) ) { $self->SetPostLoopCallback(sub { (length($self->{line}) || @@ -58,8 +57,8 @@ sub can_read { #warn("get_line PRE\n"); $self->EventLoop(); #warn("get_line POST\n"); - } - $self->watch_read($old); + } + $self->enable_read(); } $self->{can_read_mode} = 0; $self->SetPostLoopCallback(undef); diff --git a/lib/Danga/DNS.pm b/lib/Danga/DNS.pm index f05f7de..dc8128a 100644 --- a/lib/Danga/DNS.pm +++ b/lib/Danga/DNS.pm @@ -39,25 +39,25 @@ sub new { if ($options{type}) { if ($options{type} eq 'TXT') { if (!$resolver->query_txt($self, @{$self->{hosts}})) { - $client->watch_read(1) if $client; + $client->enable_read() if $client; return; } } elsif ($options{type} eq 'A') { if (!$resolver->query($self, @{$self->{hosts}})) { - $client->watch_read(1) if $client; + $client->enable_read() if $client; return; } } elsif ($options{type} eq 'PTR') { if (!$resolver->query($self, @{$self->{hosts}})) { - $client->watch_read(1) if $client; + $client->enable_read() if $client; return; } } elsif ($options{type} eq 'MX') { if (!$resolver->query_mx($self, @{$self->{hosts}})) { - $client->watch_read(1) if $client; + $client->enable_read() if $client; return; } } @@ -67,7 +67,7 @@ sub new { } else { if (!$resolver->query($self, @{$self->{hosts}})) { - $client->watch_read(1) if $client; + $client->enable_read() if $client; return; } } diff --git a/lib/Danga/Socket.pm b/lib/Danga/Socket.pm index 331f357..ef7b722 100644 --- a/lib/Danga/Socket.pm +++ b/lib/Danga/Socket.pm @@ -74,6 +74,7 @@ our ( # descriptors for the event loop to track. $PostLoopCallback, # subref to call at the end of each loop, if defined %PLCMap, # fd (num) -> PostLoopCallback + @Timers, # timers ); %OtherFds = (); @@ -110,6 +111,30 @@ sub OtherFds { return wantarray ? %OtherFds : \%OtherFds; } +sub AddTimer { + my $class = shift; + my ($secs, $coderef) = @_; + my $timeout = time + $secs; + + use Data::Dumper; $Data::Dumper::Indent=1; + + if (!@Timers || ($timeout > $Timers[-1][0])) { + push @Timers, [$timeout, $coderef]; + print STDERR Dumper(\@Timers); + return; + } + + # Now where do we insert... + for (my $i = 0; $i < @Timers; $i++) { + if ($Timers[$i][0] > $timeout) { + splice(@Timers, $i, 0, [$timeout, $coderef]); + print STDERR Dumper(\@Timers); + return; + } + } + + die "Shouldn't get here spank matt."; +} ### (CLASS) METHOD: DescriptorMap() ### Get the hash of Danga::Socket objects keyed by the file descriptor they are @@ -169,7 +194,16 @@ sub KQueueEventLoop { } while (1) { - my @ret = $KQueue->kevent(1000); + my $now = time; + # Run expired timers + while (@Timers && $Timers[0][0] <= $now) { + my $to_run = shift(@Timers); + $to_run->[1]->($now); + } + + # Get next timeout + my $timeout = @Timers ? ($Timers[0][0] - $now) : 1; + my @ret = $KQueue->kevent($timeout * 1000); if (!@ret) { foreach my $fd ( keys %DescriptorMap ) { @@ -233,11 +267,21 @@ sub EpollEventLoop { } while (1) { + my $now = time; + # Run expired timers + while (@Timers && $Timers[0][0] <= $now) { + my $to_run = shift(@Timers); + $to_run->[1]->($now); + } + + # Get next timeout + my $timeout = @Timers ? ($Timers[0][0] - $now) : 1; + my @events; my $i; my $evcount; # get up to 1000 events, 1000ms timeout - while ($evcount = epoll_wait($Epoll, 1000, 1000, \@events)) { + while ($evcount = epoll_wait($Epoll, 1000, $timeout * 1000, \@events)) { my @objs; EVENT: for ($i=0; $i<$evcount; $i++) { @@ -300,6 +344,16 @@ sub PollEventLoop { my Danga::Socket $pob; while (1) { + my $now = time; + # Run expired timers + while (@Timers && $Timers[0][0] <= $now) { + my $to_run = shift(@Timers); + $to_run->[1]->($now); + } + + # Get next timeout + my $timeout = @Timers ? ($Timers[0][0] - $now) : 1; + # the following sets up @poll as a series of ($poll,$event_mask) # items, then uses IO::Poll::_poll, implemented in XS, which # modifies the array in place with the even elements being @@ -314,7 +368,7 @@ sub PollEventLoop { } return 0 unless @poll; - my $count = IO::Poll::_poll(1000, @poll); + my $count = IO::Poll::_poll($timeout * 1000, @poll); if (!$count) { foreach my $fd ( keys %DescriptorMap ) { my Danga::Socket $sock = $DescriptorMap{$fd}; @@ -481,6 +535,7 @@ sub close { } } + delete $PLCMap{$fd}; delete $DescriptorMap{$fd}; delete $PushBackSet{$fd}; diff --git a/lib/Qpsmtpd/PollServer.pm b/lib/Qpsmtpd/PollServer.pm index e793df5..0ee0eda 100644 --- a/lib/Qpsmtpd/PollServer.pm +++ b/lib/Qpsmtpd/PollServer.pm @@ -135,10 +135,7 @@ sub _process_line { if ($self->{mode} eq 'connect') { $self->{mode} = 'cmd'; my $rc = $self->start_conversation; - if ($rc != DONE) { - $self->close; - return; - } + return; } elsif ($self->{mode} eq 'cmd') { $line =~ s/\r?\n//; diff --git a/lib/Qpsmtpd/SMTP.pm b/lib/Qpsmtpd/SMTP.pm index 154d87f..7c4249e 100644 --- a/lib/Qpsmtpd/SMTP.pm +++ b/lib/Qpsmtpd/SMTP.pm @@ -102,10 +102,12 @@ sub connect_respond { my ($self, $rc, $msg) = @_; if ($rc == DENY) { $self->respond(550, ($msg || 'Connection from you denied, bye bye.')); + $self->disconnect; return $rc; } elsif ($rc == DENYSOFT) { $self->respond(450, ($msg || 'Connection from you temporarily denied, bye bye.')); + $self->disconnect; return $rc; } elsif ($rc == DONE) { diff --git a/plugins/check_earlytalker b/plugins/check_earlytalker index 7256e88..1ead3d4 100644 --- a/plugins/check_earlytalker +++ b/plugins/check_earlytalker @@ -44,9 +44,6 @@ and terminating the SMTP connection. =cut -use warnings; -use strict; - sub register { my ($self, $qp, @args) = @_; @@ -61,29 +58,49 @@ sub register { @args, }; $self->register_hook('connect', 'connect_handler'); + $self->register_hook('connect', 'connect_post_handler'); $self->register_hook('mail', 'mail_handler') if $self->{_args}->{'defer-reject'}; + warn("check_earlytalker registered\n"); 1; } sub connect_handler { my ($self, $transaction) = @_; - if ($self->qp->can_read($self->{_args}->{'wait'})) { - $self->log(LOGNOTICE, 'remote host started talking before we said hello'); - if ($self->{_args}->{'defer-reject'}) { - $self->connection->notes('earlytalker', 1); - } - else { - my $msg = 'Connecting host started transmitting before SMTP greeting'; - return (DENY,$msg) if $self->{_args}->{'action'} eq 'deny'; - return (DENYSOFT,$msg) if $self->{_args}->{'action'} eq 'denysoft'; + warn("check early talker"); + my $qp = $self->qp; + my $conn = $qp->connection; + $qp->AddTimer($self->{_args}{'wait'}, sub { read_now($qp, $conn) }); + $qp->disable_read(); + return CONTINUATION; +} + +sub read_now { + my ($qp, $conn) = @_; + + warn("read now"); + $qp->enable_read(); + if (my $data = $qp->read(1024)) { + if (length($$data)) { + $qp->log(LOGNOTICE, 'remote host started talking before we said hello'); + $qp->push_back_read($data); + $conn->notes('earlytalker', 1); } } - else { - $self->log(LOGINFO, 'remote host said nothing spontaneous, proceeding'); - } - return DECLINED; + $qp->finish_continuation; +} + +sub connect_post_handler { + my ($self, $transaction) = @_; + + my $conn = $self->qp->connection; + return DECLINED unless $conn->notes('earlytalker'); + return DECLINED if $self->{'defer-reject'}; + my $msg = 'Connecting host started transmitting before SMTP greeting'; + return (DENY,$msg) if $self->{_args}->{'action'} eq 'deny'; + return (DENYSOFT,$msg) if $self->{_args}->{'action'} eq 'denysoft'; + return DECLINED; # assume action eq 'log' } sub mail_handler { @@ -91,6 +108,7 @@ sub mail_handler { my $msg = 'Connecting host started transmitting before SMTP greeting'; return DECLINED unless $self->connection->notes('earlytalker'); + my $msg = 'Connecting host started transmitting before SMTP greeting'; return (DENY,$msg) if $self->{_args}->{'action'} eq 'deny'; return (DENYSOFT,$msg) if $self->{_args}->{'action'} eq 'denysoft'; return DECLINED;