diff --git a/lib/Qpsmtpd.pm b/lib/Qpsmtpd.pm index 382aea7..2829cc7 100644 --- a/lib/Qpsmtpd.pm +++ b/lib/Qpsmtpd.pm @@ -167,9 +167,15 @@ sub _config_from_file { return wantarray ? @config : $config[0]; } +our $HOOKS; + sub load_plugins { my $self = shift; - + + if ($HOOKS) { + return $self->{hooks} = $HOOKS; + } + $self->log(LOGWARN, "Plugins already loaded") if $self->{hooks}; $self->{hooks} = {}; @@ -180,6 +186,8 @@ sub load_plugins { @plugins = $self->_load_plugins($dir, @plugins); + $HOOKS = $self->{hooks}; + return @plugins; } @@ -252,51 +260,21 @@ sub transaction { sub run_hooks { my ($self, $hook) = (shift, shift); + if ($self->{_continuation} && $hook ne "logging") { + die "Continuations in progress from previous hook (this is the $hook hook)"; + } my $hooks = $self->{hooks}; if ($hooks->{$hook}) { my @r; - for my $code (@{$hooks->{$hook}}) { - if ( $hook eq 'logging' ) { # without calling $self->log() - eval { (@r) = $code->{code}->($self, $self->transaction, @_); }; - $@ and warn("FATAL LOGGING PLUGIN ERROR: ", $@) and next; + my @local_hooks = @{$hooks->{$hook}}; + while (@local_hooks) { + my $code = shift @local_hooks; + @r = $self->run_hook($hook, $code, @_); + next unless @r; + if ($r[0] == CONTINUATION) { + $self->disable_read() if $self->isa('Danga::Client'); + $self->{_continuation} = [$hook, [@_], @local_hooks]; } - else { - $self->varlog(LOGINFO, $hook, $code->{name}); - eval { (@r) = $code->{code}->($self, $self->transaction, @_); }; - $@ and $self->log(LOGCRIT, "FATAL PLUGIN ERROR: ", $@) and next; - - !defined $r[0] - and $self->log(LOGERROR, "plugin ".$code->{name} - ." running the $hook hook returned undef!") - and next; - - if ($self->transaction) { - my $tnotes = $self->transaction->notes( $code->{name} ); - $tnotes->{"hook_$hook"}->{'return'} = $r[0] - if (!defined $tnotes || ref $tnotes eq "HASH"); - } else { - my $cnotes = $self->connection->notes( $code->{name} ); - $cnotes->{"hook_$hook"}->{'return'} = $r[0] - if (!defined $cnotes || ref $cnotes eq "HASH"); - } - - # should we have a hook for "OK" too? - if ($r[0] == DENY or $r[0] == DENYSOFT or - $r[0] == DENY_DISCONNECT or $r[0] == DENYSOFT_DISCONNECT) - { - $r[1] = "" if not defined $r[1]; - $self->log(LOGDEBUG, "Plugin ".$code->{name}. - ", hook $hook returned ".return_code($r[0]).", $r[1]"); - $self->run_hooks("deny", $code->{name}, $r[0], $r[1]) unless ($hook eq "deny"); - } else { - $r[1] = "" if not defined $r[1]; - $self->log(LOGDEBUG, "Plugin ".$code->{name}. - ", hook $hook returned ".return_code($r[0]).", $r[1]"); - $self->run_hooks("ok", $code->{name}, $r[0], $r[1]) unless ($hook eq "ok"); - } - - } - last unless $r[0] == DECLINED; } $r[0] = DECLINED if not defined $r[0]; @@ -305,6 +283,77 @@ sub run_hooks { return (0, ''); } +sub finish_continuation { + my ($self) = @_; + die "No continuation in progress" unless $self->{_continuation}; + $self->enable_read() if $self->isa('Danga::Client'); + my $todo = $self->{_continuation}; + $self->{_continuation} = undef; + my $hook = shift @$todo || die "No hook in the continuation"; + my $args = shift @$todo || die "No hook args in the continuation"; + my @r; + while (@$todo) { + my $code = shift @$todo; + @r = $self->run_hook($hook, $code, @$args); + if ($r[0] == CONTINUATION) { + $self->disable_read() if $self->isa('Danga::Client'); + $self->{_continuation} = [$hook, $args, @$todo]; + return @r; + } + last unless $r[0] == DECLINED; + } + $r[0] = DECLINED if not defined $r[0]; + my $responder = $hook . "_respond"; + if (my $meth = $self->can($responder)) { + warn("continuation finished on $self\n"); + return $meth->($self, $r[0], $r[1], @$args); + } + die "No ${hook}_respond method"; +} + +sub run_hook { + my ($self, $hook, $code, @args) = @_; + my @r; + if ( $hook eq 'logging' ) { # without calling $self->log() + eval { (@r) = $code->{code}->($self, $self->transaction, @_); }; + $@ and warn("FATAL LOGGING PLUGIN ERROR: ", $@) and next; + } + else { + $self->varlog(LOGINFO, $hook, $code->{name}); + print STDERR "plugin $hook $code->{name} 1\n"; + eval { (@r) = $code->{code}->($self, $self->transaction, @args); }; + print STDERR "plugin $hook $code->{name} 2\n"; + + $@ and $self->log(LOGCRIT, "FATAL PLUGIN ERROR: ", $@) and return; + + !defined $r[0] + and $self->log(LOGERROR, "plugin ".$code->{name} + ."running the $hook hook returned undef!") + and return; + + if ($self->transaction) { + my $tnotes = $self->transaction->notes( $code->{name} ); + $tnotes->{"hook_$hook"}->{'return'} = $r[0] + if (!defined $tnotes || ref $tnotes eq "HASH"); + } else { + my $cnotes = $self->connection->notes( $code->{name} ); + $cnotes->{"hook_$hook"}->{'return'} = $r[0] + if (!defined $cnotes || ref $cnotes eq "HASH"); + } + + # should we have a hook for "OK" too? + if ($r[0] == DENY or $r[0] == DENYSOFT or + $r[0] == DENY_DISCONNECT or $r[0] == DENYSOFT_DISCONNECT) + { + $r[1] = "" if not defined $r[1]; + $self->log(LOGDEBUG, "Plugin $code->{name}, hook $hook returned $r[0], $r[1]"); + $self->run_hooks("deny", $code->{name}, $r[0], $r[1]) unless ($hook eq "deny"); + } + + } + return @r; +} + sub _register_hook { my $self = shift; my ($hook, $code, $unshift) = @_; diff --git a/lib/Qpsmtpd/ConfigServer.pm b/lib/Qpsmtpd/ConfigServer.pm new file mode 100644 index 0000000..2200cb0 --- /dev/null +++ b/lib/Qpsmtpd/ConfigServer.pm @@ -0,0 +1,285 @@ +# $Id$ + +package Qpsmtpd::ConfigServer; + +use base ('Danga::Client'); +use Qpsmtpd::Constants; + +use strict; + +use fields qw( + _auth + _commands + _config_cache + _connection + _transaction + _test_mode + _extras + other_fds +); + +my $PROMPT = "Enter command: "; + +sub new { + my Qpsmtpd::ConfigServer $self = shift; + + $self = fields::new($self) unless ref $self; + $self->SUPER::new( @_ ); + $self->write($PROMPT); + return $self; +} + +sub max_idle_time { 3600 } # one hour + +sub process_line { + my $self = shift; + my $line = shift || return; + if ($::DEBUG > 1) { print "$$:".($self+0)."C($self->{mode}): $line"; } + local $SIG{ALRM} = sub { + my ($pkg, $file, $line) = caller(); + die "ALARM: $pkg, $file, $line"; + }; + my $prev = alarm(2); # must process a command in < 2 seconds + my $resp = eval { $self->_process_line($line) }; + alarm($prev); + if ($@) { + print STDERR "Error: $@\n"; + } + return $resp || ''; +} + +sub respond { + my $self = shift; + my (@messages) = @_; + while (my $msg = shift @messages) { + $self->write("$msg\r\n"); + } + return; +} + +sub fault { + my $self = shift; + my ($msg) = shift || "program fault - command not performed"; + print STDERR "$0 [$$]: $msg ($!)\n"; + $self->respond("Error - " . $msg); + return $PROMPT; +} + +sub _process_line { + my $self = shift; + my $line = shift; + + $line =~ s/\r?\n//; + my ($cmd, @params) = split(/ +/, $line); + my $meth = "cmd_" . lc($cmd); + if (my $lookup = $self->can($meth)) { + my $resp = eval { + $lookup->($self, @params); + }; + if ($@) { + my $error = $@; + chomp($error); + Qpsmtpd->log(LOGERROR, "Command Error: $error"); + return $self->fault("command '$cmd' failed unexpectedly"); + } + return "$resp\n$PROMPT"; + } + else { + # No such method - i.e. unrecognized command + return $self->fault("command '$cmd' unrecognised"); + } +} + +my %helptext = ( + help => "HELP [CMD] - Get help on all commands or a specific command", + status => "STATUS - Returns status information about current connections", + list => "LIST [LIMIT] - List the connections, specify limit or negative limit to shrink list", + kill => "KILL (\$IP | \$REF) - Disconnect all connections from \$IP or connection reference \$REF", + pause => "PAUSE - Stop accepting new connections", + continue => "CONTINUE - Resume accepting connections", + reload => "RELOAD - Reload all plugins and config", + quit => "QUIT - Exit the config server", + ); + +sub cmd_help { + my $self = shift; + my ($subcmd) = @_; + + $subcmd ||= 'help'; + $subcmd = lc($subcmd); + + if ($subcmd eq 'help') { + my $txt = join("\n", map { substr($_, 0, index($_, "-")) } sort values(%helptext)); + return "Available Commands:\n\n$txt\n"; + } + my $txt = $helptext{$subcmd} || "Unrecognised help option. Try 'help' for a full list."; + return "$txt\n"; +} + +sub cmd_quit { + my $self = shift; + $self->close; +} + +sub cmd_pause { + my $self = shift; + + my $other_fds = $self->OtherFds; + + $self->{other_fds} = { %$other_fds }; + %$other_fds = (); + return "PAUSED"; +} + +sub cmd_continue { + my $self = shift; + + my $other_fds = $self->{other_fds}; + + $self->OtherFds( %$other_fds ); + %$other_fds = (); + return "UNPAUSED"; +} + +sub cmd_status { + my $self = shift; + +# Status should show: +# - Total time running +# - Total number of mails received +# - Total number of mails rejected (5xx) +# - Total number of mails tempfailed (5xx) +# - Avg number of mails/minute +# - Number of current connections +# - Number of outstanding DNS queries + + my $output = "Current Status as of " . gmtime() . " GMT\n\n"; + + if (defined &Qpsmtpd::Plugin::stats::register) { + # Stats plugin is loaded + $output .= Qpsmtpd::Plugin::stats->get_stats; + } + + my $descriptors = Danga::Socket->DescriptorMap; + + my $current_connections = 0; + my $current_dns = 0; + foreach my $fd (keys %$descriptors) { + my $pob = $descriptors->{$fd}; + if ($pob->isa("Qpsmtpd::PollServer")) { + $current_connections++; + } + elsif ($pob->isa("Danga::DNS::Resolver")) { + $current_dns = $pob->pending; + } + } + + $output .= "Curr Connections: $current_connections / $::MAXconn\n". + "Curr DNS Queries: $current_dns"; + + return $output; +} + +sub cmd_list { + my $self = shift; + my ($count) = @_; + + my $descriptors = Danga::Socket->DescriptorMap; + + my $list = "Current" . ($count ? (($count > 0) ? " Oldest $count" : " Newest ".-$count) : "") . " Connections: \n\n"; + my @all; + foreach my $fd (keys %$descriptors) { + my $pob = $descriptors->{$fd}; + if ($pob->isa("Qpsmtpd::PollServer")) { + next unless $pob->connection->remote_ip; # haven't even started yet + push @all, [$pob+0, $pob->connection->remote_ip, + $pob->connection->remote_host, $pob->uptime]; + } + } + + @all = sort { $a->[3] <=> $b->[3] } @all; + if ($count) { + if ($count > 0) { + @all = @all[$#all-($count-1) .. $#all]; + } + else { + @all = @all[0..(abs($count) - 1)]; + } + } + foreach my $item (@all) { + $list .= sprintf("%x : %s [%s] Connected %0.2fs\n", map { defined()?$_:'' } @$item); + } + + return $list; +} + +sub cmd_kill { + my $self = shift; + my ($match) = @_; + + return "SYNTAX: KILL (\$IP | \$REF)\n" unless $match; + + my $descriptors = Danga::Socket->DescriptorMap; + + my $killed = 0; + my $is_ip = (index($match, '.') >= 0); + foreach my $fd (keys %$descriptors) { + my $pob = $descriptors->{$fd}; + if ($pob->isa("Qpsmtpd::PollServer")) { + if ($is_ip) { + next unless $pob->connection->remote_ip; # haven't even started yet + if ($pob->connection->remote_ip eq $match) { + $pob->write("550 Your connection has been killed by an administrator\r\n"); + $pob->disconnect; + $killed++; + } + } + else { + # match by ID + if ($pob+0 == hex($match)) { + $pob->write("550 Your connection has been killed by an administrator\r\n"); + $pob->disconnect; + $killed++; + } + } + } + } + + return "Killed $killed connection" . ($killed > 1 ? "s" : "") . "\n"; +} + +sub cmd_dump { + my $self = shift; + my ($ref) = @_; + + return "SYNTAX: DUMP \$REF\n" unless $ref; + require Data::Dumper; + $Data::Dumper::Indent=1; + + my $descriptors = Danga::Socket->DescriptorMap; + foreach my $fd (keys %$descriptors) { + my $pob = $descriptors->{$fd}; + if ($pob->isa("Qpsmtpd::PollServer")) { + if ($pob+0 == hex($ref)) { + return Data::Dumper::Dumper($pob); + } + } + } + + return "Unable to find the connection: $ref. Try the LIST command\n"; +} + +1; +__END__ + +=head1 NAME + +Qpsmtpd::ConfigServer - a configuration server for qpsmtpd + +=head1 DESCRIPTION + +When qpsmtpd runs in multiplex mode it also provides a config server that you +can connect to. This allows you to view current connection statistics and other +gumph that you probably don't care about. + +=cut \ No newline at end of file diff --git a/lib/Qpsmtpd/Constants.pm b/lib/Qpsmtpd/Constants.pm index 68bd8f6..8be3268 100644 --- a/lib/Qpsmtpd/Constants.pm +++ b/lib/Qpsmtpd/Constants.pm @@ -4,27 +4,28 @@ require Exporter; # log levels my %log_levels = ( - LOGDEBUG => 7, - LOGINFO => 6, - LOGNOTICE => 5, - LOGWARN => 4, - LOGERROR => 3, - LOGCRIT => 2, - LOGALERT => 1, - LOGEMERG => 0, - LOGRADAR => 0, + LOGDEBUG => 7, + LOGINFO => 6, + LOGNOTICE => 5, + LOGWARN => 4, + LOGERROR => 3, + LOGCRIT => 2, + LOGALERT => 1, + LOGEMERG => 0, + LOGRADAR => 0, ); # return codes my %return_codes = ( - OK => 900, - DENY => 901, # 550 - DENYSOFT => 902, # 450 - DENYHARD => 903, # 550 + disconnect (deprecated in 0.29) - DENY_DISCONNECT => 903, # 550 + disconnect - DENYSOFT_DISCONNECT => 904, # 450 + disconnect - DECLINED => 909, - DONE => 910, + OK => 900, + DENY => 901, # 550 + DENYSOFT => 902, # 450 + DENYHARD => 903, # 550 + disconnect (deprecated in 0.29) + DENY_DISCONNECT => 903, # 550 + disconnect + DENYSOFT_DISCONNECT => 904, # 450 + disconnect + DECLINED => 909, + DONE => 910, + CONTINUATION => 911, ); use vars qw(@ISA @EXPORT); @@ -42,24 +43,24 @@ foreach (keys %log_levels ) { sub return_code { my $test = shift; if ( $test =~ /^\d+$/ ) { # need to return the textural form - foreach ( keys %return_codes ) { - return $_ if $return_codes{$_} =~ /$test/; - } + foreach ( keys %return_codes ) { + return $_ if $return_codes{$_} =~ /$test/; + } } else { # just return the numeric value - return $return_codes{$test}; + return $return_codes{$test}; } } sub log_level { my $test = shift; if ( $test =~ /^\d+$/ ) { # need to return the textural form - foreach ( keys %log_levels ) { - return $_ if $log_levels{$_} =~ /$test/; - } + foreach ( keys %log_levels ) { + return $_ if $log_levels{$_} =~ /$test/; + } } else { # just return the numeric value - return $log_levels{$test}; + return $log_levels{$test}; } } diff --git a/lib/Qpsmtpd/Plugin.pm b/lib/Qpsmtpd/Plugin.pm index 48f3a43..15b05ff 100644 --- a/lib/Qpsmtpd/Plugin.pm +++ b/lib/Qpsmtpd/Plugin.pm @@ -37,11 +37,15 @@ sub _register { my $self = shift; my $qp = shift; local $self->{_qp} = $qp; - $self->init($qp, @_) if $self->can('init'); + $self->init($qp, @_); $self->_register_standard_hooks($qp, @_); - $self->register($qp, @_) if $self->can('register'); + $self->register($qp, @_); } +# Designed to be overloaded +sub init {} +sub register {} + sub qp { shift->{_qp}; } @@ -61,6 +65,10 @@ sub connection { shift->qp->connection; } +sub config { + shift->qp->config(@_); +} + sub spool_dir { shift->qp->spool_dir; } diff --git a/lib/Qpsmtpd/PollServer.pm b/lib/Qpsmtpd/PollServer.pm new file mode 100644 index 0000000..c9a918c --- /dev/null +++ b/lib/Qpsmtpd/PollServer.pm @@ -0,0 +1,339 @@ +# $Id: Server.pm,v 1.10 2005/02/14 22:04:48 msergeant Exp $ + +package Qpsmtpd::PollServer; + +use base ('Danga::Client', 'Qpsmtpd::SMTP'); +# use fields required to be a subclass of Danga::Client. Have to include +# all fields used by Qpsmtpd.pm here too. +use fields qw( + input_sock + mode + header_lines + in_header + data_size + max_size + hooks + start_time + _auth + _commands + _config_cache + _connection + _transaction + _test_mode + _extras + _continuation +); +use Qpsmtpd::Constants; +use Qpsmtpd::Auth; +use Qpsmtpd::Address; +use Danga::DNS; +use Mail::Header; +use POSIX qw(strftime); +use Socket qw(inet_aton AF_INET CRLF); +use Time::HiRes qw(time); +use strict; + +sub max_idle_time { 60 } +sub max_connect_time { 1200 } + +sub input_sock { + my $self = shift; + @_ and $self->{input_sock} = shift; + $self->{input_sock} || $self; +} + +sub new { + my Qpsmtpd::PollServer $self = shift; + + $self = fields::new($self) unless ref $self; + $self->SUPER::new( @_ ); + $self->{start_time} = time; + $self->{mode} = 'connect'; + $self->load_plugins; + return $self; +} + +sub uptime { + my Qpsmtpd::PollServer $self = shift; + + return (time() - $self->{start_time}); +} + +sub reset_for_next_message { + my $self = shift; + $self->SUPER::reset_for_next_message(@_); + + $self->{_commands} = { + ehlo => 1, + helo => 1, + rset => 1, + mail => 1, + rcpt => 1, + data => 1, + help => 1, + vrfy => 1, + noop => 1, + quit => 1, + auth => 0, # disabled by default + }; + $self->{mode} = 'cmd'; + $self->{_extras} = {}; +} + +sub respond { + my $self = shift; + my ($code, @messages) = @_; + while (my $msg = shift @messages) { + my $line = $code . (@messages ? "-" : " ") . $msg; + $self->write("$line\r\n"); + } + return 1; +} + +sub fault { + my $self = shift; + $self->SUPER::fault(@_); + return; +} + +sub log { + my ($self, $trace, @log) = @_; + my $fd = $self->{fd}; + $fd ||= '?'; + $self->SUPER::log($trace, "fd:$fd", @log); +} + +sub process_line { + my $self = shift; + my $line = shift || return; + if ($::DEBUG > 1) { print "$$:".($self+0)."C($self->{mode}): $line"; } + local $SIG{ALRM} = sub { + my ($pkg, $file, $line) = caller(); + die "ALARM: ($self->{mode}) $pkg, $file, $line"; + }; + my $prev = alarm(2); # must process a command in < 2 seconds + eval { $self->_process_line($line) }; + alarm($prev); + if ($@) { + print STDERR "Error: $@\n"; + return $self->fault("command failed unexpectedly") if $self->{mode} eq 'cmd'; + return $self->fault("error processing data lines") if $self->{mode} eq 'data'; + return $self->fault("unknown error"); + } + return; +} + +sub _process_line { + my $self = shift; + my $line = shift; + + if ($self->{mode} eq 'connect') { + $self->{mode} = 'cmd'; + my $rc = $self->start_conversation; + return; + } + elsif ($self->{mode} eq 'cmd') { + $line =~ s/\r?\n//; + return $self->process_cmd($line); + } + elsif ($self->{mode} eq 'data') { + return $self->data_line($line); + } + else { + die "Unknown mode"; + } +} + +sub process_cmd { + my $self = shift; + my $line = shift; + my ($cmd, @params) = split(/ +/, $line); + my $meth = lc($cmd); + if (my $lookup = $self->{_commands}->{$meth} && $self->can($meth)) { + my $resp = eval { + $lookup->($self, @params); + }; + if ($@) { + my $error = $@; + chomp($error); + $self->log(LOGERROR, "Command Error: $error"); + return $self->fault("command '$cmd' failed unexpectedly"); + } + return $resp; + } + else { + # No such method - i.e. unrecognized command + my ($rc, $msg) = $self->run_hooks("unrecognized_command", $meth, @params); + return $self->unrecognized_command_respond($rc, $msg) unless $rc == CONTINUATION; + return 1; + } +} + +sub disconnect { + my $self = shift; + $self->SUPER::disconnect(@_); + $self->close; +} + +sub start_conversation { + my $self = shift; + + my $conn = $self->connection; + # set remote_host, remote_ip and remote_port + my ($ip, $port) = split(':', $self->peer_addr_string); + $conn->remote_ip($ip); + $conn->remote_port($port); + $conn->remote_info("[$ip]"); + Danga::DNS->new( + client => $self, + # NB: Setting remote_info to the same as remote_host + callback => sub { $conn->remote_info($conn->remote_host($_[0])) }, + host => $ip, + ); + + my ($rc, $msg) = $self->run_hooks("connect"); + return $self->connect_respond($rc, $msg) unless $rc == CONTINUATION; + return DONE; +} + +sub data { + my $self = shift; + + my ($rc, $msg) = $self->run_hooks("data"); + return $self->data_respond($rc, $msg) unless $rc == CONTINUATION; + return 1; +} + +sub data_respond { + my ($self, $rc, $msg) = @_; + if ($rc == DONE) { + return; + } + elsif ($rc == DENY) { + $self->respond(554, $msg || "Message denied"); + $self->reset_transaction(); + return; + } + elsif ($rc == DENYSOFT) { + $self->respond(451, $msg || "Message denied temporarily"); + $self->reset_transaction(); + return; + } + elsif ($rc == DENY_DISCONNECT) { + $self->respond(554, $msg || "Message denied"); + $self->disconnect; + return; + } + elsif ($rc == DENYSOFT_DISCONNECT) { + $self->respond(451, $msg || "Message denied temporarily"); + $self->disconnect; + return; + } + return $self->respond(503, "MAIL first") unless $self->transaction->sender; + return $self->respond(503, "RCPT first") unless $self->transaction->recipients; + + $self->{mode} = 'data'; + + $self->{header_lines} = []; + $self->{data_size} = 0; + $self->{in_header} = 1; + $self->{max_size} = ($self->config('databytes'))[0] || 0; # this should work in scalar context + + $self->log(LOGDEBUG, "max_size: $self->{max_size} / size: $self->{data_size}"); + + return $self->respond(354, "go ahead"); +} + +sub data_line { + my $self = shift; + + my $line = shift; + + if ($line eq ".\r\n") { + # add received etc. + $self->{mode} = 'cmd'; + return $self->end_of_data; + } + + # Reject messages that have either bare LF or CR. rjkaes noticed a + # lot of spam that is malformed in the header. + if ($line eq ".\n" or $line eq ".\r") { + $self->respond(421, "See http://smtpd.develooper.com/barelf.html"); + $self->disconnect; + return; + } + + # add a transaction->blocked check back here when we have line by line plugin access... + unless (($self->{max_size} and $self->{data_size} > $self->{max_size})) { + $line =~ s/\r\n$/\n/; + $line =~ s/^\.\./\./; + + if ($self->{in_header} and $line =~ m/^\s*$/) { + # end of headers + $self->{in_header} = 0; + + # ... need to check that we don't reformat any of the received lines. + # + # 3.8.2 Received Lines in Gatewaying + # When forwarding a message into or out of the Internet environment, a + # gateway MUST prepend a Received: line, but it MUST NOT alter in any + # way a Received: line that is already in the header. + + my $header = Mail::Header->new($self->{header_lines}, + Modify => 0, MailFrom => "COERCE"); + $self->transaction->header($header); + + #$header->add("X-SMTPD", "qpsmtpd/".$self->version.", http://smtpd.develooper.com/"); + + # FIXME - call plugins to work on just the header here; can + # save us buffering the mail content. + } + + if ($self->{in_header}) { + push @{ $self->{header_lines} }, $line; + } + else { + $self->transaction->body_write($line); + } + + $self->{data_size} += length $line; + } + + return; +} + +sub end_of_data { + my $self = shift; + + #$self->log(LOGDEBUG, "size is at $size\n") unless ($i % 300); + + $self->log(LOGDEBUG, "max_size: $self->{max_size} / size: $self->{data_size}"); + + my $smtp = $self->connection->hello eq "ehlo" ? "ESMTP" : "SMTP"; + + my $header = $self->transaction->header; + if (!$header) { + $header = Mail::Header->new(Modify => 0, MailFrom => "COERCE"); + $self->transaction->header($header); + } + + # only true if client authenticated + if ( defined $self->{_auth} and $self->{_auth} == OK ) { + $header->add("X-Qpsmtpd-Auth","True"); + } + + $header->add("Received", "from ".$self->connection->remote_info + ." (HELO ".$self->connection->hello_host . ") (".$self->connection->remote_ip + . ")\n by ".$self->config('me')." (qpsmtpd/".$self->version + .") with $smtp; ". (strftime('%a, %d %b %Y %H:%M:%S %z', localtime)), + 0); + + return $self->respond(552, "Message too big!") if $self->{max_size} and $self->{data_size} > $self->{max_size}; + + my ($rc, $msg) = $self->run_hooks("data_post"); + return $self->data_post_respond($rc, $msg) unless $rc == CONTINUATION; + return 1; +} + +1; + diff --git a/lib/Qpsmtpd/SMTP.pm b/lib/Qpsmtpd/SMTP.pm index df6ac69..2a6172b 100644 --- a/lib/Qpsmtpd/SMTP.pm +++ b/lib/Qpsmtpd/SMTP.pm @@ -51,21 +51,9 @@ sub dispatch { $self->{_counter}++; if ($cmd !~ /^(\w{1,12})$/ or !exists $self->{_commands}->{$1}) { - my ($rc, $msg) = $self->run_hooks("unrecognized_command", $cmd); - if ($rc == DENY_DISCONNECT) { - $self->respond(521, $msg); - $self->disconnect; - } - elsif ($rc == DENY) { - $self->respond(500, $msg); - } - elsif ($rc == DONE) { - 1; - } - else { - $self->respond(500, "Unrecognized command"); - } - return 1 + my ($rc, $msg) = $self->run_hooks("unrecognized_command", $cmd, @_); + return $self->unrecognized_command_respond($rc, $msg, @_) unless $rc == CONTINUATION; + return 1; } $cmd = $1; @@ -79,6 +67,20 @@ sub dispatch { return; } +sub unrecognized_command_respond { + my ($self, $rc, $msg) = @_; + if ($rc == DENY_DISCONNECT) { + $self->respond(521, $msg); + $self->disconnect; + } + elsif ($rc == DENY) { + $self->respond(500, $msg); + } + elsif ($rc != DONE) { + $self->respond(500, "Unrecognized command"); + } +} + sub fault { my $self = shift; my ($msg) = shift || "program fault - command not performed"; @@ -92,12 +94,20 @@ sub start_conversation { # this should maybe be called something else than "connect", see # lib/Qpsmtpd/TcpServer.pm for more confusion. my ($rc, $msg) = $self->run_hooks("connect"); + return $self->connect_respond($rc, $msg) unless $rc == CONTINUATION; + return 1; +} + +sub connect_respond { + my ($self, $rc, $msg) = @_; if ($rc == DENY) { $self->respond(550, ($msg || 'Connection from you denied, bye bye.')); + $self->disconnect; return $rc; } elsif ($rc == DENYSOFT) { $self->respond(450, ($msg || 'Connection from you temporarily denied, bye bye.')); + $self->disconnect; return $rc; } elsif ($rc == DONE) { @@ -124,6 +134,7 @@ sub reset_transaction { sub connection { my $self = shift; + @_ and $self->{_connection} = shift; return $self->{_connection} || ($self->{_connection} = Qpsmtpd::Connection->new()); } @@ -136,11 +147,16 @@ sub helo { return $self->respond (503, "but you already said HELO ...") if $conn->hello; my ($rc, $msg) = $self->run_hooks("helo", $hello_host, @stuff); - if ($rc == DONE) { - # do nothing - } elsif ($rc == DENY) { + return $self->helo_respond($rc, $msg, $hello_host, @stuff) unless $rc == CONTINUATION; + return 1; +} + +sub helo_respond { + my ($self, $rc, $msg, $hello_host) = @_; + if ($rc == DENY) { $self->respond(550, $msg); - } elsif ($rc == DENYSOFT) { + } + elsif ($rc == DENYSOFT) { $self->respond(450, $msg); } elsif ($rc == DENY_DISCONNECT) { $self->respond(550, $msg); @@ -148,11 +164,14 @@ sub helo { } elsif ($rc == DENYSOFT_DISCONNECT) { $self->respond(450, $msg); $self->disconnect; - } else { + } + elsif ($rc != DONE) { + my $conn = $self->connection; $conn->hello("helo"); $conn->hello_host($hello_host); $self->transaction; - $self->respond(250, $self->config('me') ." Hi " . $conn->remote_info . " [" . $conn->remote_ip ."]; I am so happy to meet you."); + $self->respond(250, $self->config('me') ." Hi " . $conn->remote_info . + " [" . $conn->remote_ip ."]; I am so happy to meet you."); } } @@ -164,11 +183,16 @@ sub ehlo { return $self->respond (503, "but you already said HELO ...") if $conn->hello; my ($rc, $msg) = $self->run_hooks("ehlo", $hello_host, @stuff); - if ($rc == DONE) { - # do nothing - } elsif ($rc == DENY) { + return $self->ehlo_respond($rc, $msg, $hello_host, @stuff) unless $rc == CONTINUATION; + return 1; +} + +sub ehlo_respond { + my ($self, $rc, $msg, $hello_host) = @_; + if ($rc == DENY) { $self->respond(550, $msg); - } elsif ($rc == DENYSOFT) { + } + elsif ($rc == DENYSOFT) { $self->respond(450, $msg); } elsif ($rc == DENY_DISCONNECT) { $self->respond(550, $msg); @@ -176,7 +200,9 @@ sub ehlo { } elsif ($rc == DENYSOFT_DISCONNECT) { $self->respond(450, $msg); $self->disconnect; - } else { + } + elsif ($rc != DONE) { + my $conn = $self->connection; $conn->hello("ehlo"); $conn->hello_host($hello_host); $self->transaction; @@ -241,57 +267,62 @@ sub mail { unless ($self->connection->hello) { return $self->respond(503, "please say hello first ..."); } + + my $from_parameter = join " ", @_; + $self->log(LOGINFO, "full from_parameter: $from_parameter"); + + my ($from) = ($from_parameter =~ m/^from:\s*(<[^>]*>)/i)[0]; + + # support addresses without <> ... maybe we shouldn't? + ($from) = "<" . ($from_parameter =~ m/^from:\s*(\S+)/i)[0] . ">" + unless $from; + + $self->log(LOGALERT, "from email address : [$from]"); + + if ($from eq "<>" or $from =~ m/\[undefined\]/ or $from eq "<#@[]>") { + $from = Qpsmtpd::Address->new("<>"); + } else { - my $from_parameter = join " ", @_; - $self->log(LOGINFO, "full from_parameter: $from_parameter"); + $from = (Qpsmtpd::Address->parse($from))[0]; + } + return $self->respond(501, "could not parse your mail from command") unless $from; - my ($from) = ($from_parameter =~ m/^from:\s*(<[^>]*>)/i)[0]; + my ($rc, $msg) = $self->run_hooks("mail", $from); + return $self->mail_respond($rc, $msg, $from) unless $rc == CONTINUATION; + return 1; +} - # support addresses without <> ... maybe we shouldn't? - ($from) = "<" . ($from_parameter =~ m/^from:\s*(\S+)/i)[0] . ">" - unless $from; - - $self->log(LOGALERT, "from email address : [$from]"); - - if ($from eq "<>" or $from =~ m/\[undefined\]/ or $from eq "<#@[]>") { - $from = Qpsmtpd::Address->new("<>"); - } - else { - $from = (Qpsmtpd::Address->parse($from))[0]; - } - return $self->respond(501, "could not parse your mail from command") unless $from; - - my ($rc, $msg) = $self->run_hooks("mail", $from); - if ($rc == DONE) { - return 1; - } - elsif ($rc == DENY) { - $msg ||= $from->format . ', denied'; - $self->log(LOGINFO, "deny mail from " . $from->format . " ($msg)"); - $self->respond(550, $msg); - } - elsif ($rc == DENYSOFT) { - $msg ||= $from->format . ', temporarily denied'; - $self->log(LOGINFO, "denysoft mail from " . $from->format . " ($msg)"); - $self->respond(450, $msg); - } - elsif ($rc == DENY_DISCONNECT) { - $msg ||= $from->format . ', denied'; - $self->log(LOGINFO, "deny mail from " . $from->format . " ($msg)"); - $self->respond(550, $msg); - $self->disconnect; - } - elsif ($rc == DENYSOFT_DISCONNECT) { - $msg ||= $from->format . ', temporarily denied'; - $self->log(LOGINFO, "denysoft mail from " . $from->format . " ($msg)"); - $self->respond(421, $msg); - $self->disconnect; - } - else { # includes OK - $self->log(LOGINFO, "getting mail from ".$from->format); - $self->respond(250, $from->format . ", sender OK - how exciting to get mail from you!"); - $self->transaction->sender($from); - } +sub mail_respond { + my ($self, $rc, $msg, $from) = @_; + if ($rc == DONE) { + return 1; + } + elsif ($rc == DENY) { + $msg ||= $from->format . ', denied'; + $self->log(LOGINFO, "deny mail from " . $from->format . " ($msg)"); + $self->respond(550, $msg); + } + elsif ($rc == DENYSOFT) { + $msg ||= $from->format . ', temporarily denied'; + $self->log(LOGINFO, "denysoft mail from " . $from->format . " ($msg)"); + $self->respond(450, $msg); + } + elsif ($rc == DENY_DISCONNECT) { + $msg ||= $from->format . ', denied'; + $self->log(LOGINFO, "deny mail from " . $from->format . " ($msg)"); + $self->respond(550, $msg); + $self->disconnect; + } + elsif ($rc == DENYSOFT_DISCONNECT) { + $msg ||= $from->format . ', temporarily denied'; + $self->log(LOGINFO, "denysoft mail from " . $from->format . " ($msg)"); + $self->respond(450, $msg); + $self->disconnect; + } + else { # includes OK + $self->log(LOGINFO, "getting mail from ".$from->format); + $self->respond(250, $from->format . ", sender OK - how exciting to get mail from you!"); + $self->transaction->sender($from); } } @@ -308,6 +339,12 @@ sub rcpt { return $self->respond(501, "could not parse recipient") unless $rcpt; my ($rc, $msg) = $self->run_hooks("rcpt", $rcpt); + return $self->rcpt_respond($rc, $msg, $rcpt) unless $rc == CONTINUATION; + return 1; +} + +sub rcpt_respond { + my ($self, $rc, $msg, $rcpt) = @_; if ($rc == DONE) { return 1; } @@ -342,7 +379,6 @@ sub rcpt { } - sub help { my $self = shift; $self->respond(214, @@ -364,6 +400,12 @@ sub vrfy { # I also don't think it provides all the proper result codes. my ($rc, $msg) = $self->run_hooks("vrfy"); + return $self->vrfy_respond($rc, $msg) unless $rc == CONTINUATION; + return 1; +} + +sub vrfy_respond { + my ($self, $rc, $msg) = @_; if ($rc == DONE) { return 1; } @@ -391,6 +433,12 @@ sub rset { sub quit { my $self = shift; my ($rc, $msg) = $self->run_hooks("quit"); + return $self->quit_respond($rc, $msg) unless $rc == CONTINUATION; + return 1; +} + +sub quit_respond { + my ($self, $rc, $msg) = @_; if ($rc != DONE) { $self->respond(221, $self->config('me') . " closing connection. Have a wonderful day."); } @@ -403,9 +451,17 @@ sub disconnect { $self->reset_transaction; } +sub disconnect_respond { } + sub data { my $self = shift; my ($rc, $msg) = $self->run_hooks("data"); + return $self->data_respond($rc, $msg) unless $rc == CONTINUATION; + return 1; +} + +sub data_respond { + my ($self, $rc, $msg) = @_; if ($rc == DONE) { return 1; } @@ -523,6 +579,11 @@ sub data { $self->respond(552, "Message too big!"),return 1 if $max_size and $size > $max_size; ($rc, $msg) = $self->run_hooks("data_post"); + return $self->data_post_respond($rc, $msg) unless $rc == CONTINUATION; +} + +sub data_post_respond { + my ($self, $rc, $msg) = @_; if ($rc == DONE) { return 1; } @@ -538,7 +599,6 @@ sub data { # DATA is always the end of a "transaction" return $self->reset_transaction; - } sub getline { @@ -554,6 +614,12 @@ sub queue { my ($self, $transaction) = @_; my ($rc, $msg) = $self->run_hooks("queue"); + return $self->queue_respond($rc, $msg) unless $rc == CONTINUATION; + return 1; +} + +sub queue_respond { + my ($self, $rc, $msg) = @_; if ($rc == DONE) { return 1; } @@ -569,8 +635,6 @@ sub queue { else { $self->respond(451, $msg || "Queuing declined or disabled; try again later" ); } - - } diff --git a/lib/Qpsmtpd/SelectServer.pm b/lib/Qpsmtpd/SelectServer.pm deleted file mode 100644 index 07e5c56..0000000 --- a/lib/Qpsmtpd/SelectServer.pm +++ /dev/null @@ -1,320 +0,0 @@ -package Qpsmtpd::SelectServer; -use Qpsmtpd::SMTP; -use Qpsmtpd::Constants; -use IO::Socket; -use IO::Select; -use POSIX qw(strftime); -use Socket qw(CRLF); -use Fcntl; -use Tie::RefHash; -use Net::DNS; - -@ISA = qw(Qpsmtpd::SMTP); -use strict; - -our %inbuffer = (); -our %outbuffer = (); -our %ready = (); -our %lookup = (); -our %qp = (); -our %indata = (); - -tie %ready, 'Tie::RefHash'; -my $server; -my $select; - -our $QUIT = 0; - -$SIG{INT} = $SIG{TERM} = sub { $QUIT++ }; - -sub log { - my ($self, $trace, @log) = @_; - my $level = Qpsmtpd::TRACE_LEVEL(); - $level = $self->init_logger unless defined $level; - warn join(" ", fileno($self->client), @log), "\n" - if $trace <= $level; -} - -sub main { - my $class = shift; - my %opts = (LocalPort => 25, Reuse => 1, Listen => SOMAXCONN, @_); - $server = IO::Socket::INET->new(%opts) or die "Server: $@"; - print "Listening on $opts{LocalPort}\n"; - - nonblock($server); - - $select = IO::Select->new($server); - my $res = Net::DNS::Resolver->new; - - # TODO - make this more graceful - let all current SMTP sessions finish - # before quitting! - while (!$QUIT) { - foreach my $client ($select->can_read(1)) { - #print "Reading $client\n"; - if ($client == $server) { - my $client_addr; - $client = $server->accept(); - next unless $client; - my $ip = $client->peerhost; - my $bgsock = $res->bgsend($ip); - $select->add($bgsock); - $lookup{$bgsock} = $client; - } - elsif (my $qpclient = $lookup{$client}) { - my $packet = $res->bgread($client); - my $ip = $qpclient->peerhost; - my $hostname = $ip; - if ($packet) { - foreach my $rr ($packet->answer) { - if ($rr->type eq 'PTR') { - $hostname = $rr->rdatastr; - } - } - } - # $packet->print; - $select->remove($client); - delete($lookup{$client}); - my $qp = Qpsmtpd::SelectServer->new(); - $qp->client($qpclient); - $qp{$qpclient} = $qp; - $qp->log(LOGINFO, "Connection number " . keys(%qp)); - $inbuffer{$qpclient} = ''; - $outbuffer{$qpclient} = ''; - $ready{$qpclient} = []; - $qp->start_connection($ip, $hostname); - $qp->load_plugins; - my $rc = $qp->start_conversation; - if ($rc != DONE) { - close($client); - next; - } - $select->add($qpclient); - nonblock($qpclient); - } - else { - my $data = ''; - my $rv = $client->recv($data, POSIX::BUFSIZ(), 0); - - unless (defined($rv) && length($data)) { - freeclient($client) - unless ($! == POSIX::EWOULDBLOCK() || - $! == POSIX::EINPROGRESS() || - $! == POSIX::EINTR()); - next; - } - $inbuffer{$client} .= $data; - - while ($inbuffer{$client} =~ s/^([^\r\n]*)\r?\n//) { - #print "<$1\n"; - push @{$ready{$client}}, $1; - } - } - } - - #print "Processing...\n"; - foreach my $client (keys %ready) { - my $qp = $qp{$client}; - #print "Processing $client = $qp\n"; - foreach my $req (@{$ready{$client}}) { - if ($indata{$client}) { - $qp->data_line($req . CRLF); - } - else { - $qp->log(LOGINFO, "dispatching $req"); - defined $qp->dispatch(split / +/, $req) - or $qp->respond(502, "command unrecognized: '$req'"); - } - } - delete $ready{$client}; - } - - #print "Writing...\n"; - foreach my $client ($select->can_write(1)) { - next unless $outbuffer{$client}; - #print "Writing to $client\n"; - - my $rv = $client->send($outbuffer{$client}, 0); - unless (defined($rv)) { - warn("I was told to write, but I can't: $!\n"); - next; - } - if ($rv == length($outbuffer{$client}) || - $! == POSIX::EWOULDBLOCK()) - { - #print "Sent all, or EWOULDBLOCK\n"; - if ($qp{$client}->{__quitting}) { - freeclient($client); - next; - } - substr($outbuffer{$client}, 0, $rv, ''); - delete($outbuffer{$client}) unless length($outbuffer{$client}); - } - else { - print "Error: $!\n"; - # Couldn't write all the data, and it wasn't because - # it would have blocked. Shut down and move on. - freeclient($client); - next; - } - } - } -} - -sub freeclient { - my $client = shift; - #print "Freeing client: $client\n"; - delete $inbuffer{$client}; - delete $outbuffer{$client}; - delete $ready{$client}; - delete $qp{$client}; - $select->remove($client); - close($client); -} - -sub start_connection { - my $self = shift; - my $remote_ip = shift; - my $remote_host = shift; - - $self->log(LOGNOTICE, "Connection from $remote_host [$remote_ip]"); - my $remote_info = 'NOINFO'; - - # if the local dns resolver doesn't filter it out we might get - # ansi escape characters that could make a ps axw do "funny" - # things. So to be safe, cut them out. - $remote_host =~ tr/a-zA-Z\.\-0-9//cd; - - $self->SUPER::connection->start(remote_info => $remote_info, - remote_ip => $remote_ip, - remote_host => $remote_host, - @_); -} - -sub client { - my $self = shift; - @_ and $self->{_client} = shift; - $self->{_client}; -} - -sub nonblock { - my $socket = shift; - my $flags = fcntl($socket, F_GETFL, 0) - or die "Can't get flags for socket: $!"; - fcntl($socket, F_SETFL, $flags | O_NONBLOCK) - or die "Can't set flags for socket: $!"; -} - -sub read_input { - my $self = shift; - die "read_input is disabled in SelectServer"; -} - -sub respond { - my ($self, $code, @messages) = @_; - my $client = $self->client || die "No client!"; - while (my $msg = shift @messages) { - my $line = $code . (@messages?"-":" ").$msg; - $self->log(LOGINFO, ">$line"); - $outbuffer{$client} .= "$line\r\n"; - } - return 1; -} - -sub disconnect { - my $self = shift; - #print "Disconnecting\n"; - $self->{__quitting} = 1; - $self->SUPER::disconnect(@_); -} - -sub data { - my $self = shift; - $self->respond(503, "MAIL first"), return 1 unless $self->transaction->sender; - $self->respond(503, "RCPT first"), return 1 unless $self->transaction->recipients; - $self->respond(354, "go ahead"); - $indata{$self->client()} = 1; - $self->{__buffer} = ''; - $self->{__size} = 0; - $self->{__blocked} = ""; - $self->{__in_header} = 1; - $self->{__complete} = 0; - $self->{__max_size} = $self->config('databytes') || 0; -} - -sub data_line { - my $self = shift; - local $_ = shift; - - if ($_ eq ".\r\n") { - $self->log(LOGDEBUG, "max_size: $self->{__max_size} / size: $self->{__size}"); - delete $indata{$self->client()}; - - my $smtp = $self->connection->hello eq "ehlo" ? "ESMTP" : "SMTP"; - - if (!$self->transaction->header) { - $self->transaction->header(Mail::Header->new(Modify => 0, MailFrom => "COERCE")); - } - $self->transaction->header->add("Received", "from ".$self->connection->remote_info - ." (HELO ".$self->connection->hello_host . ") (".$self->connection->remote_ip - . ") by ".$self->config('me')." (qpsmtpd/".$self->version - .") with $smtp; ". (strftime('%a, %d %b %Y %H:%M:%S %z', localtime)), - 0); - - #$self->respond(550, $self->transaction->blocked),return 1 if ($self->transaction->blocked); - $self->respond(552, "Message too big!"),return 1 if $self->{__max_size} and $self->{__size} > $self->{__max_size}; - - my ($rc, $msg) = $self->run_hooks("data_post"); - if ($rc == DONE) { - return 1; - } - elsif ($rc == DENY) { - $self->respond(552, $msg || "Message denied"); - } - elsif ($rc == DENYSOFT) { - $self->respond(452, $msg || "Message denied temporarily"); - } - else { - $self->queue($self->transaction); - } - - # DATA is always the end of a "transaction" - return $self->reset_transaction; - } - elsif ($_ eq ".\n") { - $self->respond(451, "See http://develooper.com/code/qpsmtpd/barelf.html"); - $self->{__quitting} = 1; - return; - } - - # add a transaction->blocked check back here when we have line by line plugin access... - unless (($self->{__max_size} and $self->{__size} > $self->{__max_size})) { - s/\r\n$/\n/; - s/^\.\./\./; - if ($self->{__in_header} and m/^\s*$/) { - $self->{__in_header} = 0; - my @header = split /\n/, $self->{__buffer}; - - # ... need to check that we don't reformat any of the received lines. - # - # 3.8.2 Received Lines in Gatewaying - # When forwarding a message into or out of the Internet environment, a - # gateway MUST prepend a Received: line, but it MUST NOT alter in any - # way a Received: line that is already in the header. - - my $header = Mail::Header->new(Modify => 0, MailFrom => "COERCE"); - $header->extract(\@header); - $self->transaction->header($header); - $self->{__buffer} = ""; - } - - if ($self->{__in_header}) { - $self->{__buffer} .= $_; - } - else { - $self->transaction->body_write($_); - } - $self->{__size} += length $_; - } -} - -1; diff --git a/plugins/check_earlytalker b/plugins/check_earlytalker index f21748b..df4eab4 100644 --- a/plugins/check_earlytalker +++ b/plugins/check_earlytalker @@ -44,38 +44,36 @@ and terminating the SMTP connection. =cut -use IO::Select; - -use warnings; -use strict; +my $MSG = 'Connecting host started transmitting before SMTP greeting'; sub register { my ($self, $qp, @args) = @_; if (@args % 2) { - $self->log(LOGERROR, "Unrecognized/mismatched arguments"); - return undef; + $self->log(LOGERROR, "Unrecognized/mismatched arguments"); + return undef; } $self->{_args} = { - 'wait' => 1, - 'action' => 'denysoft', - 'defer-reject' => 0, - @args, + 'wait' => 1, + 'action' => 'denysoft', + 'defer-reject' => 0, + @args, }; if ($qp->{conn} && $qp->{conn}->isa('Apache2::Connection')) { require APR::Const; APR::Const->import(qw(POLLIN SUCCESS)); - $self->register_hook('connect', 'apr_connect_handler'); + $self->register_hook('connect', 'hook_connect_apr'); } else { - $self->register_hook('connect', 'connect_handler'); + $self->register_hook('connect', 'hook_connect'); } - $self->register_hook('mail', 'mail_handler') + $self->register_hook('connect', 'hook_connect_post'); + $self->register_hook('mail', 'hook_mail') if $self->{_args}->{'defer-reject'}; 1; } -sub apr_connect_handler { +sub hook_connect_apr { my ($self, $transaction) = @_; return DECLINED if ($self->qp->connection->notes('whitelistclient')); @@ -92,47 +90,55 @@ sub apr_connect_handler { $self->qp->connection->notes('earlytalker', 1); } else { - my $msg = 'Connecting host started transmitting before SMTP greeting'; - return (DENY,$msg) if $self->{_args}->{'action'} eq 'deny'; - return (DENYSOFT,$msg) if $self->{_args}->{'action'} eq 'denysoft'; + return (DENY,$MSG) if $self->{_args}->{'action'} eq 'deny'; + return (DENYSOFT,$MSG) if $self->{_args}->{'action'} eq 'denysoft'; } } else { $self->log(LOGINFO, "remote host said nothing spontaneous, proceeding"); } + return DECLINED; } -sub connect_handler { +sub hook_connect { my ($self, $transaction) = @_; - my $in = new IO::Select; - my $ip = $self->qp->connection->remote_ip; - - return DECLINED - if ($self->qp->connection->notes('whitelistclient')); - - $in->add(\*STDIN) || return DECLINED; - if ($in->can_read($self->{_args}->{'wait'})) { - $self->log(LOGNOTICE, "remote host started talking before we said hello [$ip]"); - if ($self->{_args}->{'defer-reject'}) { - $self->qp->connection->notes('earlytalker', 1); - } else { - my $msg = 'Connecting host started transmitting before SMTP greeting'; - return (DENY,$msg) if $self->{_args}->{'action'} eq 'deny'; - return (DENYSOFT,$msg) if $self->{_args}->{'action'} eq 'denysoft'; - } - } else { - $self->log(LOGINFO, 'remote host said nothing spontaneous, proceeding'); - } - return DECLINED; + + my $qp = $self->qp; + my $conn = $qp->connection; + $qp->AddTimer($self->{_args}{'wait'}, sub { read_now($qp, $conn) }); + return CONTINUATION; } -sub mail_handler { - my ($self, $txn) = @_; - my $msg = 'Connecting host started transmitting before SMTP greeting'; +sub read_now { + my ($qp, $conn) = @_; + + if (my $data = $qp->read(1024)) { + if (length($$data)) { + $qp->log(LOGNOTICE, 'remote host started talking before we said hello'); + $qp->push_back_read($data); + $conn->notes('earlytalker', 1); + } + } + $qp->finish_continuation; +} - return DECLINED unless $self->qp->connection->notes('earlytalker'); - return (DENY,$msg) if $self->{_args}->{'action'} eq 'deny'; - return (DENYSOFT,$msg) if $self->{_args}->{'action'} eq 'denysoft'; +sub hook_connect_post { + my ($self, $transaction) = @_; + + my $conn = $self->qp->connection; + return DECLINED unless $conn->notes('earlytalker'); + return DECLINED if $self->{'defer-reject'}; + return (DENY,$MSG) if $self->{_args}->{'action'} eq 'deny'; + return (DENYSOFT,$MSG) if $self->{_args}->{'action'} eq 'denysoft'; + return DECLINED; # assume action eq 'log' +} + +sub hook_mail { + my ($self, $txn) = @_; + + return DECLINED unless $self->connection->notes('earlytalker'); + return (DENY,$MSG) if $self->{_args}->{'action'} eq 'deny'; + return (DENYSOFT,$MSG) if $self->{_args}->{'action'} eq 'denysoft'; return DECLINED; } diff --git a/plugins/dnsbl b/plugins/dnsbl index 7bed581..ca2c5d5 100644 --- a/plugins/dnsbl +++ b/plugins/dnsbl @@ -1,20 +1,17 @@ -#!perl -w +#!/usr/bin/perl -w + +use Danga::DNS; sub register { - my ($self, $qp, $denial ) = @_; - if ( defined $denial and $denial =~ /^disconnect$/i ) { - $self->{_dnsbl}->{DENY} = DENY_DISCONNECT; - } - else { - $self->{_dnsbl}->{DENY} = DENY; - } - + my ($self) = @_; + $self->register_hook("connect", "connect_handler"); + $self->register_hook("connect", "pickup_handler"); } -sub hook_connect { +sub connect_handler { my ($self, $transaction) = @_; - my $remote_ip = $self->qp->connection->remote_ip; + my $remote_ip = $self->connection->remote_ip; # perform RBLSMTPD checks to mimic Dan Bernstein's rblsmtpd if (defined($ENV{'RBLSMTPD'})) { @@ -29,163 +26,91 @@ sub hook_connect { $self->log(LOGDEBUG, "RBLSMTPD not set for $remote_ip"); } - my $allow = grep { s/\.?$/./; $_ eq substr($remote_ip . '.', 0, length $_) } $self->qp->config('dnsbl_allow'); + my $allow = grep { s/\.?$/./; $_ eq substr($remote_ip . '.', 0, length $_) } $self->config('dnsbl_allow'); return DECLINED if $allow; - my %dnsbl_zones = map { (split /:/, $_, 2)[0,1] } $self->qp->config('dnsbl_zones'); + my %dnsbl_zones = map { (split /:/, $_, 2)[0,1] } $self->config('dnsbl_zones'); return DECLINED unless %dnsbl_zones; my $reversed_ip = join(".", reverse(split(/\./, $remote_ip))); - # we should queue these lookups in the background and just fetch the - # results in the first rcpt handler ... oh well. - - my $res = new Net::DNS::Resolver; - $res->tcp_timeout(30); - $res->udp_timeout(30); - - my $sel = IO::Select->new(); - + $self->transaction->notes('pending_dns_queries', scalar(keys(%dnsbl_zones))); + my $qp = $self->qp; for my $dnsbl (keys %dnsbl_zones) { # fix to find A records, if the dnsbl_zones line has a second field 20/1/04 ++msp if (defined($dnsbl_zones{$dnsbl})) { $self->log(LOGDEBUG, "Checking $reversed_ip.$dnsbl for A record in the background"); - $sel->add($res->bgsend("$reversed_ip.$dnsbl")); + Danga::DNS->new( + callback => sub { process_a_result($qp, $dnsbl_zones{$dnsbl}, @_) }, + host => "$reversed_ip.$dnsbl", + type => 'A', + client => $self->qp->input_sock, + ); } else { $self->log(LOGDEBUG, "Checking $reversed_ip.$dnsbl for TXT record in the background"); - $sel->add($res->bgsend("$reversed_ip.$dnsbl", "TXT")); + Danga::DNS->new( + callback => sub { process_txt_result($qp, @_) }, + host => "$reversed_ip.$dnsbl", + type => 'TXT', + client => $self->qp->input_sock, + ); } } - $self->qp->connection->notes('dnsbl_sockets', $sel); - - return DECLINED; + return CONTINUATION; } -sub process_sockets { - my ($self) = @_; - - my $conn = $self->qp->connection; - - return $conn->notes('dnsbl') - if $conn->notes('dnsbl'); - - my %dnsbl_zones = map { (split /:/, $_, 2)[0,1] } $self->qp->config('dnsbl_zones'); - - my $res = new Net::DNS::Resolver; - $res->tcp_timeout(30); - $res->udp_timeout(30); - - my $sel = $conn->notes('dnsbl_sockets') or return ""; - my $remote_ip = $self->qp->connection->remote_ip; - - my $result; - - $self->log(LOGDEBUG, "waiting for dnsbl dns"); - - # don't wait more than 8 seconds here - my @ready = $sel->can_read(8); - - $self->log(LOGDEBUG, "DONE waiting for dnsbl dns, got " , scalar @ready, " answers ...") ; - return '' unless @ready; - - for my $socket (@ready) { - my $query = $res->bgread($socket); - $sel->remove($socket); - undef $socket; - - my $dnsbl; - - if ($query) { - my $a_record = 0; - foreach my $rr ($query->answer) { - $a_record = 1 if $rr->type eq "A"; - my $name = $rr->name; - ($dnsbl) = ($name =~ m/(?:\d+\.){4}(.*)/) unless $dnsbl; - $dnsbl = $name unless $dnsbl; - $self->log(LOGDEBUG, "name ", $rr->name); - next unless $rr->type eq "TXT"; - $self->log(LOGDEBUG, "got txt record"); - $result = $rr->txtdata and last; - } - #$a_record and $result = "Blocked by $dnsbl"; - - if ($a_record) { - if (defined $dnsbl_zones{$dnsbl}) { - $result = $dnsbl_zones{$dnsbl}; - #$result =~ s/%IP%/$ENV{'TCPREMOTEIP'}/g; - $result =~ s/%IP%/$remote_ip/g; - } else { - # shouldn't get here? - $result = "Blocked by $dnsbl"; - } - } +sub process_a_result { + my ($qp, $template, $result, $query) = @_; + + my $pending = $qp->transaction->notes('pending_dns_queries'); + $qp->transaction->notes('pending_dns_queries', --$pending); + + warn("Result for A $query: $result\n"); + if ($result !~ /^\d+\.\d+\.\d+\.\d+$/) { + # NXDOMAIN or ERROR possibly... + $qp->finish_continuation unless $pending; + return; } - else { - $self->log(LOGERROR, "$dnsbl query failed: ", $res->errorstring) - unless $res->errorstring eq "NXDOMAIN"; - } - - if ($result) { - #kill any other pending I/O - $conn->notes('dnsbl_sockets', undef); - $result = join("\n", $self->qp->config('dnsbl_rejectmsg'), $result); - return $conn->notes('dnsbl', $result); - } - } - - if ($sel->count) { - # loop around if we have dns blacklists left to see results from - return $self->process_sockets(); - } - - # er, the following code doesn't make much sense anymore... - - # if there was more to read; then forget it - $conn->notes('dnsbl_sockets', undef); - - return $conn->notes('dnsbl', $result); - + + my $conn = $qp->connection; + my $ip = $conn->remote_ip; + $template =~ s/%IP%/$ip/g; + $conn->notes('dnsbl', $template) unless $conn->notes('dnsbl'); + $qp->finish_continuation unless $pending; } -sub hook_rcpt { +sub process_txt_result { + my ($qp, $result, $query) = @_; + + my $pending = $qp->transaction->notes('pending_dns_queries'); + $qp->transaction->notes('pending_dns_queries', --$pending); + + warn("Result for TXT $query: $result\n"); + if ($result !~ /[a-z]/) { + # NXDOMAIN or ERROR probably... + $qp->finish_continuation unless $pending; + return; + } + + my $conn = $qp->connection; + $conn->notes('dnsbl', $result) unless $conn->notes('dnsbl'); + $qp->finish_continuation unless $pending; +} + +sub pickup_handler { my ($self, $transaction, $rcpt) = @_; - my $connection = $self->qp->connection; # RBLSMTPD being non-empty means it contains the failure message to return if (defined ($ENV{'RBLSMTPD'}) && $ENV{'RBLSMTPD'} ne '') { my $result = $ENV{'RBLSMTPD'}; - my $remote_ip = $connection->remote_ip; + my $remote_ip = $self->connection->remote_ip; $result =~ s/%IP%/$remote_ip/g; - return ($self->{_dnsbl}->{DENY}, - join(" ", $self->qp->config('dnsbl_rejectmsg'), $result)); + return (DENY, join(" ", $self->config('dnsbl_rejectmsg'), $result)); } - my $note = $self->process_sockets; - my $whitelist = $connection->notes('whitelisthost'); - if ( $note ) { - if ( $rcpt->user =~ /^(?:postmaster|abuse|mailer-daemon|root)$/i ) { - $self->log(LOGWARN, "Don't blacklist special account: ".$rcpt->user); - } - elsif ( $whitelist ) { - $self->log(LOGWARN, "Whitelist overrode blacklist: $whitelist"); - } - elsif ( $connection->relay_client() ) { - $self->log(LOGWARN, "Don't blacklist relay/auth clients"); - } - else { - return ($self->{_dnsbl}->{DENY}, $note); - } - } - return DECLINED; - -} - -sub hook_disconnect { - my ($self, $transaction) = @_; - - $self->qp->connection->notes('dnsbl_sockets', undef); - + my $note = $self->connection->notes('dnsbl'); + return (DENY, $note) if $note; return DECLINED; } @@ -200,19 +125,6 @@ dnsbl - handle DNS BlackList lookups Plugin that checks the IP address of the incoming connection against a configurable set of RBL services. -=head1 Usage - -Add the following line to the config/plugins file: - - dnsbl [disconnect] - -If you want to immediately drop the connection (since some blacklisted -servers attempt multiple sends per session), add the optional keyword -"disconnect" (case insensitive) to the config line. In most cases, an -IP address that is listed should not be given the opportunity to begin -a new transaction, since even the most volatile blacklists will return -the same answer for a short period of time (the minimum DNS cache period). - =head1 Configuration files This plugin uses the following configuration files. All of these are optional. diff --git a/plugins/queue/qmail-queue b/plugins/queue/qmail-queue index 6bc4a9d..9d592e6 100644 --- a/plugins/queue/qmail-queue +++ b/plugins/queue/qmail-queue @@ -39,12 +39,12 @@ sub hook_queue { my ($self, $transaction) = @_; # these bits inspired by Peter Samuels "qmail-queue wrapper" - pipe(MESSAGE_READER, MESSAGE_WRITER) or fault("Could not create message pipe"), exit; - pipe(ENVELOPE_READER, ENVELOPE_WRITER) or fault("Could not create envelope pipe"), exit; + pipe(MESSAGE_READER, MESSAGE_WRITER) or die("Could not create message pipe"); + pipe(ENVELOPE_READER, ENVELOPE_WRITER) or die("Could not create envelope pipe"); my $child = fork(); - not defined $child and fault(451, "Could not fork"), exit; + not defined $child and die("Could not fork"); if ($child) { # Parent @@ -52,9 +52,13 @@ sub hook_queue { select(ENVELOPE_WRITER); $| = 1; select($oldfh); - close MESSAGE_READER or fault("close msg reader fault"),exit; - close ENVELOPE_READER or fault("close envelope reader fault"), exit; + close MESSAGE_READER or die("close msg reader fault"); + close ENVELOPE_READER or die("close envelope reader fault"); + # Note - technically there's a race here because if the exec() below + # fails and the writes to MESSAGE_WRITER block we get a deadlocked process. + # This check to see if(eof(PIPE)) will catch "most" of these problems. + die "Message pipe has been closed" if eof(MESSAGE_WRITER); $transaction->header->print(\*MESSAGE_WRITER); $transaction->body_resetpos; while (my $line = $transaction->body_getline) { @@ -64,6 +68,7 @@ sub hook_queue { my @rcpt = map { "T" . $_->address } $transaction->recipients; my $from = "F".($transaction->sender->address|| "" ); + die "Envelope pipe has been closed" if eof(ENVELOPE_WRITER); print ENVELOPE_WRITER "$from\0", join("\0",@rcpt), "\0\0" or return(DECLINED,"Could not print addresses to queue"); @@ -104,6 +109,10 @@ sub hook_queue { my $rc = exec $queue_exec; + # close the pipe + close(MESSAGE_READER); + close(MESSAGE_WRITER); + exit 6; # we'll only get here if the exec fails } } diff --git a/plugins/require_resolvable_fromhost b/plugins/require_resolvable_fromhost index 1ce0f17..a587bb5 100644 --- a/plugins/require_resolvable_fromhost +++ b/plugins/require_resolvable_fromhost @@ -1,46 +1,81 @@ -use Net::DNS qw(mx); +#!/usr/bin/perl -sub hook_mail { - my ($self, $transaction, $sender) = @_; +use Danga::DNS; - return DECLINED - if ($self->qp->connection->notes('whitelistclient')); - - $sender->format ne "<>" - and $self->qp->config("require_resolvable_fromhost") - and !$self->check_dns($sender->host) - and return (DENYSOFT, - ($sender->host - ? "Could not resolve ". $sender->host - : "FQDN required in the envelope sender")); - - return DECLINED; +sub register { + my ($self) = @_; + $self->register_hook("mail", "mail_handler"); + $self->register_hook("rcpt", "rcpt_handler"); +} +sub mail_handler { + my ($self, $transaction, $sender) = @_; + + $self->transaction->notes('resolvable', 1); + return DECLINED if $sender->format eq "<>"; + return $self->check_dns($sender->host); } sub check_dns { - my ($self, $host) = @_; - - # for stuff where we can't even parse a hostname out of the address - return 0 unless $host; - - return 1 if $host =~ m/^\[(\d{1,3}\.){3}\d{1,3}\]$/; - - my $res = new Net::DNS::Resolver; - $res->tcp_timeout(30); - $res->udp_timeout(30); - return 1 if mx($res, $host); - my $query = $res->search($host); - if ($query) { - foreach my $rr ($query->answer) { - return 1 if $rr->type eq "A" or $rr->type eq "MX"; + my ($self, $host) = @_; + + # for stuff where we can't even parse a hostname out of the address + return DECLINED unless $host; + + if( $host =~ m/^\[(\d{1,3}\.){3}\d{1,3}\]$/ ) { + $self->transaction->notes('resolvable', 1); + return DECLINED; } - } - else { - $self->log(LOGWARN, "$$ query for $host failed: ", $res->errorstring) - unless $res->errorstring eq "NXDOMAIN"; - } - return 0; + + $self->transaction->notes('pending_dns_queries', 2); + my $qp = $self->qp; + $self->log(LOGDEBUG, "Checking $host for MX record in the background"); + Danga::DNS->new( + callback => sub { dns_result($qp, @_) }, + host => $host, + type => "MX", + client => $qp->input_sock, + ); + $self->log(LOGDEBUG, "Checking $host for A record in the background"); + Danga::DNS->new( + callback => sub { dns_result($qp, @_) }, + host => $host, + client => $qp->input_sock, + ); + return CONTINUATION; } + +sub dns_result { + my ($qp, $result, $query) = @_; + + my $pending = $qp->transaction->notes('pending_dns_queries'); + $qp->transaction->notes('pending_dns_queries', --$pending); + + if ($result =~ /^[A-Z]+$/) { + # probably an error + $qp->log(LOGDEBUG, "DNS error: $result looking up $query"); + } else { + $qp->transaction->notes('resolvable', 1); + $qp->log(LOGDEBUG, "DNS lookup $query returned: $result"); + } + + $qp->finish_continuation unless $pending; +} + + +sub rcpt_handler { + my ($self, $transaction) = @_; + + if (!$transaction->notes('resolvable')) { + my $sender = $transaction->sender; + $self->log(LOGDEBUG, "Could not resolve " .$sender->host) if $sender->host; + return (DENYSOFT, + ($sender->host + ? "Could not resolve ". $sender->host + : "FQDN required in the envelope sender")); + } + + return DECLINED; +} diff --git a/plugins/rhsbl b/plugins/rhsbl index 4003630..96e1dec 100644 --- a/plugins/rhsbl +++ b/plugins/rhsbl @@ -1,31 +1,39 @@ +#!/usr/bin/perl -sub hook_mail { +use Danga::DNS; + +sub register { + my ($self) = @_; + + $self->register_hook('mail', 'mail_handler'); + $self->register_hook('rcpt', 'rcpt_handler'); +} + +sub mail_handler { my ($self, $transaction, $sender) = @_; - my $res = new Net::DNS::Resolver; - my $sel = IO::Select->new(); my %rhsbl_zones_map = (); # Perform any RHS lookups in the background. We just send the query packets here # and pick up any results in the RCPT handler. # MTAs gets confused when you reject mail during MAIL FROM: - my %rhsbl_zones = map { (split /\s+/, $_, 2)[0,1] } $self->qp->config('rhsbl_zones'); + my %rhsbl_zones = map { (split /\s+/, $_, 2)[0,1] } $self->config('rhsbl_zones'); if ($sender->format ne '<>' and %rhsbl_zones) { + my $helo = $self->connection->hello_host; push(my @hosts, $sender->host); - #my $helo = $self->qp->connection->hello_host; - #push(@hosts, $helo) if $helo && $helo ne $sender->host; + push(@hosts, $helo) if $helo && $helo ne $sender->host; for my $host (@hosts) { - for my $rhsbl (keys %rhsbl_zones) { + for my $rhsbl (keys %rhsbl_zones) { $self->log(LOGDEBUG, "Checking $host.$rhsbl for A record in the background"); - $sel->add($res->bgsend("$host.$rhsbl")); - $rhsbl_zones_map{"$host.$rhsbl"} = $rhsbl_zones{$rhsbl}; + Danga::DNS->new( + callback => sub { $self->process_result($host, $rhsbl_zones{$rhsbl}, @_) }, + host => "$host.$rhsbl", + client => $self->qp->input_sock, + ); + } } - } - - %{$self->{_rhsbl_zones_map}} = %rhsbl_zones_map; - $transaction->notes('rhsbl_sockets', $sel); } else { $self->log(LOGDEBUG, 'no RHS checks necessary'); } @@ -33,80 +41,28 @@ sub hook_mail { return DECLINED; } -sub hook_rcpt { - my ($self, $transaction, $rcpt) = @_; - my $host = $transaction->sender->host; - my $hello = $self->qp->connection->hello_host; - - my $result = $self->process_sockets; - if ($result && defined($self->{_rhsbl_zones_map}{$result})) { - if ($result =~ /^$host\./ ) { - return (DENY, "Mail from $host rejected because it " . $self->{_rhsbl_zones_map}{$result}); - } else { - return (DENY, "Mail from HELO $hello rejected because it " . $self->{_rhsbl_zones_map}{$result}); +sub process_result { + my ($self, $host, $template, $result, $query) = @_; + + if ($result !~ /^\d+\.\d+\.\d+\.\d+$/) { + # NXDOMAIN or error + return; } - } + + my $tran = $self->transaction; + return if $tran->notes('rhsbl'); + if ($host eq $tran->sender->host) { + $tran->notes('rhsbl', "Mail from $host rejected because it $template"); + } + else { + $tran->notes('rhsbl', "Mail from HELO $host rejected because it $template"); + } +} + +sub rcpt_handler { + my ($self, $transaction, $rcpt) = @_; + + my $result = $transaction->notes('rhsbl'); return (DENY, $result) if $result; return DECLINED; } - -sub process_sockets { - my ($self) = @_; - my $trans = $self->transaction; - my $result = ''; - - return $trans->notes('rhsbl') if $trans->notes('rhsbl'); - - my $res = new Net::DNS::Resolver; - my $sel = $trans->notes('rhsbl_sockets') or return ''; - - $self->log(LOGDEBUG, 'waiting for rhsbl dns'); - - # don't wait more than 8 seconds here - my @ready = $sel->can_read(8); - - $self->log(LOGDEBUG, 'DONE waiting for rhsbl dns, got ' , scalar @ready, ' answers ...') ; - return '' unless @ready; - - for my $socket (@ready) { - my $query = $res->bgread($socket); - $sel->remove($socket); - undef $socket; - - if ($query) { - foreach my $rr ($query->answer) { - $self->log(LOGDEBUG, 'got an ' . $rr->type . ' record ' . $rr->name); - if ($rr->type eq 'A') { - $result = $rr->name; - $self->log(LOGDEBUG, "A record found for $result with IP " . $rr->address); - last; - } - } - } else { - $self->log(LOGCRIT, "query failed: ", $res->errorstring) unless $res->errorstring eq 'NXDOMAIN'; - } - - if ($result) { - #kill any other pending I/O - $trans->notes('rhsbl_sockets', undef); - return $trans->notes('rhsbl', $result); - } - } - - if ($sel->count) { - # loop around if we have dns results left - return $self->process_sockets(); - } - - # if there was more to read; then forget it - $trans->notes('rhsbl_sockets', undef); - - return $trans->notes('rhsbl', $result); -} - -sub hook_disconnect { - my ($self, $transaction) = @_; - - $transaction->notes('rhsbl_sockets', undef); - return DECLINED; -} diff --git a/plugins/stats b/plugins/stats new file mode 100644 index 0000000..fbe0119 --- /dev/null +++ b/plugins/stats @@ -0,0 +1,74 @@ +#!/usr/bin/perl -w + +use Time::HiRes qw(time); + +my $START_TIME = time; +our $MAILS_RECEIVED = 0; +our $MAILS_REJECTED = 0; +our $MAILS_TEMPFAIL = 0; + +sub register { + my ($self) = @_; + + $self->register_hook('deny', 'increment_deny'); + $self->register_hook('queue', 'increment_mails'); +} + +sub get_stats { + my $class = shift; + my $uptime = $class->uptime; + my $recvd = $class->mails_received; + my $reject = $class->mails_rejected; + my $soft = $class->mails_tempfailed; + my $rate = $class->mails_per_sec; + return sprintf(" Uptime: %0.2f sec\n". + " Mails Received: % 10d\n". + " 5xx: % 10d\n". + " 4xx: % 10d\n". + "Mails per second: %0.2f\n", + $uptime, $recvd, $reject, $soft, $rate); +} + +sub increment_deny { + my ($self, $tran, $plugin, $level) = @_; + + if ($level == DENY or $level == DENY_DISCONNECT) { + $MAILS_REJECTED++; + } + elsif ($level == DENYSOFT or $level == DENYSOFT_DISCONNECT) { + $MAILS_TEMPFAIL++; + } + + return DECLINED; +} + +sub increment_mails { + my $self = shift; + + $MAILS_RECEIVED++; + + return DECLINED; +} + +sub uptime { + return (time() - $START_TIME); +} + +sub mails_received { + return $MAILS_RECEIVED; +} + +sub mails_rejected { + return $MAILS_REJECTED; +} + +sub mails_tempfailed { + return $MAILS_TEMPFAIL; +} + +sub mails_per_sec { + my $class = shift; + return ($MAILS_RECEIVED / $class->uptime()); +} + + diff --git a/plugins/tls b/plugins/tls index 8406f76..7379350 100644 --- a/plugins/tls +++ b/plugins/tls @@ -21,7 +21,7 @@ MAIL FROM onwards. =cut -use IO::Socket::SSL qw(debug1 debug2 debug3 debug4); +use IO::Socket::SSL; # qw(debug1 debug2 debug3 debug4); sub init { my ($self, $qp, $cert, $key) = @_; @@ -38,7 +38,6 @@ sub init { SSL_cipher_list => 'HIGH', SSL_server => 1 ) or die "Could not create SSL context: $!"; - # now extract the password... $self->ssl_context($ssl_ctx); } @@ -66,31 +65,44 @@ sub hook_unrecognized_command { $self->qp->respond (220, "Go ahead with TLS"); eval { - my $tlssocket = IO::Socket::SSL->new_from_fd( - fileno(STDIN), '+>', - SSL_use_cert => 1, - SSL_cert_file => $self->tls_cert, - SSL_key_file => $self->tls_key, - SSL_cipher_list => 'HIGH', - SSL_server => 1, - SSL_reuse_ctx => $self->ssl_context, - ) or die "Could not create SSL socket: $!"; - + my $tlssocket; + if ($self->qp->isa('Danga::Socket')) { + # high_perf + $tlssocket = IO::Socket::SSL->start_SSL($self->qp->sock, + SSL_use_cert => 1, + SSL_cert_file => $self->tls_cert, + SSL_key_file => $self->tls_key, + SSL_cipher_list => 'HIGH', + SSL_server => 1, + SSL_reuse_ctx => $self->ssl_context, + ) or die "Could not convert SSL socket: $!"; + } + else { + $tlssocket = IO::Socket::SSL->new_from_fd( + fileno(STDIN), '+>', + SSL_use_cert => 1, + SSL_cert_file => $self->tls_cert, + SSL_key_file => $self->tls_key, + SSL_cipher_list => 'HIGH', + SSL_server => 1, + SSL_reuse_ctx => $self->ssl_context, + ) or die "Could not create SSL socket: $!"; + } + my $conn = $self->connection; # Create a new connection object with subset of information collected thus far - $self->qp->connection(Qpsmtpd::Connection->new( - map { $_ => $conn->$_ } - qw( - local_ip - local_port - remote_ip - remote_port - remote_host - remote_info - ), - )); + my $newconn = Qpsmtpd::Connection->new(); + for (qw(local_ip local_port remote_ip remote_port remote_host remote_info)) { + $newconn->$_($conn->$_()); + } + $self->qp->connection($newconn); $self->qp->reset_transaction; - *STDIN = *STDOUT = $self->connection->notes('tls_socket', $tlssocket); + if ($self->qp->isa('Danga::Socket')) { + $self->connection->notes('tls_socket', $tlssocket); + } + else { + *STDIN = *STDOUT = $self->connection->notes('tls_socket', $tlssocket); + } $self->connection->notes('tls_enabled', 1); }; if ($@) { @@ -131,5 +143,6 @@ sub ssl_context { sub bad_ssl_hook { my ($self, $transaction) = @_; return DENY, "Command refused due to lack of security" if $transaction->notes('ssl_failed'); + return DECLINED; } *hook_helo = *hook_data = *hook_rcpt = *hook_mail = *hook_auth = \&bad_ssl_hook; diff --git a/qpsmtpd b/qpsmtpd index 254458e..f416f7a 100755 --- a/qpsmtpd +++ b/qpsmtpd @@ -1,30 +1,455 @@ -#!/usr/bin/perl -Tw -# Copyright (c) 2001 Ask Bjoern Hansen. See the LICENSE file for details. -# The "command dispatch" system is taken from colobus - http://trainedmonkey.com/colobus/ -# -# this is designed to be run under tcpserver (http://cr.yp.to/ucspi-tcp.html) -# or inetd if you're into that sort of thing -# -# -# For more information see http://develooper.com/code/qpsmtpd/ -# -# +#!/usr/bin/perl + +use lib "./lib"; +BEGIN { + delete $ENV{ENV}; + delete $ENV{BASH_ENV}; + $ENV{PATH} = '/bin:/usr/bin:/var/qmail/bin:/usr/local/bin'; +} -use lib 'lib'; -use Qpsmtpd::TcpServer; use strict; -$| = 1; +use vars qw($DEBUG); +use FindBin qw(); +# TODO: need to make this taint friendly +use lib "$FindBin::Bin/lib"; +use Danga::Socket; +use Danga::Client; +use Qpsmtpd::PollServer; +use Qpsmtpd::ConfigServer; +use Qpsmtpd::Constants; +use IO::Socket; +use Carp; +use POSIX qw(WNOHANG); +use Getopt::Long; -delete $ENV{ENV}; -$ENV{PATH} = '/bin:/usr/bin:/var/qmail/bin'; +$|++; -my $qpsmtpd = Qpsmtpd::TcpServer->new(); -$qpsmtpd->start_connection(); -$qpsmtpd->run(); +# For debugging +# $SIG{USR1} = sub { Carp::confess("USR1") }; -__END__ +use Socket qw(SOMAXCONN IPPROTO_TCP SO_KEEPALIVE TCP_NODELAY SOL_SOCKET); +$SIG{'PIPE'} = "IGNORE"; # handled manually +$DEBUG = 0; +my $CONFIG_PORT = 20025; +my $CONFIG_LOCALADDR = '127.0.0.1'; -1; +my $PORT = 2525; +my $LOCALADDR = '0.0.0.0'; +my $LineMode = 0; +my $PROCS = 1; +my $MAXCONN = 15; # max simultaneous connections +my $USER = 'smtpd'; # user to suid to +my $MAXCONNIP = 5; # max simultaneous connections from one IP +my $PAUSED = 0; +my $NUMACCEPT = 20; + +sub help { + print < \$PORT, + 'l|listen-address=s' => \$LOCALADDR, + 'j|procs=i' => \$PROCS, + 'd|debug+' => \$DEBUG, + 'f|forkmode' => \$LineMode, + 'c|limit-connections=i' => \$MAXCONN, + 'm|max-from-ip=i' => \$MAXCONNIP, + 'u|user=s' => \$USER, + 'a|accept=i' => \$NUMACCEPT, + 'h|help' => \&help, + 'use-poll' => \&force_poll, +) || help(); + +# detaint the commandline +if ($PORT =~ /^(\d+)$/) { $PORT = $1 } else { &help } +if ($LOCALADDR =~ /^([\d\w\-.]+)$/) { $LOCALADDR = $1 } else { &help } +if ($USER =~ /^([\w\-]+)$/) { $USER = $1 } else { &help } +if ($MAXCONN =~ /^(\d+)$/) { $MAXCONN = $1 } else { &help } +if ($PROCS =~ /^(\d+)$/) { $PROCS = $1 } else { &help } +if ($NUMACCEPT =~ /^(\d+)$/) { $NUMACCEPT = $1 } else { &help } +my $_NUMACCEPT = $NUMACCEPT; +$::LineMode = $LineMode; +$PROCS = 1 if $LineMode; +# This is a bit of a hack, but we get to approximate MAXCONN stuff when we +# have multiple children listening on the same socket. +$MAXCONN /= $PROCS; +$MAXCONNIP /= $PROCS; + +sub force_poll { + $Danga::Socket::HaveEpoll = 0; + $Danga::Socket::HaveKQueue = 0; +} + +Danga::Socket::init_poller(); + +my $POLL = "with " . ($Danga::Socket::HaveEpoll ? "epoll()" : + $Danga::Socket::HaveKQueue ? "kqueue()" : "poll()"); + +my $SERVER; +my $CONFIG_SERVER; + +# Code for inetd/tcpserver mode +if ($ENV{REMOTE_HOST} or $ENV{TCPREMOTEHOST}) { + run_as_inetd(); + exit(0); +} + +my %childstatus = (); + +run_as_server(); +exit(0); + +sub _fork { + my $pid = fork; + if (!defined($pid)) { die "Cannot fork: $!" } + return $pid if $pid; + + # Fixup Net::DNS randomness after fork + srand($$ ^ time); + + local $^W; + delete $INC{'Net/DNS/Header.pm'}; + require Net::DNS::Header; + + # cope with different versions of Net::DNS + eval { + $Net::DNS::Resolver::global{id} = 1; + $Net::DNS::Resolver::global{id} = int(rand(Net::DNS::Resolver::MAX_ID())); + # print "Next DNS ID: $Net::DNS::Resolver::global{id}\n"; + }; + if ($@) { + # print "Next DNS ID: " . Net::DNS::Header::nextid() . "\n"; + } + + # Fixup lost kqueue after fork + $Danga::Socket::HaveKQueue = undef; + Danga::Socket::init_poller(); +} + +sub spawn_child { + _fork and return; + + $SIG{CHLD} = "DEFAULT"; + + Qpsmtpd::PollServer->OtherFds(fileno($SERVER) => \&accept_handler); + Qpsmtpd::PollServer->EventLoop(); + exit; +} + +sub sig_chld { + $SIG{CHLD} = 'IGNORE'; + while ( (my $child = waitpid(-1,WNOHANG)) > 0) { + last unless $child > 0; + print "child $child died\n"; + delete $childstatus{$child}; + } + return if $LineMode; + # restart a new child if in poll server mode + spawn_child(); + $SIG{CHLD} = \&sig_chld; +} + +sub HUNTSMAN { + $SIG{CHLD} = 'DEFAULT'; + kill 'INT' => keys %childstatus; + exit(0); +} + +sub run_as_inetd { + $LineMode = $::LineMode = 1; + + my $insock = IO::Handle->new_from_fd(0, "r"); + IO::Handle::blocking($insock, 0); + + my $outsock = IO::Handle->new_from_fd(1, "w"); + IO::Handle::blocking($outsock, 0); + + my $client = Danga::Client->new($insock); + + my $out = Qpsmtpd::PollServer->new($outsock); + $out->load_plugins; + $out->init_logger; + $out->input_sock($client); + $client->push_back_read("Connect\n"); + # Cause poll/kevent/epoll to end quickly in first iteration + Qpsmtpd::PollServer->AddTimer(1, sub { }); + + while (1) { + $client->enable_read; + my $line = $client->get_line; + last if !defined($line); + my $output = $out->process_line($line); + $out->write($output) if $output; + } +} + +sub run_as_server { + local $::MAXconn = $MAXCONN; + # establish SERVER socket, bind and listen. + $SERVER = IO::Socket::INET->new(LocalPort => $PORT, + LocalAddr => $LOCALADDR, + Type => SOCK_STREAM, + Proto => IPPROTO_TCP, + Blocking => 0, + Reuse => 1, + Listen => SOMAXCONN ) + or die "Error creating server $LOCALADDR:$PORT : $@\n"; + + IO::Handle::blocking($SERVER, 0); + binmode($SERVER, ':raw'); + + $CONFIG_SERVER = IO::Socket::INET->new(LocalPort => $CONFIG_PORT, + LocalAddr => $CONFIG_LOCALADDR, + Type => SOCK_STREAM, + Proto => IPPROTO_TCP, + Blocking => 0, + Reuse => 1, + Listen => 1 ) + or die "Error creating server $CONFIG_LOCALADDR:$CONFIG_PORT : $@\n"; + + IO::Handle::blocking($CONFIG_SERVER, 0); + binmode($CONFIG_SERVER, ':raw'); + + # Drop priviledges + my (undef, undef, $quid, $qgid) = getpwnam $USER or + die "unable to determine uid/gid for $USER\n"; + $) = ""; + POSIX::setgid($qgid) or + die "unable to change gid: $!\n"; + POSIX::setuid($quid) or + die "unable to change uid: $!\n"; + $> = $quid; + + ::log(LOGINFO, 'Running as user '. + (getpwuid($>) || $>) . + ', group '. + (getgrgid($)) || $))); + + # Load plugins here + my $plugin_loader = Qpsmtpd::SMTP->new(); + $plugin_loader->load_plugins; + + if ($PROCS > 1) { + $SIG{'CHLD'} = \&sig_chld; + my @kids; + for (1..$PROCS) { + push @kids, spawn_child(); + } + $SIG{INT} = $SIG{TERM} = sub { $SIG{CHLD} = "IGNORE"; kill 2 => @kids; exit }; + $plugin_loader->log(LOGDEBUG, "Listening on $PORT with $PROCS children $POLL"); + sleep while (1); + } + else { + if ($LineMode) { + $SIG{INT} = $SIG{TERM} = \&HUNTSMAN; + } + $plugin_loader->log(LOGDEBUG, "Listening on $PORT with single process $POLL" . + ($LineMode ? " (forking server)" : "")); + Qpsmtpd::PollServer->OtherFds(fileno($SERVER) => \&accept_handler, + fileno($CONFIG_SERVER) => \&config_handler, + ); + while (1) { + Qpsmtpd::PollServer->EventLoop(); + } + exit; + } + +} + +sub config_handler { + my $csock = $CONFIG_SERVER->accept(); + if (!$csock) { + # warn("accept failed on config server: $!"); + return; + } + binmode($csock, ':raw'); + + printf("Config server connection\n") if $DEBUG; + + IO::Handle::blocking($csock, 0); + setsockopt($csock, IPPROTO_TCP, TCP_NODELAY, pack("l", 1)) or die; + + my $client = Qpsmtpd::ConfigServer->new($csock); + $client->watch_read(1); + return; +} + +# Accept all new connections +sub accept_handler { + my $running; + if( $LineMode ) { + $running = scalar keys %childstatus; + } + else { + my $descriptors = Danga::Client->DescriptorMap; + $running = scalar keys %$descriptors; + } + + for (1 .. $NUMACCEPT) { + if ($running >= $MAXCONN) { + ::log(LOGINFO,"Too many connections: $running >= $MAXCONN."); + return; + } + $running++; + if (! _accept_handler($running)) { + # got here because we have too many accepts. + $NUMACCEPT = $_NUMACCEPT; + return; + } + } + + # got here because we have accept's left. + # So double the number we accept next time. + $NUMACCEPT *= 2; +} + +use Errno qw(EAGAIN EWOULDBLOCK); + +sub _accept_handler { + my $running = shift; + + my $csock = $SERVER->accept(); + if (!$csock) { + # warn("accept() failed: $!"); + return; + if ($! == EAGAIN || $! == EWOULDBLOCK) { + return; + } + else { + warn("accept() failed: $!"); + return 1; + } + } + binmode($csock, ':raw'); + + printf("Listen child making a Qpsmtpd::PollServer for %d.\n", fileno($csock)) + if $DEBUG; + + IO::Handle::blocking($csock, 0); + setsockopt($csock, IPPROTO_TCP, TCP_NODELAY, pack("l", 1)) or die; + + if (!$LineMode) { + # multiplex mode + my $client = Qpsmtpd::PollServer->new($csock); + my $rem_ip = $client->peer_ip_string; + + if ($PAUSED) { + $client->write("451 Sorry, this server is currently paused\r\n"); + $client->close; + return 1; + } + + if ($MAXCONNIP) { + my $num_conn = 1; # seed with current value + + # If we for-loop directly over values %childstatus, a SIGCHLD + # can call REAPER and slip $rip out from under us. Causes + # "Use of freed value in iteration" under perl 5.8.4. + my $descriptors = Danga::Client->DescriptorMap; + my @obj = values %$descriptors; + foreach my $obj (@obj) { + local $^W; + # This is a bit of a slow way to do this. Wish I could cache the method call. + ++$num_conn if ($obj->peer_ip_string eq $rem_ip); + } + + if ($num_conn > $MAXCONNIP) { + $client->log(LOGINFO,"Too many connections from $rem_ip: " + ."$num_conn > $MAXCONNIP. Denying connection."); + $client->write("451 Sorry, too many connections from $rem_ip, try again later\r\n"); + $client->close; + return 1; + } + $client->log(LOGINFO, "accepted connection $running/$MAXCONN ($num_conn/$MAXCONNIP) from $rem_ip"); + } + + $client->push_back_read("Connect\n"); + $client->watch_read(1); + return 1; + } + + # fork-per-connection mode + my $rem_ip = $csock->sockhost(); + + if ($MAXCONNIP) { + my $num_conn = 1; # seed with current value + + my @rip = values %childstatus; + foreach my $rip (@rip) { + ++$num_conn if (defined $rip && $rip eq $rem_ip); + } + + if ($num_conn > $MAXCONNIP) { + ::log(LOGINFO,"Too many connections from $rem_ip: " + ."$num_conn > $MAXCONNIP. Denying connection."); + print $csock "451 Sorry, too many connections from $rem_ip, try again later\r\n"; + close $csock; + return 1; + } + } + + if (my $pid = _fork) { + $childstatus{$pid} = $rem_ip; + return $csock->close(); + } + + $SERVER->close(); # make sure the child doesn't accept() new connections + + $SIG{$_} = 'DEFAULT' for keys %SIG; + + my $client = Qpsmtpd::PollServer->new($csock); + $client->push_back_read("Connect\n"); + # Cause poll/kevent/epoll to end quickly in first iteration + Qpsmtpd::PollServer->AddTimer(0.1, sub { }); + + while (1) { + $client->enable_read; + my $line = $client->get_line; + last if !defined($line); + my $resp = $client->process_line($line); + $client->write($resp) if $resp; + } + + $client->log(LOGDEBUG, "Finished with child %d.\n", fileno($csock)) + if $DEBUG; + $client->close(); + + exit; +} + +######################################################################## + +sub log { + my ($level,$message) = @_; + # $level not used yet. this is reimplemented from elsewhere anyway + warn("$$ fd:? $message\n"); +} + +sub pause { + my ($pause) = @_; + $PAUSED = $pause; +} diff --git a/qpsmtpd-forkserver b/qpsmtpd-forkserver index 9bb89be..9a04930 100755 --- a/qpsmtpd-forkserver +++ b/qpsmtpd-forkserver @@ -11,6 +11,7 @@ use Qpsmtpd::TcpServer; use Qpsmtpd::Constants; use IO::Socket; use IO::Select; +use Qpsmtpd::PollServer; use Socket; use Getopt::Long; use POSIX qw(:sys_wait_h :errno_h :signal_h); @@ -24,6 +25,7 @@ my @LOCALADDR; # ip address(es) to bind to my $USER = 'smtpd'; # user to suid to my $MAXCONNIP = 5; # max simultaneous connections from one IP my $PID_FILE = ''; # file to which server PID will be written +our $DEBUG = 0; sub usage { print <<"EOT"; @@ -47,6 +49,7 @@ GetOptions('h|help' => \&usage, 'p|port=i' => \$PORT, 'u|user=s' => \$USER, 'pid-file=s' => \$PID_FILE, + 'd|debug+' => \$DEBUG, ) || &usage; # detaint the commandline @@ -68,6 +71,12 @@ $ENV{PATH} = '/bin:/usr/bin:/var/qmail/bin'; my %childstatus = (); sub REAPER { +# foreach my $chld (keys %childstatus) { +# if (defined(waitpid($chld, WNOHANG))) { +# ::log(LOGINFO,"cleaning up after $chld"); +# delete $childstatus{$chld}; +# } +# } while ( defined(my $chld = waitpid(-1, WNOHANG)) ){ last unless $chld > 0; ::log(LOGINFO,"cleaning up after $chld"); @@ -212,30 +221,22 @@ while (1) { ::log(LOGINFO, "Connection Timed Out"); exit; }; - my $localsockaddr = getsockname($client); - my ($lport, $laddr) = sockaddr_in($localsockaddr); - $ENV{TCPLOCALIP} = inet_ntoa($laddr); - # my ($port, $iaddr) = sockaddr_in($hisaddr); - $ENV{TCPREMOTEIP} = inet_ntoa($iaddr); - $ENV{TCPREMOTEHOST} = gethostbyaddr($iaddr, AF_INET) || "Unknown"; - - # don't do this! - #$0 = "qpsmtpd-forkserver: $ENV{TCPREMOTEIP} / $ENV{TCPREMOTEHOST}"; - - ::log(LOGINFO, "Accepted connection $running/$MAXCONN from $ENV{TCPREMOTEIP} / $ENV{TCPREMOTEHOST}"); + ::log(LOGINFO, "Accepted connection $running/$MAXCONN"); - # dup to STDIN/STDOUT - POSIX::dup2(fileno($client), 0); - POSIX::dup2(fileno($client), 1); + $::LineMode = 1; - $qpsmtpd->start_connection - ( - local_ip => $ENV{TCPLOCALIP}, - local_port => $lport, - remote_ip => $ENV{TCPREMOTEIP}, - remote_port => $port, - ); - $qpsmtpd->run(); + my $qp = Qpsmtpd::PollServer->new($client); + $qp->load_plugins; + $qp->init_logger; + $qp->push_back_read("Connect\n"); + Qpsmtpd::PollServer->AddTimer(0.1, sub { }); + while (1) { + $qp->enable_read; + my $line = $qp->get_line; + last if !defined($line); + my $output = $qp->process_line($line); + $qp->write($output) if $output; + } exit; # child leaves } diff --git a/qpsmtpd-server b/qpsmtpd-server deleted file mode 100755 index 248c472..0000000 --- a/qpsmtpd-server +++ /dev/null @@ -1,28 +0,0 @@ -#!/usr/bin/perl -Tw -# Copyright (c) 2001 Ask Bjoern Hansen. See the LICENSE file for details. -# The "command dispatch" system is taken from colobus - http://trainedmonkey.com/colobus/ -# -# this is designed to be run under tcpserver (http://cr.yp.to/ucspi-tcp.html) -# or inetd if you're into that sort of thing -# -# -# For more information see http://develooper.com/code/qpsmtpd/ -# -# - -use lib 'lib'; -use Qpsmtpd::SelectServer; -use strict; -$| = 1; - -delete $ENV{ENV}; -$ENV{PATH} = '/bin:/usr/bin:/var/qmail/bin'; - -Qpsmtpd::SelectServer->main(); - -__END__ - - - - -1;