From 2b709d664c367886babfe933e80766bd235bab83 Mon Sep 17 00:00:00 2001 From: Matt Sergeant Date: Fri, 8 Dec 2006 19:46:18 +0000 Subject: [PATCH] Async qpsmtpd (still entirely compatible with non-async version) git-svn-id: https://svn.perl.org/qpsmtpd/branches/0.3x@679 958fd67b-6ff1-0310-b445-bb7760255be9 --- lib/Qpsmtpd.pm | 4 +- lib/Qpsmtpd/PollServer.pm | 392 ++++++++++++++++++++++++++++++++++++++ lib/Qpsmtpd/SMTP.pm | 289 +++++++++++++++++----------- qpsmtpd-async | 311 ++++++++++++++++++++++++++++++ 4 files changed, 887 insertions(+), 109 deletions(-) create mode 100644 lib/Qpsmtpd/PollServer.pm create mode 100755 qpsmtpd-async diff --git a/lib/Qpsmtpd.pm b/lib/Qpsmtpd.pm index 2574986..2338042 100644 --- a/lib/Qpsmtpd.pm +++ b/lib/Qpsmtpd.pm @@ -348,7 +348,7 @@ sub run_hooks { $self->{_continuation} = [$hook, [@_], @local_hooks]; return $self->run_continuation(); } - return (0, ''); + return $self->hook_responder($hook, [0, ''], [@_]); } sub run_continuation { @@ -423,7 +423,7 @@ sub hook_responder { my $responder = $hook . '_respond'; if (my $meth = $self->can($responder)) { - return $meth->($self, $code, $msg, @$args); + return $meth->($self, $code, $msg, $args); } return $code, @$msg; } diff --git a/lib/Qpsmtpd/PollServer.pm b/lib/Qpsmtpd/PollServer.pm new file mode 100644 index 0000000..f2de0dc --- /dev/null +++ b/lib/Qpsmtpd/PollServer.pm @@ -0,0 +1,392 @@ +# $Id: Server.pm,v 1.10 2005/02/14 22:04:48 msergeant Exp $ + +package Qpsmtpd::PollServer; + +use base ('Danga::Client', 'Qpsmtpd::SMTP'); +# use fields required to be a subclass of Danga::Client. Have to include +# all fields used by Qpsmtpd.pm here too. +use fields qw( + input_sock + mode + header_lines + in_header + data_size + max_size + hooks + start_time + cmd_timeout + _auth_mechanism + _auth_state + _auth_ticket + _auth_user + _commands + _config_cache + _connection + _transaction + _test_mode + _extras + _continuation +); +use Qpsmtpd::Constants; +use Qpsmtpd::Address; +use ParaDNS; +use Mail::Header; +use POSIX qw(strftime); +use Socket qw(inet_aton AF_INET CRLF); +use Time::HiRes qw(time); +use strict; + +sub max_idle_time { 60 } +sub max_connect_time { 1200 } + +sub input_sock { + my $self = shift; + @_ and $self->{input_sock} = shift; + $self->{input_sock} || $self; +} + +sub new { + my Qpsmtpd::PollServer $self = shift; + + $self = fields::new($self) unless ref $self; + $self->SUPER::new( @_ ); + $self->{cmd_timeout} = 5; + $self->{start_time} = time; + $self->{mode} = 'connect'; + $self->load_plugins; + $self->load_logging; + return $self; +} + +sub uptime { + my Qpsmtpd::PollServer $self = shift; + + return (time() - $self->{start_time}); +} + +sub reset_for_next_message { + my Qpsmtpd::PollServer $self = shift; + $self->SUPER::reset_for_next_message(@_); + + $self->{_commands} = { + ehlo => 1, + helo => 1, + rset => 1, + mail => 1, + rcpt => 1, + data => 1, + help => 1, + vrfy => 1, + noop => 1, + quit => 1, + auth => 0, # disabled by default + }; + $self->{mode} = 'cmd'; + $self->{_extras} = {}; +} + +sub respond { + my Qpsmtpd::PollServer $self = shift; + my ($code, @messages) = @_; + while (my $msg = shift @messages) { + my $line = $code . (@messages ? "-" : " ") . $msg; + $self->write("$line\r\n"); + } + return 1; +} + +sub fault { + my Qpsmtpd::PollServer $self = shift; + $self->SUPER::fault(@_); + return; +} + +sub process_line { + my Qpsmtpd::PollServer $self = shift; + my $line = shift || return; + if ($::DEBUG > 1) { print "$$:".($self+0)."C($self->{mode}): $line"; } + eval { $self->_process_line($line) }; + if ($@) { + print STDERR "Error: $@\n"; + return $self->fault("command failed unexpectedly") if $self->{mode} eq 'cmd'; + return $self->fault("error processing data lines") if $self->{mode} eq 'data'; + return $self->fault("unknown error"); + } + return; +} + +sub _process_line { + my Qpsmtpd::PollServer $self = shift; + my $line = shift; + + if ($self->{mode} eq 'connect') { + $self->{mode} = 'cmd'; + my $rc = $self->start_conversation; + return; + } + elsif ($self->{mode} eq 'cmd') { + $line =~ s/\r?\n//; + return $self->process_cmd($line); + } + elsif ($self->{mode} eq 'data') { + return $self->data_line($line); + } + else { + die "Unknown mode"; + } +} + +sub process_cmd { + my Qpsmtpd::PollServer $self = shift; + my $line = shift; + my ($cmd, @params) = split(/ +/, $line); + my $meth = lc($cmd); + if (my $lookup = $self->{_commands}->{$meth} && $self->can($meth)) { + my $resp = eval { + $lookup->($self, @params); + }; + if ($@) { + my $error = $@; + chomp($error); + $self->log(LOGERROR, "Command Error: $error"); + return $self->fault("command '$cmd' failed unexpectedly"); + } + return $resp; + } + else { + # No such method - i.e. unrecognized command + my ($rc, $msg) = $self->run_hooks("unrecognized_command", $meth, @params); + return 1; + } +} + +sub disconnect { + my Qpsmtpd::PollServer $self = shift; + $self->SUPER::disconnect(@_); + $self->close; +} + +sub start_conversation { + my Qpsmtpd::PollServer $self = shift; + + my $conn = $self->connection; + # set remote_host, remote_ip and remote_port + my ($ip, $port) = split(':', $self->peer_addr_string); + $conn->remote_ip($ip); + $conn->remote_port($port); + $conn->remote_info("[$ip]"); + ParaDNS->new( + finished => sub { $self->run_hooks("connect") }, + # NB: Setting remote_info to the same as remote_host + callback => sub { $conn->remote_info($conn->remote_host($_[0])) }, + host => $ip, + ); + + return; +} + +sub data { + my Qpsmtpd::PollServer $self = shift; + + my ($rc, $msg) = $self->run_hooks("data"); + return 1; +} + +sub data_respond { + my Qpsmtpd::PollServer $self = shift; + my ($rc, $msg) = @_; + if ($rc == DONE) { + return; + } + elsif ($rc == DENY) { + $self->respond(554, $msg || "Message denied"); + $self->reset_transaction(); + return; + } + elsif ($rc == DENYSOFT) { + $self->respond(451, $msg || "Message denied temporarily"); + $self->reset_transaction(); + return; + } + elsif ($rc == DENY_DISCONNECT) { + $self->respond(554, $msg || "Message denied"); + $self->disconnect; + return; + } + elsif ($rc == DENYSOFT_DISCONNECT) { + $self->respond(451, $msg || "Message denied temporarily"); + $self->disconnect; + return; + } + return $self->respond(503, "MAIL first") unless $self->transaction->sender; + return $self->respond(503, "RCPT first") unless $self->transaction->recipients; + + $self->{mode} = 'data'; + + $self->{header_lines} = ''; + $self->{data_size} = 0; + $self->{in_header} = 1; + $self->{max_size} = ($self->config('databytes'))[0] || 0; + + $self->log(LOGDEBUG, "max_size: $self->{max_size} / size: $self->{data_size}"); + + $self->respond(354, "go ahead"); + + my $max_get = $self->{max_size} || 1048576; + $self->get_chunks($max_get, sub { $self->got_data($_[0]) }); + return 1; +} + +sub got_data { + my Qpsmtpd::PollServer $self = shift; + my $data = shift; + + my $done = 0; + my $remainder; + if ($data =~ s/^\.\r\n(.*)\z//m) { + $remainder = $1; + $done = 1; + } + + # add a transaction->blocked check back here when we have line by line plugin access... + unless (($self->{max_size} and $self->{data_size} > $self->{max_size})) { + $data =~ s/\r\n/\n/mg; + $data =~ s/^\.\./\./mg; + + if ($self->{in_header} and $data =~ s/\A(.*?)\n[ \t]*\n//ms) { + $self->{header_lines} .= $1; + # end of headers + $self->{in_header} = 0; + + # ... need to check that we don't reformat any of the received lines. + # + # 3.8.2 Received Lines in Gatewaying + # When forwarding a message into or out of the Internet environment, a + # gateway MUST prepend a Received: line, but it MUST NOT alter in any + # way a Received: line that is already in the header. + my @header_lines = split(/\n/, $self->{header_lines}); + + my $header = Mail::Header->new(\@header_lines, + Modify => 0, MailFrom => "COERCE"); + $self->transaction->header($header); + $self->{header_lines} = ''; + + #$header->add("X-SMTPD", "qpsmtpd/".$self->version.", http://smtpd.develooper.com/"); + + # FIXME - call plugins to work on just the header here; can + # save us buffering the mail content. + } + + if ($self->{in_header}) { + $self->{header_lines} .= $data; + } + else { + $self->transaction->body_write(\$data); + } + + $self->{data_size} += length $data; + } + + + if ($done) { + $self->{mode} = 'cmd'; + $self->end_of_data; + $self->end_get_chunks($remainder); + } + +} + +sub data_line { + my Qpsmtpd::PollServer $self = shift; + + print "YIKES\n"; + + my $line = shift; + + if ($line eq ".\r\n") { + # add received etc. + $self->{mode} = 'cmd'; + return $self->end_of_data; + } + + # Reject messages that have either bare LF or CR. rjkaes noticed a + # lot of spam that is malformed in the header. + if ($line eq ".\n" or $line eq ".\r") { + $self->respond(421, "See http://smtpd.develooper.com/barelf.html"); + $self->disconnect; + return; + } + + # add a transaction->blocked check back here when we have line by line plugin access... + unless (($self->{max_size} and $self->{data_size} > $self->{max_size})) { + $line =~ s/\r\n$/\n/; + $line =~ s/^\.\./\./; + + if ($self->{in_header} and $line =~ m/^\s*$/) { + # end of headers + $self->{in_header} = 0; + + # ... need to check that we don't reformat any of the received lines. + # + # 3.8.2 Received Lines in Gatewaying + # When forwarding a message into or out of the Internet environment, a + # gateway MUST prepend a Received: line, but it MUST NOT alter in any + # way a Received: line that is already in the header. + + my $header = Mail::Header->new($self->{header_lines}, + Modify => 0, MailFrom => "COERCE"); + $self->transaction->header($header); + + #$header->add("X-SMTPD", "qpsmtpd/".$self->version.", http://smtpd.develooper.com/"); + + # FIXME - call plugins to work on just the header here; can + # save us buffering the mail content. + } + + if ($self->{in_header}) { + push @{ $self->{header_lines} }, $line; + } + else { + $self->transaction->body_write(\$line); + } + + $self->{data_size} += length $line; + } + + return; +} + +sub end_of_data { + my Qpsmtpd::PollServer $self = shift; + + #$self->log(LOGDEBUG, "size is at $size\n") unless ($i % 300); + + $self->log(LOGDEBUG, "max_size: $self->{max_size} / size: $self->{data_size}"); + + my $smtp = $self->connection->hello eq "ehlo" ? "ESMTP" : "SMTP"; + + my $header = $self->transaction->header; + if (!$header) { + $header = Mail::Header->new(Modify => 0, MailFrom => "COERCE"); + $self->transaction->header($header); + } + + # only true if client authenticated + if ( $self->authenticated == OK ) { + $header->add("X-Qpsmtpd-Auth","True"); + } + + $header->add("Received", "from ".$self->connection->remote_info + ." (HELO ".$self->connection->hello_host . ") (".$self->connection->remote_ip + . ")\n by ".$self->config('me')." (qpsmtpd/".$self->version + .") with $smtp; ". (strftime('%a, %d %b %Y %H:%M:%S %z', localtime)), + 0); + + return $self->respond(552, "Message too big!") if $self->{max_size} and $self->{data_size} > $self->{max_size}; + + my ($rc, $msg) = $self->run_hooks("data_post"); + return 1; +} + +1; + diff --git a/lib/Qpsmtpd/SMTP.pm b/lib/Qpsmtpd/SMTP.pm index e26e569..b684cce 100644 --- a/lib/Qpsmtpd/SMTP.pm +++ b/lib/Qpsmtpd/SMTP.pm @@ -53,21 +53,7 @@ sub dispatch { $self->{_counter}++; if ($cmd !~ /^(\w{1,12})$/ or !exists $self->{_commands}->{$1}) { - my ($rc, @msg) = $self->run_hooks("unrecognized_command", $cmd, @_); - @msg = map { split /\n/ } @msg; - if ($rc == DENY_DISCONNECT) { - $self->respond(521, @msg); - $self->disconnect; - } - elsif ($rc == DENY) { - $self->respond(500, @msg); - } - elsif ($rc == DONE) { - 1; - } - else { - $self->respond(500, "Unrecognized command"); - } + $self->run_hooks("unrecognized_command", $cmd, @_); return 1 } $cmd = $1; @@ -82,6 +68,20 @@ sub dispatch { return; } +sub unrecognized_command_respond { + my ($self, $rc, $msg) = @_; + if ($rc == DENY_DISCONNECT) { + $self->respond(521, @$msg); + $self->disconnect; + } + elsif ($rc == DENY) { + $self->respond(500, @$msg); + } + elsif ($rc != DONE) { + $self->respond(500, "Unrecognized command"); + } +} + sub fault { my $self = shift; my ($msg) = shift || "program fault - command not performed"; @@ -94,19 +94,21 @@ sub start_conversation { my $self = shift; # this should maybe be called something else than "connect", see # lib/Qpsmtpd/TcpServer.pm for more confusion. - my ($rc, @msg) = $self->run_hooks("connect"); + $self->run_hooks("connect"); + return DONE; +} + +sub connect_respond { + my ($self, $rc, $msg) = @_; if ($rc == DENY) { - $msg[0] ||= 'Connection from you denied, bye bye.'; - $self->respond(550, @msg); - return $rc; + $msg->[0] ||= 'Connection from you denied, bye bye.'; + $self->respond(550, @$msg); + $self->disconnect; } elsif ($rc == DENYSOFT) { - $msg[0] ||= 'Connection from you temporarily denied, bye bye.'; - $self->respond(450, @msg); - return $rc; - } - elsif ($rc == DONE) { - return $rc; + $msg->[0] ||= 'Connection from you temporarily denied, bye bye.'; + $self->respond(450, @$msg); + $self->disconnect; } elsif ($rc != DONE) { my $greets = $self->config('smtpgreeting'); @@ -121,7 +123,6 @@ sub start_conversation { } $self->respond(220, $greets); - return DONE; } } @@ -154,20 +155,26 @@ sub helo { my $conn = $self->connection; return $self->respond (503, "but you already said HELO ...") if $conn->hello; - ($rc, @msg) = $self->run_hooks("helo", $hello_host, @stuff); + $self->run_hooks("helo", $hello_host, @stuff); +} + +sub helo_respond { + my ($self, $rc, $msg, $args) = @_; + my ($hello_host) = @$args; if ($rc == DONE) { # do nothing } elsif ($rc == DENY) { - $self->respond(550, @msg); + $self->respond(550, @$msg); } elsif ($rc == DENYSOFT) { - $self->respond(450, @msg); + $self->respond(450, @$msg); } elsif ($rc == DENY_DISCONNECT) { - $self->respond(550, @msg); + $self->respond(550, @$msg); $self->disconnect; } elsif ($rc == DENYSOFT_DISCONNECT) { - $self->respond(450, @msg); + $self->respond(450, @$msg); $self->disconnect; } else { + my $conn = $self->connection; $conn->hello("helo"); $conn->hello_host($hello_host); $self->transaction; @@ -184,20 +191,26 @@ sub ehlo { my $conn = $self->connection; return $self->respond (503, "but you already said HELO ...") if $conn->hello; - ($rc, @msg) = $self->run_hooks("ehlo", $hello_host, @stuff); + $self->run_hooks("ehlo", $hello_host, @stuff); +} + +sub ehlo_respond { + my ($self, $rc, $msg, $args) = @_; + my ($hello_host) = @$args; if ($rc == DONE) { # do nothing } elsif ($rc == DENY) { - $self->respond(550, @msg); + $self->respond(550, @$msg); } elsif ($rc == DENYSOFT) { - $self->respond(450, @msg); + $self->respond(450, @$msg); } elsif ($rc == DENY_DISCONNECT) { - $self->respond(550, @msg); + $self->respond(550, @$msg); $self->disconnect; } elsif ($rc == DENYSOFT_DISCONNECT) { - $self->respond(450, @msg); + $self->respond(450, @$msg); $self->disconnect; } else { + my $conn = $self->connection; $conn->hello("ehlo"); $conn->hello_host($hello_host); $self->transaction; @@ -238,8 +251,14 @@ HOOK: foreach my $hook ( keys %{$self->{hooks}} ) { sub auth { my ($self, $line) = @_; - my ($rc, $sub) = $self->run_hooks('auth_parse'); - my ($ok, $mechanism, @stuff) = Qpsmtpd::Command->parse('auth', $line, $sub); + $self->run_hooks('auth_parse', $line); +} + +sub auth_parse_respond { + my ($self, $rc, $msg, $args) = @_; + my ($line) = @$args; + + my ($ok, $mechanism, @stuff) = Qpsmtpd::Command->parse('auth', $line, $msg->[0]); return $self->respond(501, $mechanism || "Syntax error in command") unless ($ok == OK); @@ -293,8 +312,14 @@ sub mail { } else { $self->log(LOGINFO, "full from_parameter: $line"); - my ($rc, @msg) = $self->run_hooks("mail_parse"); - my ($ok, $from, @params) = Qpsmtpd::Command->parse('mail', $line, $msg[0]); + $self->run_hooks("mail_parse", $line); + } +} + +sub mail_parse_respond { + my ($self, $rc, $msg, $args) = @_; + my ($line) = @$args; + my ($ok, $from, @params) = Qpsmtpd::Command->parse('mail', $line, $msg->[0]); return $self->respond(501, $from || "Syntax error in command") unless ($ok == OK); my %param; @@ -307,9 +332,14 @@ sub mail { # return (OK, "<$from>"); # (...or anything else parseable by Qpsmtpd::Address ;-)) # see also comment in sub rcpt() - ($rc, @msg) = $self->run_hooks("mail_pre", $from); + $self->run_hooks("mail_pre", $from, \%param); +} + +sub mail_pre_respond { + my ($self, $rc, $msg, $args) = @_; + my ($from, $param) = @$args; if ($rc == OK) { - $from = shift @msg; + $from = shift @$msg; } $self->log(LOGALERT, "from email address : [$from]"); @@ -324,30 +354,35 @@ sub mail { } return $self->respond(501, "could not parse your mail from command") unless $from; - ($rc, @msg) = $self->run_hooks("mail", $from, %param); + $self->run_hooks("mail", $from, %$param); +} + +sub mail_respond { + my ($self, $rc, $msg, $args) = @_; + my ($from, $param) = @$args; if ($rc == DONE) { return 1; } elsif ($rc == DENY) { - $msg[0] ||= $from->format . ', denied'; - $self->log(LOGINFO, "deny mail from " . $from->format . " (@msg)"); - $self->respond(550, @msg); + $msg->[0] ||= $from->format . ', denied'; + $self->log(LOGINFO, "deny mail from " . $from->format . " (@$msg)"); + $self->respond(550, @$msg); } elsif ($rc == DENYSOFT) { - $msg[0] ||= $from->format . ', temporarily denied'; - $self->log(LOGINFO, "denysoft mail from " . $from->format . " (@msg)"); - $self->respond(450, @msg); + $msg->[0] ||= $from->format . ', temporarily denied'; + $self->log(LOGINFO, "denysoft mail from " . $from->format . " (@$msg)"); + $self->respond(450, @$msg); } elsif ($rc == DENY_DISCONNECT) { - $msg[0] ||= $from->format . ', denied'; - $self->log(LOGINFO, "deny mail from " . $from->format . " (@msg)"); - $self->respond(550, @msg); + $msg->[0] ||= $from->format . ', denied'; + $self->log(LOGINFO, "deny mail from " . $from->format . " (@$msg)"); + $self->respond(550, @$msg); $self->disconnect; } elsif ($rc == DENYSOFT_DISCONNECT) { - $msg[0] ||= $from->format . ', temporarily denied'; - $self->log(LOGINFO, "denysoft mail from " . $from->format . " (@msg)"); - $self->respond(421, @msg); + $msg->[0] ||= $from->format . ', temporarily denied'; + $self->log(LOGINFO, "denysoft mail from " . $from->format . " (@$msg)"); + $self->respond(421, @$msg); $self->disconnect; } else { # includes OK @@ -355,13 +390,17 @@ sub mail { $self->respond(250, $from->format . ", sender OK - how exciting to get mail from you!"); $self->transaction->sender($from); } - } } sub rcpt { my ($self, $line) = @_; - my ($rc, @msg) = $self->run_hooks("rcpt_parse"); - my ($ok, $rcpt, @param) = Qpsmtpd::Command->parse("rcpt", $line, $msg[0]); + $self->run_hooks("rcpt_parse", $line); +} + +sub rcpt_parse_respond { + my ($self, $rc, $msg, $args) = @_; + my ($line) = @$args; + my ($ok, $rcpt, @param) = Qpsmtpd::Command->parse("rcpt", $line, $msg->[0]); return $self->respond(501, $rcpt || "Syntax error in command") unless ($ok == OK); return $self->respond(503, "Use MAIL before RCPT") unless $self->transaction->sender; @@ -378,9 +417,14 @@ sub rcpt { # this means, a plugin can decide to (pre-)accept # addresses like or # by removing the trailing "."/" " from this example... - ($rc, @msg) = $self->run_hooks("rcpt_pre", $rcpt); + $self->run_hooks("rcpt_pre", $rcpt, \%param); +} + +sub rcpt_pre_respond { + my ($self, $rc, $msg, $args) = @_; + my ($rcpt, $param) = @$args; if ($rc == OK) { - $rcpt = shift @msg; + $rcpt = shift @$msg; } $self->log(LOGALERT, "to email address : [$rcpt]"); return $self->respond(501, "could not parse recipient") @@ -391,28 +435,33 @@ sub rcpt { return $self->respond(501, "could not parse recipient") if (!$rcpt or ($rcpt->format eq '<>')); - ($rc, @msg) = $self->run_hooks("rcpt", $rcpt, %param); + $self->run_hooks("rcpt", $rcpt, %$param); +} + +sub rcpt_respond { + my ($self, $rc, $msg, $args) = @_; + my ($rcpt, $param) = @$args; if ($rc == DONE) { return 1; } elsif ($rc == DENY) { - $msg[0] ||= 'relaying denied'; - $self->respond(550, @msg); + $msg->[0] ||= 'relaying denied'; + $self->respond(550, @$msg); } elsif ($rc == DENYSOFT) { - $msg[0] ||= 'relaying denied'; - return $self->respond(450, @msg); + $msg->[0] ||= 'relaying denied'; + return $self->respond(450, @$msg); } elsif ($rc == DENY_DISCONNECT) { - $msg[0] ||= 'delivery denied'; - $self->log(LOGINFO, "delivery denied (@msg)"); - $self->respond(550, @msg); + $msg->[0] ||= 'delivery denied'; + $self->log(LOGINFO, "delivery denied (@$msg)"); + $self->respond(550, @$msg); $self->disconnect; } elsif ($rc == DENYSOFT_DISCONNECT) { - $msg[0] ||= 'relaying denied'; - $self->log(LOGINFO, "delivery denied (@msg)"); - $self->respond(421, @msg); + $msg->[0] ||= 'relaying denied'; + $self->log(LOGINFO, "delivery denied (@$msg)"); + $self->respond(421, @$msg); $self->disconnect; } elsif ($rc == OK) { @@ -425,8 +474,6 @@ sub rcpt { return 0; } - - sub help { my $self = shift; $self->respond(214, @@ -448,19 +495,23 @@ sub vrfy { # documented in RFC2821#3.5.1 # I also don't think it provides all the proper result codes. - my ($rc, @msg) = $self->run_hooks("vrfy"); + $self->run_hooks("vrfy"); +} + +sub vrfy_respond { + my ($self, $rc, $msg, $args) = @_; if ($rc == DONE) { return 1; } elsif ($rc == DENY) { - $msg[0] ||= "Access Denied"; - $self->respond(554, @msg); + $msg->[0] ||= "Access Denied"; + $self->respond(554, @$msg); $self->reset_transaction(); return 1; } elsif ($rc == OK) { - $msg[0] ||= "User OK"; - $self->respond(250, @msg); + $msg->[0] ||= "User OK"; + $self->respond(250, @$msg); return 1; } else { # $rc == DECLINED or anything else @@ -477,10 +528,14 @@ sub rset { sub quit { my $self = shift; - my ($rc, @msg) = $self->run_hooks("quit"); + $self->run_hooks("quit"); +} + +sub quit_respond { + my ($self, $rc, $msg, $args) = @_; if ($rc != DONE) { - $msg[0] ||= $self->config('me') . " closing connection. Have a wonderful day."; - $self->respond(221, @msg); + $msg->[0] ||= $self->config('me') . " closing connection. Have a wonderful day."; + $self->respond(221, @$msg); } $self->disconnect(); } @@ -493,31 +548,35 @@ sub disconnect { sub data { my $self = shift; - my ($rc, @msg) = $self->run_hooks("data"); + $self->run_hooks("data"); +} + +sub data_respond { + my ($self, $rc, $msg, $args) = @_; if ($rc == DONE) { return 1; } elsif ($rc == DENY) { - $msg[0] ||= "Message denied"; - $self->respond(554, @msg); + $msg->[0] ||= "Message denied"; + $self->respond(554, @$msg); $self->reset_transaction(); return 1; } elsif ($rc == DENYSOFT) { - $msg[0] ||= "Message denied temporarily"; - $self->respond(451, @msg); + $msg->[0] ||= "Message denied temporarily"; + $self->respond(451, @$msg); $self->reset_transaction(); return 1; } elsif ($rc == DENY_DISCONNECT) { - $msg[0] ||= "Message denied"; - $self->respond(554, @msg); + $msg->[0] ||= "Message denied"; + $self->respond(554, @$msg); $self->disconnect; return 1; } elsif ($rc == DENYSOFT_DISCONNECT) { - $msg[0] ||= "Message denied temporarily"; - $self->respond(421, @msg); + $msg->[0] ||= "Message denied temporarily"; + $self->respond(421, @$msg); $self->disconnect; return 1; } @@ -624,17 +683,21 @@ sub data { return 1; } - ($rc, @msg) = $self->run_hooks("data_post"); + $self->run_hooks("data_post"); +} + +sub data_post_respond { + my ($self, $rc, $msg, $args) = @_; if ($rc == DONE) { return 1; } elsif ($rc == DENY) { - $msg[0] ||= "Message denied"; - $self->respond(552, @msg); + $msg->[0] ||= "Message denied"; + $self->respond(552, @$msg); } elsif ($rc == DENYSOFT) { - $msg[0] ||= "Message denied temporarily"; - $self->respond(452, @msg); + $msg->[0] ||= "Message denied temporarily"; + $self->respond(452, @$msg); } else { $self->queue($self->transaction); @@ -658,7 +721,11 @@ sub queue { my ($self, $transaction) = @_; # First fire any queue_pre hooks - my ($rc, @msg) = $self->run_hooks("queue_pre"); + $self->run_hooks("queue_pre"); +} + +sub queue_pre_respond { + my ($self, $rc, $msg, $args) = @_; if ($rc == DONE) { return 1; } @@ -668,30 +735,38 @@ sub queue { } # If we got this far, run the queue hooks - ($rc, @msg) = $self->run_hooks("queue"); + $self->run_hooks("queue"); +} + +sub queue_respond { + my ($self, $rc, $msg, $args) = @_; if ($rc == DONE) { return 1; } elsif ($rc == OK) { - $msg[0] ||= 'Queued'; - $self->respond(250, @msg); + $msg->[0] ||= 'Queued'; + $self->respond(250, @$msg); } elsif ($rc == DENY) { - $msg[0] ||= 'Message denied'; - $self->respond(552, @msg); + $msg->[0] ||= 'Message denied'; + $self->respond(552, @$msg); } elsif ($rc == DENYSOFT) { - $msg[0] ||= 'Message denied temporarily'; - $self->respond(452, @msg); + $msg->[0] ||= 'Message denied temporarily'; + $self->respond(452, @$msg); } else { - $msg[0] ||= 'Queuing declined or disabled; try again later'; - $self->respond(451, @msg); + $msg->[0] ||= 'Queuing declined or disabled; try again later'; + $self->respond(451, @$msg); } # And finally run any queue_post hooks - ($rc, @msg) = $self->run_hooks("queue_post"); - $self->log(LOGERROR, @msg) unless ($rc == OK or $rc == 0); + $self->run_hooks("queue_post"); +} + +sub queue_post_respond { + my ($self, $rc, $msg, $args) = @_; + $self->log(LOGERROR, @$msg) unless ($rc == OK or $rc == 0); } diff --git a/qpsmtpd-async b/qpsmtpd-async new file mode 100755 index 0000000..0890ba4 --- /dev/null +++ b/qpsmtpd-async @@ -0,0 +1,311 @@ +#!/usr/bin/perl + +use lib "./lib"; +BEGIN { + delete $ENV{ENV}; + delete $ENV{BASH_ENV}; + $ENV{PATH} = '/bin:/usr/bin:/var/qmail/bin:/usr/local/bin'; +} + +use strict; +use vars qw($DEBUG); +use FindBin qw(); +# TODO: need to make this taint friendly +use lib "$FindBin::Bin/lib"; +use Danga::Socket; +use Danga::Client; +use Qpsmtpd::PollServer; +use Qpsmtpd::ConfigServer; +use Qpsmtpd::Constants; +use IO::Socket; +use Carp; +use POSIX qw(WNOHANG); +use Getopt::Long; + +$|++; + +use Socket qw(SOMAXCONN IPPROTO_TCP SO_KEEPALIVE TCP_NODELAY SOL_SOCKET); + +$SIG{'PIPE'} = "IGNORE"; # handled manually + +$DEBUG = 0; + +my $CONFIG_PORT = 20025; +my $CONFIG_LOCALADDR = '127.0.0.1'; + +my $PORT = 2525; +my $LOCALADDR = '0.0.0.0'; +my $PROCS = 1; +my $USER = 'smtpd'; # user to suid to +my $PAUSED = 0; +my $NUMACCEPT = 20; +my $ACCEPT_RSET = Danga::Socket->AddTimer(30, \&reset_num_accept); + +# make sure we don't spend forever doing accept() +use constant ACCEPT_MAX => 1000; + +sub reset_num_accept { + $NUMACCEPT = 20; +} + +sub help { + print < \$PORT, + 'l|listen-address=s' => \$LOCALADDR, + 'j|procs=i' => \$PROCS, + 'd|debug+' => \$DEBUG, + 'u|user=s' => \$USER, + 'h|help' => \&help, +) || help(); + +# detaint the commandline +if ($PORT =~ /^(\d+)$/) { $PORT = $1 } else { &help } +if ($LOCALADDR =~ /^([\d\w\-.]+)$/) { $LOCALADDR = $1 } else { &help } +if ($USER =~ /^([\w\-]+)$/) { $USER = $1 } else { &help } +if ($PROCS =~ /^(\d+)$/) { $PROCS = $1 } else { &help } + +sub force_poll { + $Danga::Socket::HaveEpoll = 0; + $Danga::Socket::HaveKQueue = 0; +} + +my $POLL = "with " . ($Danga::Socket::HaveEpoll ? "epoll()" : + $Danga::Socket::HaveKQueue ? "kqueue()" : "poll()"); + +my $SERVER; +my $CONFIG_SERVER; + +my %childstatus = (); + +run_as_server(); +exit(0); + +sub _fork { + my $pid = fork; + if (!defined($pid)) { die "Cannot fork: $!" } + return $pid if $pid; + + # Fixup Net::DNS randomness after fork + srand($$ ^ time); + + local $^W; + delete $INC{'Net/DNS/Header.pm'}; + require Net::DNS::Header; + + # cope with different versions of Net::DNS + eval { + $Net::DNS::Resolver::global{id} = 1; + $Net::DNS::Resolver::global{id} = int(rand(Net::DNS::Resolver::MAX_ID())); + # print "Next DNS ID: $Net::DNS::Resolver::global{id}\n"; + }; + if ($@) { + # print "Next DNS ID: " . Net::DNS::Header::nextid() . "\n"; + } + + # Fixup lost kqueue after fork + $Danga::Socket::HaveKQueue = undef; +} + +sub spawn_child { + my $plugin_loader = shift || Qpsmtpd::SMTP->new; + if (my $pid = _fork) { + return $pid; + } + + $SIG{HUP} = $SIG{CHLD} = $SIG{INT} = $SIG{TERM} = 'DEFAULT'; + $SIG{PIPE} = 'IGNORE'; + + Qpsmtpd::PollServer->OtherFds(fileno($SERVER) => \&accept_handler); + + $plugin_loader->run_hooks('post-fork'); + + Qpsmtpd::PollServer->EventLoop(); + exit; +} + +sub sig_chld { + my $spawn_count = 0; + while ( (my $child = waitpid(-1,WNOHANG)) > 0) { + if (!defined $childstatus{$child}) { + next; + } + + last unless $child > 0; + print "SIGCHLD: child $child died\n"; + delete $childstatus{$child}; + $spawn_count++; + } + if ($spawn_count) { + for (1..$spawn_count) { + # restart a new child if in poll server mode + my $pid = spawn_child(); + $childstatus{$pid} = 1; + } + } + $SIG{CHLD} = \&sig_chld; +} + +sub HUNTSMAN { + $SIG{CHLD} = 'DEFAULT'; + kill 'INT' => keys %childstatus; + exit(0); +} + +sub run_as_server { + # establish SERVER socket, bind and listen. + $SERVER = IO::Socket::INET->new(LocalPort => $PORT, + LocalAddr => $LOCALADDR, + Type => SOCK_STREAM, + Proto => IPPROTO_TCP, + Blocking => 0, + Reuse => 1, + Listen => SOMAXCONN ) + or die "Error creating server $LOCALADDR:$PORT : $@\n"; + + IO::Handle::blocking($SERVER, 0); + binmode($SERVER, ':raw'); + + $CONFIG_SERVER = IO::Socket::INET->new(LocalPort => $CONFIG_PORT, + LocalAddr => $CONFIG_LOCALADDR, + Type => SOCK_STREAM, + Proto => IPPROTO_TCP, + Blocking => 0, + Reuse => 1, + Listen => 1 ) + or die "Error creating server $CONFIG_LOCALADDR:$CONFIG_PORT : $@\n"; + + IO::Handle::blocking($CONFIG_SERVER, 0); + binmode($CONFIG_SERVER, ':raw'); + + # Drop priviledges + my (undef, undef, $quid, $qgid) = getpwnam $USER or + die "unable to determine uid/gid for $USER\n"; + $) = ""; + POSIX::setgid($qgid) or + die "unable to change gid: $!\n"; + POSIX::setuid($quid) or + die "unable to change uid: $!\n"; + $> = $quid; + + # Load plugins here + my $plugin_loader = Qpsmtpd::SMTP->new(); + $plugin_loader->load_plugins; + + $plugin_loader->log(LOGINFO, 'Running as user '. + (getpwuid($>) || $>) . + ', group '. + (getgrgid($)) || $))); + + $SIG{INT} = $SIG{TERM} = \&HUNTSMAN; + + if ($PROCS > 1) { + for (1..$PROCS) { + my $pid = spawn_child($plugin_loader); + $childstatus{$pid} = 1; + } + $plugin_loader->log(LOGDEBUG, "Listening on $PORT with $PROCS children $POLL"); + $SIG{'CHLD'} = \&sig_chld; + sleep while (1); + } + else { + $plugin_loader->log(LOGDEBUG, "Listening on $PORT with single process $POLL"); + Qpsmtpd::PollServer->OtherFds(fileno($SERVER) => \&accept_handler, + fileno($CONFIG_SERVER) => \&config_handler, + ); + $plugin_loader->run_hooks('post-fork'); + while (1) { + Qpsmtpd::PollServer->EventLoop(); + } + exit; + } + +} + +sub config_handler { + my $csock = $CONFIG_SERVER->accept(); + if (!$csock) { + # warn("accept failed on config server: $!"); + return; + } + binmode($csock, ':raw'); + + printf("Config server connection\n") if $DEBUG; + + IO::Handle::blocking($csock, 0); + setsockopt($csock, IPPROTO_TCP, TCP_NODELAY, pack("l", 1)) or die; + + my $client = Qpsmtpd::ConfigServer->new($csock); + $client->watch_read(1); + return; +} + +# Accept all new connections +sub accept_handler { + for (1 .. $NUMACCEPT) { + return unless _accept_handler(); + } + + # got here because we have accept's left. + # So double the number we accept next time. + $NUMACCEPT *= 2; + $NUMACCEPT = ACCEPT_MAX if $NUMACCEPT > ACCEPT_MAX; + $ACCEPT_RSET->cancel; + $ACCEPT_RSET = Danga::Socket->AddTimer(30, \&reset_num_accept); +} + +use Errno qw(EAGAIN EWOULDBLOCK); + +sub _accept_handler { + my $csock = $SERVER->accept(); + if (!$csock) { + # warn("accept() failed: $!"); + return; + } + binmode($csock, ':raw'); + + printf("Listen child making a Qpsmtpd::PollServer for %d.\n", fileno($csock)) + if $DEBUG; + + IO::Handle::blocking($csock, 0); + #setsockopt($csock, IPPROTO_TCP, TCP_NODELAY, pack("l", 1)) or die; + + my $client = Qpsmtpd::PollServer->new($csock); + + if ($PAUSED) { + $client->write("451 Sorry, this server is currently paused\r\n"); + $client->close; + return 1; + } + + $client->push_back_read("Connect\n"); + $client->watch_read(1); + return 1; +} + +######################################################################## + +sub log { + my ($level,$message) = @_; + # $level not used yet. this is reimplemented from elsewhere anyway + warn("$$ fd:? $message\n"); +} + +sub pause { + my ($pause) = @_; + $PAUSED = $pause; +}