From a4517bdfa436b128578d3e8e2f662ef3b57168ad Mon Sep 17 00:00:00 2001 From: Matt Sergeant Date: Sat, 18 Jun 2005 18:22:16 +0000 Subject: [PATCH] Continuation support git-svn-id: https://svn.perl.org/qpsmtpd/branches/high_perf@436 958fd67b-6ff1-0310-b445-bb7760255be9 --- lib/Qpsmtpd.pm | 97 +++++++++++++----- lib/Qpsmtpd/Constants.pm | 19 ++-- lib/Qpsmtpd/PollServer.pm | 64 ++++-------- lib/Qpsmtpd/SMTP.pm | 211 ++++++++++++++++++++++++-------------- plugins/dnsbl | 34 +++--- 5 files changed, 257 insertions(+), 168 deletions(-) diff --git a/lib/Qpsmtpd.pm b/lib/Qpsmtpd.pm index 6d07d20..4bd5389 100644 --- a/lib/Qpsmtpd.pm +++ b/lib/Qpsmtpd.pm @@ -208,38 +208,20 @@ sub _load_plugins { sub run_hooks { my ($self, $hook) = (shift, shift); + if ($self->{_continuation}) { + die "Continuations in progress from previous hook (this is the $hook hook)"; + } my $hooks = $self->{hooks}; if ($hooks->{$hook}) { my @r; - for my $code (@{$hooks->{$hook}}) { - $self->log(LOGINFO, "running plugin ($hook):", $code->{name}); - eval { (@r) = $code->{code}->($self, $self->transaction, @_); }; - $@ and $self->log(LOGCRIT, "FATAL PLUGIN ERROR: ", $@) and next; - - !defined $r[0] - and $self->log(LOGERROR, "plugin ".$code->{name} - ."running the $hook hook returned undef!") - and next; - - if ($self->transaction) { - my $tnotes = $self->transaction->notes( $code->{name} ); - $tnotes->{"hook_$hook"}->{'return'} = $r[0] - if (!defined $tnotes || ref $tnotes eq "HASH"); - } else { - my $cnotes = $self->connection->notes( $code->{name} ); - $cnotes->{"hook_$hook"}->{'return'} = $r[0] - if (!defined $cnotes || ref $cnotes eq "HASH"); + my @local_hooks = @{$hooks->{$hook}}; + while (@local_hooks) { + my $code = shift @local_hooks; + @r = $self->run_hook($hook, $code, @_); + next unless @r; + if ($r[0] == CONTINUATION) { + $self->{_continuation} = [$hook, [@_], @local_hooks]; } - - # should we have a hook for "OK" too? - if ($r[0] == DENY or $r[0] == DENYSOFT or - $r[0] == DENY_DISCONNECT or $r[0] == DENYSOFT_DISCONNECT) - { - $r[1] = "" if not defined $r[1]; - $self->log(LOGDEBUG, "Plugin $code->{name}, hook $hook returned $r[0], $r[1]"); - $self->run_hooks("deny", $code->{name}, $r[0], $r[1]) unless ($hook eq "deny"); - } - last unless $r[0] == DECLINED; } $r[0] = DECLINED if not defined $r[0]; @@ -248,6 +230,65 @@ sub run_hooks { return (0, ''); } +sub finish_continuation { + my ($self) = @_; + die "No continuation in progress" unless $self->{_continuation}; + my $todo = $self->{_continuation}; + $self->{_continuation} = undef; + my $hook = shift @$todo || die "No hook in the continuation"; + my $args = shift @$todo || die "No hook args in the continuation"; + my @r; + while (@$todo) { + my $code = shift @$todo; + @r = $self->run_hook($hook, $code, @$args); + if ($r[0] == CONTINUATION) { + $self->{_continuation} = [$hook, $args, @$todo]; + return @r; + } + last unless $r[0] == DECLINED; + } + $r[0] = DECLINED if not defined $r[0]; + my $responder = $hook . "_respond"; + if (my $meth = $self->can($responder)) { + return $meth->($self, @r, @$args); + } + die "No ${hook}_respond method"; +} + +sub run_hook { + my ($self, $hook, $code, @args) = @_; + my @r; + $self->log(LOGINFO, "running plugin ($hook):", $code->{name}); + eval { (@r) = $code->{code}->($self, $self->transaction, @args); }; + $@ and $self->log(LOGCRIT, "FATAL PLUGIN ERROR: ", $@) and return; + + !defined $r[0] + and $self->log(LOGERROR, "plugin ".$code->{name} + ."running the $hook hook returned undef!") + and return; + + if ($self->transaction) { + my $tnotes = $self->transaction->notes( $code->{name} ); + $tnotes->{"hook_$hook"}->{'return'} = $r[0] + if (!defined $tnotes || ref $tnotes eq "HASH"); + } else { + my $cnotes = $self->connection->notes( $code->{name} ); + $cnotes->{"hook_$hook"}->{'return'} = $r[0] + if (!defined $cnotes || ref $cnotes eq "HASH"); + } + + # should we have a hook for "OK" too? + if ($r[0] == DENY or $r[0] == DENYSOFT or + $r[0] == DENY_DISCONNECT or $r[0] == DENYSOFT_DISCONNECT) + { + $r[1] = "" if not defined $r[1]; + $self->log(LOGDEBUG, "Plugin $code->{name}, hook $hook returned $r[0], $r[1]"); + $self->run_hooks("deny", $code->{name}, $r[0], $r[1]) unless ($hook eq "deny"); + } + + return @r; +} + sub _register_hook { my $self = shift; my ($hook, $code, $unshift) = @_; diff --git a/lib/Qpsmtpd/Constants.pm b/lib/Qpsmtpd/Constants.pm index b1395eb..c67dcf4 100644 --- a/lib/Qpsmtpd/Constants.pm +++ b/lib/Qpsmtpd/Constants.pm @@ -3,7 +3,7 @@ use strict; require Exporter; my (@common) = qw(OK DECLINED DONE DENY DENYSOFT DENYHARD - DENY_DISCONNECT DENYSOFT_DISCONNECT + DENY_DISCONNECT DENYSOFT_DISCONNECT CONTINUATION ); my (@loglevels) = qw(LOGDEBUG LOGINFO LOGNOTICE LOGWARN LOGERROR LOGCRIT LOGALERT LOGEMERG LOGRADAR); @@ -11,14 +11,15 @@ use vars qw($VERSION @ISA @EXPORT); @ISA = qw(Exporter); @EXPORT = (@common, @loglevels); -use constant OK => 900; -use constant DENY => 901; # 550 -use constant DENYSOFT => 902; # 450 -use constant DENYHARD => 903; # 550 + disconnect (deprecated in 0.29) -use constant DENY_DISCONNECT => 903; # 550 + disconnect -use constant DENYSOFT_DISCONNECT => 904; # 450 + disconnect -use constant DECLINED => 909; -use constant DONE => 910; +use constant OK => 900; +use constant DENY => 901; # 550 +use constant DENYSOFT => 902; # 450 +use constant DENYHARD => 903; # 550 + disconnect (deprecated in 0.29) +use constant DENY_DISCONNECT => 903; # 550 + disconnect +use constant DENYSOFT_DISCONNECT => 904; # 450 + disconnect +use constant DECLINED => 909; +use constant DONE => 910; +use constant CONTINUATION => 911; # log levels diff --git a/lib/Qpsmtpd/PollServer.pm b/lib/Qpsmtpd/PollServer.pm index 61cc7fd..e793df5 100644 --- a/lib/Qpsmtpd/PollServer.pm +++ b/lib/Qpsmtpd/PollServer.pm @@ -21,6 +21,7 @@ use fields qw( _transaction _test_mode _extras + _continuation ); use Qpsmtpd::Constants; use Qpsmtpd::Auth; @@ -95,6 +96,13 @@ sub fault { return; } +sub log { + my ($self, $trace, @log) = @_; + my $fd = $self->{fd}; + $fd ||= '?'; + $self->SUPER::log($trace, "fd:$fd", @log); +} + sub process_line { my $self = shift; my $line = shift || return; @@ -164,17 +172,8 @@ sub process_cmd { else { # No such method - i.e. unrecognized command my ($rc, $msg) = $self->run_hooks("unrecognized_command", $cmd); - if ($rc == DENY) { - $self->respond(521, $msg); - $self->disconnect; - return; - } - elsif ($rc == DONE) { - return; # TODO - this isn't right. - } - else { - return $self->respond(500, "Unrecognized command"); - } + return $self->unrecognized_command_respond unless $rc == CONTINUATION; + return 1; } } @@ -201,29 +200,20 @@ sub start_conversation { ); my ($rc, $msg) = $self->run_hooks("connect"); - if ($rc == DENY) { - $self->respond(550, ($msg || 'Connection from you denied, bye bye.')); - return $rc; - } - elsif ($rc == DENYSOFT) { - $self->respond(450, ($msg || 'Connection from you temporarily denied, bye bye.')); - return $rc; - } - elsif ($rc == DONE) { - $self->respond(220, $msg); - return $rc; - } - else { - $self->respond(220, $self->config('me') ." ESMTP qpsmtpd " - . $self->version ." ready; send us your mail, but not your spam."); - return DONE; - } + return $self->connect_respond($rc, $msg) unless $rc == CONTINUATION; + return DONE; } sub data { my $self = shift; my ($rc, $msg) = $self->run_hooks("data"); + return $self->data_respond($rc, $msg) unless $rc == CONTINUATION; + return 1; +} + +sub data_respond { + my ($self, $rc, $msg) = @_; if ($rc == DONE) { return; } @@ -350,22 +340,8 @@ sub end_of_data { return $self->respond(552, "Message too big!") if $self->{max_size} and $self->{data_size} > $self->{max_size}; my ($rc, $msg) = $self->run_hooks("data_post"); - if ($rc == DONE) { - return; - } - elsif ($rc == DENY) { - $self->respond(552, $msg || "Message denied"); - } - elsif ($rc == DENYSOFT) { - $self->respond(452, $msg || "Message denied temporarily"); - } - else { - $self->queue($self->transaction); - } - - # DATA is always the end of a "transaction" - $self->reset_transaction; - return; + return $self->data_post_respond($rc, $msg) unless $rc == CONTINUATION; + return 1; } 1; diff --git a/lib/Qpsmtpd/SMTP.pm b/lib/Qpsmtpd/SMTP.pm index bb463e5..154d87f 100644 --- a/lib/Qpsmtpd/SMTP.pm +++ b/lib/Qpsmtpd/SMTP.pm @@ -54,18 +54,9 @@ sub dispatch { # if $state{dnsbl_blocked} and ($cmd eq "rcpt"); if ($cmd !~ /^(\w{1,12})$/ or !exists $self->{_commands}->{$1}) { - my ($rc, $msg) = $self->run_hooks("unrecognized_command", $cmd); - if ($rc == DENY) { - $self->respond(521, $msg); - $self->disconnect; - } - elsif ($rc == DONE) { - 1; - } - else { - $self->respond(500, "Unrecognized command"); - } - return 1 + my ($rc, $msg) = $self->run_hooks("unrecognized_command", $cmd, @_); + return $self->unrecognized_command_respond($rc, $msg, @_) unless $rc == CONTINUATION; + return 1; } $cmd = $1; @@ -79,6 +70,17 @@ sub dispatch { return; } +sub unrecognized_command_respond { + my ($self, $rc, $msg) = @_; + if ($rc == DENY) { + $self->respond(521, $msg); + $self->disconnect; + } + elsif ($rc != DONE) { + $self->respond(500, "Unrecognized command"); + } +} + sub fault { my $self = shift; my ($msg) = shift || "program fault - command not performed"; @@ -92,6 +94,12 @@ sub start_conversation { # this should maybe be called something else than "connect", see # lib/Qpsmtpd/TcpServer.pm for more confusion. my ($rc, $msg) = $self->run_hooks("connect"); + return $self->connect_respond($rc, $msg) unless $rc == CONTINUATION; + return 1; +} + +sub connect_respond { + my ($self, $rc, $msg) = @_; if ($rc == DENY) { $self->respond(550, ($msg || 'Connection from you denied, bye bye.')); return $rc; @@ -118,17 +126,25 @@ sub helo { return $self->respond (503, "but you already said HELO ...") if $conn->hello; my ($rc, $msg) = $self->run_hooks("helo", $hello_host, @stuff); - if ($rc == DONE) { - # do nothing - } elsif ($rc == DENY) { + return $self->helo_respond($rc, $msg, $hello_host, @stuff) unless $rc == CONTINUATION; + return 1; +} + +sub helo_respond { + my ($self, $rc, $msg, $hello_host) = @_; + if ($rc == DENY) { $self->respond(550, $msg); - } elsif ($rc == DENYSOFT) { + } + elsif ($rc == DENYSOFT) { $self->respond(450, $msg); - } else { + } + elsif ($rc != DONE) { + my $conn = $self->connection; $conn->hello("helo"); $conn->hello_host($hello_host); $self->transaction; - $self->respond(250, $self->config('me') ." Hi " . $conn->remote_info . " [" . $conn->remote_ip ."]; I am so happy to meet you."); + $self->respond(250, $self->config('me') ." Hi " . $conn->remote_info . + " [" . $conn->remote_ip ."]; I am so happy to meet you."); } } @@ -140,13 +156,20 @@ sub ehlo { return $self->respond (503, "but you already said HELO ...") if $conn->hello; my ($rc, $msg) = $self->run_hooks("ehlo", $hello_host, @stuff); - if ($rc == DONE) { - # do nothing - } elsif ($rc == DENY) { + return $self->ehlo_respond($rc, $msg, $hello_host, @stuff) unless $rc == CONTINUATION; + return 1; +} + +sub ehlo_respond { + my ($self, $rc, $msg, $hello_host) = @_; + if ($rc == DENY) { $self->respond(550, $msg); - } elsif ($rc == DENYSOFT) { + } + elsif ($rc == DENYSOFT) { $self->respond(450, $msg); - } else { + } + elsif ($rc != DONE) { + my $conn = $self->connection; $conn->hello("ehlo"); $conn->hello_host($hello_host); $self->transaction; @@ -211,57 +234,62 @@ sub mail { unless ($self->connection->hello) { return $self->respond(503, "please say hello first ..."); } + + my $from_parameter = join " ", @_; + $self->log(LOGINFO, "full from_parameter: $from_parameter"); + + my ($from) = ($from_parameter =~ m/^from:\s*(<[^>]*>)/i)[0]; + + # support addresses without <> ... maybe we shouldn't? + ($from) = "<" . ($from_parameter =~ m/^from:\s*(\S+)/i)[0] . ">" + unless $from; + + $self->log(LOGWARN, "from email address : [$from]"); + + if ($from eq "<>" or $from =~ m/\[undefined\]/ or $from eq "<#@[]>") { + $from = Qpsmtpd::Address->new("<>"); + } else { - my $from_parameter = join " ", @_; - $self->log(LOGINFO, "full from_parameter: $from_parameter"); + $from = (Qpsmtpd::Address->parse($from))[0]; + } + return $self->respond(501, "could not parse your mail from command") unless $from; - my ($from) = ($from_parameter =~ m/^from:\s*(<[^>]*>)/i)[0]; + my ($rc, $msg) = $self->run_hooks("mail", $from); + return $self->mail_respond($rc, $msg, $from) unless $rc == CONTINUATION; + return 1; +} - # support addresses without <> ... maybe we shouldn't? - ($from) = "<" . ($from_parameter =~ m/^from:\s*(\S+)/i)[0] . ">" - unless $from; - - $self->log(LOGWARN, "from email address : [$from]"); - - if ($from eq "<>" or $from =~ m/\[undefined\]/ or $from eq "<#@[]>") { - $from = Qpsmtpd::Address->new("<>"); - } - else { - $from = (Qpsmtpd::Address->parse($from))[0]; - } - return $self->respond(501, "could not parse your mail from command") unless $from; - - my ($rc, $msg) = $self->run_hooks("mail", $from); - if ($rc == DONE) { - return 1; - } - elsif ($rc == DENY) { - $msg ||= $from->format . ', denied'; - $self->log(LOGINFO, "deny mail from " . $from->format . " ($msg)"); - $self->respond(550, $msg); - } - elsif ($rc == DENYSOFT) { - $msg ||= $from->format . ', temporarily denied'; - $self->log(LOGINFO, "denysoft mail from " . $from->format . " ($msg)"); - $self->respond(450, $msg); - } - elsif ($rc == DENY_DISCONNECT) { - $msg ||= $from->format . ', denied'; - $self->log(LOGINFO, "deny mail from " . $from->format . " ($msg)"); - $self->respond(550, $msg); - $self->disconnect; - } - elsif ($rc == DENYSOFT_DISCONNECT) { - $msg ||= $from->format . ', temporarily denied'; - $self->log(LOGINFO, "denysoft mail from " . $from->format . " ($msg)"); - $self->respond(450, $msg); - $self->disconnect; - } - else { # includes OK - $self->log(LOGINFO, "getting mail from ".$from->format); - $self->respond(250, $from->format . ", sender OK - how exciting to get mail from you!"); - $self->transaction->sender($from); - } +sub mail_respond { + my ($self, $rc, $msg, $from) = @_; + if ($rc == DONE) { + return 1; + } + elsif ($rc == DENY) { + $msg ||= $from->format . ', denied'; + $self->log(LOGINFO, "deny mail from " . $from->format . " ($msg)"); + $self->respond(550, $msg); + } + elsif ($rc == DENYSOFT) { + $msg ||= $from->format . ', temporarily denied'; + $self->log(LOGINFO, "denysoft mail from " . $from->format . " ($msg)"); + $self->respond(450, $msg); + } + elsif ($rc == DENY_DISCONNECT) { + $msg ||= $from->format . ', denied'; + $self->log(LOGINFO, "deny mail from " . $from->format . " ($msg)"); + $self->respond(550, $msg); + $self->disconnect; + } + elsif ($rc == DENYSOFT_DISCONNECT) { + $msg ||= $from->format . ', temporarily denied'; + $self->log(LOGINFO, "denysoft mail from " . $from->format . " ($msg)"); + $self->respond(450, $msg); + $self->disconnect; + } + else { # includes OK + $self->log(LOGINFO, "getting mail from ".$from->format); + $self->respond(250, $from->format . ", sender OK - how exciting to get mail from you!"); + $self->transaction->sender($from); } } @@ -278,6 +306,12 @@ sub rcpt { return $self->respond(501, "could not parse recipient") unless $rcpt; my ($rc, $msg) = $self->run_hooks("rcpt", $rcpt); + return $self->rcpt_respond($rc, $msg, $rcpt) unless $rc == CONTINUATION; + return 1; +} + +sub rcpt_respond { + my ($self, $rc, $msg, $rcpt) = @_; if ($rc == DONE) { return 1; } @@ -312,7 +346,6 @@ sub rcpt { } - sub help { my $self = shift; $self->respond(214, @@ -334,6 +367,12 @@ sub vrfy { # I also don't think it provides all the proper result codes. my ($rc, $msg) = $self->run_hooks("vrfy"); + return $self->vrfy_respond($rc, $msg) unless $rc == CONTINUATION; + return 1; +} + +sub vrfy_respond { + my ($self, $rc, $msg) = @_; if ($rc == DONE) { return 1; } @@ -361,6 +400,12 @@ sub rset { sub quit { my $self = shift; my ($rc, $msg) = $self->run_hooks("quit"); + return $self->quit_respond($rc, $msg) unless $rc == CONTINUATION; + return 1; +} + +sub quit_respond { + my ($self, $rc, $msg) = @_; if ($rc != DONE) { $self->respond(221, $self->config('me') . " closing connection. Have a wonderful day."); } @@ -373,9 +418,17 @@ sub disconnect { $self->reset_transaction; } +sub disconnect_respond { } + sub data { my $self = shift; my ($rc, $msg) = $self->run_hooks("data"); + return $self->data_respond($rc, $msg) unless $rc == CONTINUATION; + return 1; +} + +sub data_respond { + my ($self, $rc, $msg) = @_; if ($rc == DONE) { return 1; } @@ -493,6 +546,11 @@ sub data { $self->respond(552, "Message too big!"),return 1 if $max_size and $size > $max_size; ($rc, $msg) = $self->run_hooks("data_post"); + return $self->data_post_respond($rc, $msg) unless $rc == CONTINUATION; +} + +sub data_post_respond { + my ($self, $rc, $msg) = @_; if ($rc == DONE) { return 1; } @@ -508,7 +566,6 @@ sub data { # DATA is always the end of a "transaction" return $self->reset_transaction; - } sub getline { @@ -524,6 +581,12 @@ sub queue { my ($self, $transaction) = @_; my ($rc, $msg) = $self->run_hooks("queue"); + return $self->queue_respond($rc, $msg) unless $rc == CONTINUATION; + return 1; +} + +sub queue_respond { + my ($self, $rc, $msg) = @_; if ($rc == DONE) { return 1; } @@ -539,8 +602,6 @@ sub queue { else { $self->respond(451, $msg || "Queuing declined or disabled; try again later" ); } - - } diff --git a/plugins/dnsbl b/plugins/dnsbl index 0a708ea..ca2c5d5 100644 --- a/plugins/dnsbl +++ b/plugins/dnsbl @@ -5,7 +5,7 @@ use Danga::DNS; sub register { my ($self) = @_; $self->register_hook("connect", "connect_handler"); - $self->register_hook("rcpt", "rcpt_handler"); + $self->register_hook("connect", "pickup_handler"); } sub connect_handler { @@ -34,12 +34,14 @@ sub connect_handler { my $reversed_ip = join(".", reverse(split(/\./, $remote_ip))); + $self->transaction->notes('pending_dns_queries', scalar(keys(%dnsbl_zones))); + my $qp = $self->qp; for my $dnsbl (keys %dnsbl_zones) { # fix to find A records, if the dnsbl_zones line has a second field 20/1/04 ++msp if (defined($dnsbl_zones{$dnsbl})) { $self->log(LOGDEBUG, "Checking $reversed_ip.$dnsbl for A record in the background"); Danga::DNS->new( - callback => sub { $self->process_a_result($dnsbl_zones{$dnsbl}, @_) }, + callback => sub { process_a_result($qp, $dnsbl_zones{$dnsbl}, @_) }, host => "$reversed_ip.$dnsbl", type => 'A', client => $self->qp->input_sock, @@ -47,7 +49,7 @@ sub connect_handler { } else { $self->log(LOGDEBUG, "Checking $reversed_ip.$dnsbl for TXT record in the background"); Danga::DNS->new( - callback => sub { $self->process_txt_result(@_) }, + callback => sub { process_txt_result($qp, @_) }, host => "$reversed_ip.$dnsbl", type => 'TXT', client => $self->qp->input_sock, @@ -55,40 +57,48 @@ sub connect_handler { } } - return DECLINED; + return CONTINUATION; } sub process_a_result { - my $self = shift; - my ($template, $result, $query) = @_; + my ($qp, $template, $result, $query) = @_; + + my $pending = $qp->transaction->notes('pending_dns_queries'); + $qp->transaction->notes('pending_dns_queries', --$pending); warn("Result for A $query: $result\n"); if ($result !~ /^\d+\.\d+\.\d+\.\d+$/) { # NXDOMAIN or ERROR possibly... + $qp->finish_continuation unless $pending; return; } - my $ip = $self->connection->remote_ip; + my $conn = $qp->connection; + my $ip = $conn->remote_ip; $template =~ s/%IP%/$ip/g; - my $conn = $self->connection; $conn->notes('dnsbl', $template) unless $conn->notes('dnsbl'); + $qp->finish_continuation unless $pending; } sub process_txt_result { - my $self = shift; - my ($result, $query) = @_; + my ($qp, $result, $query) = @_; + + my $pending = $qp->transaction->notes('pending_dns_queries'); + $qp->transaction->notes('pending_dns_queries', --$pending); warn("Result for TXT $query: $result\n"); if ($result !~ /[a-z]/) { # NXDOMAIN or ERROR probably... + $qp->finish_continuation unless $pending; return; } - my $conn = $self->connection; + my $conn = $qp->connection; $conn->notes('dnsbl', $result) unless $conn->notes('dnsbl'); + $qp->finish_continuation unless $pending; } -sub rcpt_handler { +sub pickup_handler { my ($self, $transaction, $rcpt) = @_; # RBLSMTPD being non-empty means it contains the failure message to return