High perf branch merge and fixes


git-svn-id: https://svn.perl.org/qpsmtpd/trunk@496 958fd67b-6ff1-0310-b445-bb7760255be9
This commit is contained in:
Matt Sergeant 2005-07-11 19:10:49 +00:00
commit 9683016276
17 changed files with 1715 additions and 886 deletions

View File

@ -167,9 +167,15 @@ sub _config_from_file {
return wantarray ? @config : $config[0];
}
our $HOOKS;
sub load_plugins {
my $self = shift;
if ($HOOKS) {
return $self->{hooks} = $HOOKS;
}
$self->log(LOGWARN, "Plugins already loaded") if $self->{hooks};
$self->{hooks} = {};
@ -180,6 +186,8 @@ sub load_plugins {
@plugins = $self->_load_plugins($dir, @plugins);
$HOOKS = $self->{hooks};
return @plugins;
}
@ -252,23 +260,76 @@ sub transaction {
sub run_hooks {
my ($self, $hook) = (shift, shift);
if ($self->{_continuation} && $hook ne "logging") {
die "Continuations in progress from previous hook (this is the $hook hook)";
}
my $hooks = $self->{hooks};
if ($hooks->{$hook}) {
my @r;
for my $code (@{$hooks->{$hook}}) {
my @local_hooks = @{$hooks->{$hook}};
while (@local_hooks) {
my $code = shift @local_hooks;
@r = $self->run_hook($hook, $code, @_);
next unless @r;
if ($r[0] == CONTINUATION) {
$self->disable_read() if $self->isa('Danga::Client');
$self->{_continuation} = [$hook, [@_], @local_hooks];
}
last unless $r[0] == DECLINED;
}
$r[0] = DECLINED if not defined $r[0];
return @r;
}
return (0, '');
}
sub finish_continuation {
my ($self) = @_;
die "No continuation in progress" unless $self->{_continuation};
$self->enable_read() if $self->isa('Danga::Client');
my $todo = $self->{_continuation};
$self->{_continuation} = undef;
my $hook = shift @$todo || die "No hook in the continuation";
my $args = shift @$todo || die "No hook args in the continuation";
my @r;
while (@$todo) {
my $code = shift @$todo;
@r = $self->run_hook($hook, $code, @$args);
if ($r[0] == CONTINUATION) {
$self->disable_read() if $self->isa('Danga::Client');
$self->{_continuation} = [$hook, $args, @$todo];
return @r;
}
last unless $r[0] == DECLINED;
}
$r[0] = DECLINED if not defined $r[0];
my $responder = $hook . "_respond";
if (my $meth = $self->can($responder)) {
warn("continuation finished on $self\n");
return $meth->($self, $r[0], $r[1], @$args);
}
die "No ${hook}_respond method";
}
sub run_hook {
my ($self, $hook, $code, @args) = @_;
my @r;
if ( $hook eq 'logging' ) { # without calling $self->log()
eval { (@r) = $code->{code}->($self, $self->transaction, @_); };
$@ and warn("FATAL LOGGING PLUGIN ERROR: ", $@) and next;
}
else {
$self->varlog(LOGINFO, $hook, $code->{name});
eval { (@r) = $code->{code}->($self, $self->transaction, @_); };
$@ and $self->log(LOGCRIT, "FATAL PLUGIN ERROR: ", $@) and next;
print STDERR "plugin $hook $code->{name} 1\n";
eval { (@r) = $code->{code}->($self, $self->transaction, @args); };
print STDERR "plugin $hook $code->{name} 2\n";
$@ and $self->log(LOGCRIT, "FATAL PLUGIN ERROR: ", $@) and return;
!defined $r[0]
and $self->log(LOGERROR, "plugin ".$code->{name}
." running the $hook hook returned undef!")
and next;
."running the $hook hook returned undef!")
and return;
if ($self->transaction) {
my $tnotes = $self->transaction->notes( $code->{name} );
@ -285,24 +346,12 @@ sub run_hooks {
$r[0] == DENY_DISCONNECT or $r[0] == DENYSOFT_DISCONNECT)
{
$r[1] = "" if not defined $r[1];
$self->log(LOGDEBUG, "Plugin ".$code->{name}.
", hook $hook returned ".return_code($r[0]).", $r[1]");
$self->log(LOGDEBUG, "Plugin $code->{name}, hook $hook returned $r[0], $r[1]");
$self->run_hooks("deny", $code->{name}, $r[0], $r[1]) unless ($hook eq "deny");
} else {
$r[1] = "" if not defined $r[1];
$self->log(LOGDEBUG, "Plugin ".$code->{name}.
", hook $hook returned ".return_code($r[0]).", $r[1]");
$self->run_hooks("ok", $code->{name}, $r[0], $r[1]) unless ($hook eq "ok");
}
}
last unless $r[0] == DECLINED;
}
$r[0] = DECLINED if not defined $r[0];
return @r;
}
return (0, '');
}
sub _register_hook {

285
lib/Qpsmtpd/ConfigServer.pm Normal file
View File

@ -0,0 +1,285 @@
# $Id$
package Qpsmtpd::ConfigServer;
use base ('Danga::Client');
use Qpsmtpd::Constants;
use strict;
use fields qw(
_auth
_commands
_config_cache
_connection
_transaction
_test_mode
_extras
other_fds
);
my $PROMPT = "Enter command: ";
sub new {
my Qpsmtpd::ConfigServer $self = shift;
$self = fields::new($self) unless ref $self;
$self->SUPER::new( @_ );
$self->write($PROMPT);
return $self;
}
sub max_idle_time { 3600 } # one hour
sub process_line {
my $self = shift;
my $line = shift || return;
if ($::DEBUG > 1) { print "$$:".($self+0)."C($self->{mode}): $line"; }
local $SIG{ALRM} = sub {
my ($pkg, $file, $line) = caller();
die "ALARM: $pkg, $file, $line";
};
my $prev = alarm(2); # must process a command in < 2 seconds
my $resp = eval { $self->_process_line($line) };
alarm($prev);
if ($@) {
print STDERR "Error: $@\n";
}
return $resp || '';
}
sub respond {
my $self = shift;
my (@messages) = @_;
while (my $msg = shift @messages) {
$self->write("$msg\r\n");
}
return;
}
sub fault {
my $self = shift;
my ($msg) = shift || "program fault - command not performed";
print STDERR "$0 [$$]: $msg ($!)\n";
$self->respond("Error - " . $msg);
return $PROMPT;
}
sub _process_line {
my $self = shift;
my $line = shift;
$line =~ s/\r?\n//;
my ($cmd, @params) = split(/ +/, $line);
my $meth = "cmd_" . lc($cmd);
if (my $lookup = $self->can($meth)) {
my $resp = eval {
$lookup->($self, @params);
};
if ($@) {
my $error = $@;
chomp($error);
Qpsmtpd->log(LOGERROR, "Command Error: $error");
return $self->fault("command '$cmd' failed unexpectedly");
}
return "$resp\n$PROMPT";
}
else {
# No such method - i.e. unrecognized command
return $self->fault("command '$cmd' unrecognised");
}
}
my %helptext = (
help => "HELP [CMD] - Get help on all commands or a specific command",
status => "STATUS - Returns status information about current connections",
list => "LIST [LIMIT] - List the connections, specify limit or negative limit to shrink list",
kill => "KILL (\$IP | \$REF) - Disconnect all connections from \$IP or connection reference \$REF",
pause => "PAUSE - Stop accepting new connections",
continue => "CONTINUE - Resume accepting connections",
reload => "RELOAD - Reload all plugins and config",
quit => "QUIT - Exit the config server",
);
sub cmd_help {
my $self = shift;
my ($subcmd) = @_;
$subcmd ||= 'help';
$subcmd = lc($subcmd);
if ($subcmd eq 'help') {
my $txt = join("\n", map { substr($_, 0, index($_, "-")) } sort values(%helptext));
return "Available Commands:\n\n$txt\n";
}
my $txt = $helptext{$subcmd} || "Unrecognised help option. Try 'help' for a full list.";
return "$txt\n";
}
sub cmd_quit {
my $self = shift;
$self->close;
}
sub cmd_pause {
my $self = shift;
my $other_fds = $self->OtherFds;
$self->{other_fds} = { %$other_fds };
%$other_fds = ();
return "PAUSED";
}
sub cmd_continue {
my $self = shift;
my $other_fds = $self->{other_fds};
$self->OtherFds( %$other_fds );
%$other_fds = ();
return "UNPAUSED";
}
sub cmd_status {
my $self = shift;
# Status should show:
# - Total time running
# - Total number of mails received
# - Total number of mails rejected (5xx)
# - Total number of mails tempfailed (5xx)
# - Avg number of mails/minute
# - Number of current connections
# - Number of outstanding DNS queries
my $output = "Current Status as of " . gmtime() . " GMT\n\n";
if (defined &Qpsmtpd::Plugin::stats::register) {
# Stats plugin is loaded
$output .= Qpsmtpd::Plugin::stats->get_stats;
}
my $descriptors = Danga::Socket->DescriptorMap;
my $current_connections = 0;
my $current_dns = 0;
foreach my $fd (keys %$descriptors) {
my $pob = $descriptors->{$fd};
if ($pob->isa("Qpsmtpd::PollServer")) {
$current_connections++;
}
elsif ($pob->isa("Danga::DNS::Resolver")) {
$current_dns = $pob->pending;
}
}
$output .= "Curr Connections: $current_connections / $::MAXconn\n".
"Curr DNS Queries: $current_dns";
return $output;
}
sub cmd_list {
my $self = shift;
my ($count) = @_;
my $descriptors = Danga::Socket->DescriptorMap;
my $list = "Current" . ($count ? (($count > 0) ? " Oldest $count" : " Newest ".-$count) : "") . " Connections: \n\n";
my @all;
foreach my $fd (keys %$descriptors) {
my $pob = $descriptors->{$fd};
if ($pob->isa("Qpsmtpd::PollServer")) {
next unless $pob->connection->remote_ip; # haven't even started yet
push @all, [$pob+0, $pob->connection->remote_ip,
$pob->connection->remote_host, $pob->uptime];
}
}
@all = sort { $a->[3] <=> $b->[3] } @all;
if ($count) {
if ($count > 0) {
@all = @all[$#all-($count-1) .. $#all];
}
else {
@all = @all[0..(abs($count) - 1)];
}
}
foreach my $item (@all) {
$list .= sprintf("%x : %s [%s] Connected %0.2fs\n", map { defined()?$_:'' } @$item);
}
return $list;
}
sub cmd_kill {
my $self = shift;
my ($match) = @_;
return "SYNTAX: KILL (\$IP | \$REF)\n" unless $match;
my $descriptors = Danga::Socket->DescriptorMap;
my $killed = 0;
my $is_ip = (index($match, '.') >= 0);
foreach my $fd (keys %$descriptors) {
my $pob = $descriptors->{$fd};
if ($pob->isa("Qpsmtpd::PollServer")) {
if ($is_ip) {
next unless $pob->connection->remote_ip; # haven't even started yet
if ($pob->connection->remote_ip eq $match) {
$pob->write("550 Your connection has been killed by an administrator\r\n");
$pob->disconnect;
$killed++;
}
}
else {
# match by ID
if ($pob+0 == hex($match)) {
$pob->write("550 Your connection has been killed by an administrator\r\n");
$pob->disconnect;
$killed++;
}
}
}
}
return "Killed $killed connection" . ($killed > 1 ? "s" : "") . "\n";
}
sub cmd_dump {
my $self = shift;
my ($ref) = @_;
return "SYNTAX: DUMP \$REF\n" unless $ref;
require Data::Dumper;
$Data::Dumper::Indent=1;
my $descriptors = Danga::Socket->DescriptorMap;
foreach my $fd (keys %$descriptors) {
my $pob = $descriptors->{$fd};
if ($pob->isa("Qpsmtpd::PollServer")) {
if ($pob+0 == hex($ref)) {
return Data::Dumper::Dumper($pob);
}
}
}
return "Unable to find the connection: $ref. Try the LIST command\n";
}
1;
__END__
=head1 NAME
Qpsmtpd::ConfigServer - a configuration server for qpsmtpd
=head1 DESCRIPTION
When qpsmtpd runs in multiplex mode it also provides a config server that you
can connect to. This allows you to view current connection statistics and other
gumph that you probably don't care about.
=cut

View File

@ -25,6 +25,7 @@ my %return_codes = (
DENYSOFT_DISCONNECT => 904, # 450 + disconnect
DECLINED => 909,
DONE => 910,
CONTINUATION => 911,
);
use vars qw(@ISA @EXPORT);

View File

@ -37,11 +37,15 @@ sub _register {
my $self = shift;
my $qp = shift;
local $self->{_qp} = $qp;
$self->init($qp, @_) if $self->can('init');
$self->init($qp, @_);
$self->_register_standard_hooks($qp, @_);
$self->register($qp, @_) if $self->can('register');
$self->register($qp, @_);
}
# Designed to be overloaded
sub init {}
sub register {}
sub qp {
shift->{_qp};
}
@ -61,6 +65,10 @@ sub connection {
shift->qp->connection;
}
sub config {
shift->qp->config(@_);
}
sub spool_dir {
shift->qp->spool_dir;
}

339
lib/Qpsmtpd/PollServer.pm Normal file
View File

@ -0,0 +1,339 @@
# $Id: Server.pm,v 1.10 2005/02/14 22:04:48 msergeant Exp $
package Qpsmtpd::PollServer;
use base ('Danga::Client', 'Qpsmtpd::SMTP');
# use fields required to be a subclass of Danga::Client. Have to include
# all fields used by Qpsmtpd.pm here too.
use fields qw(
input_sock
mode
header_lines
in_header
data_size
max_size
hooks
start_time
_auth
_commands
_config_cache
_connection
_transaction
_test_mode
_extras
_continuation
);
use Qpsmtpd::Constants;
use Qpsmtpd::Auth;
use Qpsmtpd::Address;
use Danga::DNS;
use Mail::Header;
use POSIX qw(strftime);
use Socket qw(inet_aton AF_INET CRLF);
use Time::HiRes qw(time);
use strict;
sub max_idle_time { 60 }
sub max_connect_time { 1200 }
sub input_sock {
my $self = shift;
@_ and $self->{input_sock} = shift;
$self->{input_sock} || $self;
}
sub new {
my Qpsmtpd::PollServer $self = shift;
$self = fields::new($self) unless ref $self;
$self->SUPER::new( @_ );
$self->{start_time} = time;
$self->{mode} = 'connect';
$self->load_plugins;
return $self;
}
sub uptime {
my Qpsmtpd::PollServer $self = shift;
return (time() - $self->{start_time});
}
sub reset_for_next_message {
my $self = shift;
$self->SUPER::reset_for_next_message(@_);
$self->{_commands} = {
ehlo => 1,
helo => 1,
rset => 1,
mail => 1,
rcpt => 1,
data => 1,
help => 1,
vrfy => 1,
noop => 1,
quit => 1,
auth => 0, # disabled by default
};
$self->{mode} = 'cmd';
$self->{_extras} = {};
}
sub respond {
my $self = shift;
my ($code, @messages) = @_;
while (my $msg = shift @messages) {
my $line = $code . (@messages ? "-" : " ") . $msg;
$self->write("$line\r\n");
}
return 1;
}
sub fault {
my $self = shift;
$self->SUPER::fault(@_);
return;
}
sub log {
my ($self, $trace, @log) = @_;
my $fd = $self->{fd};
$fd ||= '?';
$self->SUPER::log($trace, "fd:$fd", @log);
}
sub process_line {
my $self = shift;
my $line = shift || return;
if ($::DEBUG > 1) { print "$$:".($self+0)."C($self->{mode}): $line"; }
local $SIG{ALRM} = sub {
my ($pkg, $file, $line) = caller();
die "ALARM: ($self->{mode}) $pkg, $file, $line";
};
my $prev = alarm(2); # must process a command in < 2 seconds
eval { $self->_process_line($line) };
alarm($prev);
if ($@) {
print STDERR "Error: $@\n";
return $self->fault("command failed unexpectedly") if $self->{mode} eq 'cmd';
return $self->fault("error processing data lines") if $self->{mode} eq 'data';
return $self->fault("unknown error");
}
return;
}
sub _process_line {
my $self = shift;
my $line = shift;
if ($self->{mode} eq 'connect') {
$self->{mode} = 'cmd';
my $rc = $self->start_conversation;
return;
}
elsif ($self->{mode} eq 'cmd') {
$line =~ s/\r?\n//;
return $self->process_cmd($line);
}
elsif ($self->{mode} eq 'data') {
return $self->data_line($line);
}
else {
die "Unknown mode";
}
}
sub process_cmd {
my $self = shift;
my $line = shift;
my ($cmd, @params) = split(/ +/, $line);
my $meth = lc($cmd);
if (my $lookup = $self->{_commands}->{$meth} && $self->can($meth)) {
my $resp = eval {
$lookup->($self, @params);
};
if ($@) {
my $error = $@;
chomp($error);
$self->log(LOGERROR, "Command Error: $error");
return $self->fault("command '$cmd' failed unexpectedly");
}
return $resp;
}
else {
# No such method - i.e. unrecognized command
my ($rc, $msg) = $self->run_hooks("unrecognized_command", $meth, @params);
return $self->unrecognized_command_respond($rc, $msg) unless $rc == CONTINUATION;
return 1;
}
}
sub disconnect {
my $self = shift;
$self->SUPER::disconnect(@_);
$self->close;
}
sub start_conversation {
my $self = shift;
my $conn = $self->connection;
# set remote_host, remote_ip and remote_port
my ($ip, $port) = split(':', $self->peer_addr_string);
$conn->remote_ip($ip);
$conn->remote_port($port);
$conn->remote_info("[$ip]");
Danga::DNS->new(
client => $self,
# NB: Setting remote_info to the same as remote_host
callback => sub { $conn->remote_info($conn->remote_host($_[0])) },
host => $ip,
);
my ($rc, $msg) = $self->run_hooks("connect");
return $self->connect_respond($rc, $msg) unless $rc == CONTINUATION;
return DONE;
}
sub data {
my $self = shift;
my ($rc, $msg) = $self->run_hooks("data");
return $self->data_respond($rc, $msg) unless $rc == CONTINUATION;
return 1;
}
sub data_respond {
my ($self, $rc, $msg) = @_;
if ($rc == DONE) {
return;
}
elsif ($rc == DENY) {
$self->respond(554, $msg || "Message denied");
$self->reset_transaction();
return;
}
elsif ($rc == DENYSOFT) {
$self->respond(451, $msg || "Message denied temporarily");
$self->reset_transaction();
return;
}
elsif ($rc == DENY_DISCONNECT) {
$self->respond(554, $msg || "Message denied");
$self->disconnect;
return;
}
elsif ($rc == DENYSOFT_DISCONNECT) {
$self->respond(451, $msg || "Message denied temporarily");
$self->disconnect;
return;
}
return $self->respond(503, "MAIL first") unless $self->transaction->sender;
return $self->respond(503, "RCPT first") unless $self->transaction->recipients;
$self->{mode} = 'data';
$self->{header_lines} = [];
$self->{data_size} = 0;
$self->{in_header} = 1;
$self->{max_size} = ($self->config('databytes'))[0] || 0; # this should work in scalar context
$self->log(LOGDEBUG, "max_size: $self->{max_size} / size: $self->{data_size}");
return $self->respond(354, "go ahead");
}
sub data_line {
my $self = shift;
my $line = shift;
if ($line eq ".\r\n") {
# add received etc.
$self->{mode} = 'cmd';
return $self->end_of_data;
}
# Reject messages that have either bare LF or CR. rjkaes noticed a
# lot of spam that is malformed in the header.
if ($line eq ".\n" or $line eq ".\r") {
$self->respond(421, "See http://smtpd.develooper.com/barelf.html");
$self->disconnect;
return;
}
# add a transaction->blocked check back here when we have line by line plugin access...
unless (($self->{max_size} and $self->{data_size} > $self->{max_size})) {
$line =~ s/\r\n$/\n/;
$line =~ s/^\.\./\./;
if ($self->{in_header} and $line =~ m/^\s*$/) {
# end of headers
$self->{in_header} = 0;
# ... need to check that we don't reformat any of the received lines.
#
# 3.8.2 Received Lines in Gatewaying
# When forwarding a message into or out of the Internet environment, a
# gateway MUST prepend a Received: line, but it MUST NOT alter in any
# way a Received: line that is already in the header.
my $header = Mail::Header->new($self->{header_lines},
Modify => 0, MailFrom => "COERCE");
$self->transaction->header($header);
#$header->add("X-SMTPD", "qpsmtpd/".$self->version.", http://smtpd.develooper.com/");
# FIXME - call plugins to work on just the header here; can
# save us buffering the mail content.
}
if ($self->{in_header}) {
push @{ $self->{header_lines} }, $line;
}
else {
$self->transaction->body_write($line);
}
$self->{data_size} += length $line;
}
return;
}
sub end_of_data {
my $self = shift;
#$self->log(LOGDEBUG, "size is at $size\n") unless ($i % 300);
$self->log(LOGDEBUG, "max_size: $self->{max_size} / size: $self->{data_size}");
my $smtp = $self->connection->hello eq "ehlo" ? "ESMTP" : "SMTP";
my $header = $self->transaction->header;
if (!$header) {
$header = Mail::Header->new(Modify => 0, MailFrom => "COERCE");
$self->transaction->header($header);
}
# only true if client authenticated
if ( defined $self->{_auth} and $self->{_auth} == OK ) {
$header->add("X-Qpsmtpd-Auth","True");
}
$header->add("Received", "from ".$self->connection->remote_info
." (HELO ".$self->connection->hello_host . ") (".$self->connection->remote_ip
. ")\n by ".$self->config('me')." (qpsmtpd/".$self->version
.") with $smtp; ". (strftime('%a, %d %b %Y %H:%M:%S %z', localtime)),
0);
return $self->respond(552, "Message too big!") if $self->{max_size} and $self->{data_size} > $self->{max_size};
my ($rc, $msg) = $self->run_hooks("data_post");
return $self->data_post_respond($rc, $msg) unless $rc == CONTINUATION;
return 1;
}
1;

View File

@ -51,21 +51,9 @@ sub dispatch {
$self->{_counter}++;
if ($cmd !~ /^(\w{1,12})$/ or !exists $self->{_commands}->{$1}) {
my ($rc, $msg) = $self->run_hooks("unrecognized_command", $cmd);
if ($rc == DENY_DISCONNECT) {
$self->respond(521, $msg);
$self->disconnect;
}
elsif ($rc == DENY) {
$self->respond(500, $msg);
}
elsif ($rc == DONE) {
1;
}
else {
$self->respond(500, "Unrecognized command");
}
return 1
my ($rc, $msg) = $self->run_hooks("unrecognized_command", $cmd, @_);
return $self->unrecognized_command_respond($rc, $msg, @_) unless $rc == CONTINUATION;
return 1;
}
$cmd = $1;
@ -79,6 +67,20 @@ sub dispatch {
return;
}
sub unrecognized_command_respond {
my ($self, $rc, $msg) = @_;
if ($rc == DENY_DISCONNECT) {
$self->respond(521, $msg);
$self->disconnect;
}
elsif ($rc == DENY) {
$self->respond(500, $msg);
}
elsif ($rc != DONE) {
$self->respond(500, "Unrecognized command");
}
}
sub fault {
my $self = shift;
my ($msg) = shift || "program fault - command not performed";
@ -92,12 +94,20 @@ sub start_conversation {
# this should maybe be called something else than "connect", see
# lib/Qpsmtpd/TcpServer.pm for more confusion.
my ($rc, $msg) = $self->run_hooks("connect");
return $self->connect_respond($rc, $msg) unless $rc == CONTINUATION;
return 1;
}
sub connect_respond {
my ($self, $rc, $msg) = @_;
if ($rc == DENY) {
$self->respond(550, ($msg || 'Connection from you denied, bye bye.'));
$self->disconnect;
return $rc;
}
elsif ($rc == DENYSOFT) {
$self->respond(450, ($msg || 'Connection from you temporarily denied, bye bye.'));
$self->disconnect;
return $rc;
}
elsif ($rc == DONE) {
@ -124,6 +134,7 @@ sub reset_transaction {
sub connection {
my $self = shift;
@_ and $self->{_connection} = shift;
return $self->{_connection} || ($self->{_connection} = Qpsmtpd::Connection->new());
}
@ -136,11 +147,16 @@ sub helo {
return $self->respond (503, "but you already said HELO ...") if $conn->hello;
my ($rc, $msg) = $self->run_hooks("helo", $hello_host, @stuff);
if ($rc == DONE) {
# do nothing
} elsif ($rc == DENY) {
return $self->helo_respond($rc, $msg, $hello_host, @stuff) unless $rc == CONTINUATION;
return 1;
}
sub helo_respond {
my ($self, $rc, $msg, $hello_host) = @_;
if ($rc == DENY) {
$self->respond(550, $msg);
} elsif ($rc == DENYSOFT) {
}
elsif ($rc == DENYSOFT) {
$self->respond(450, $msg);
} elsif ($rc == DENY_DISCONNECT) {
$self->respond(550, $msg);
@ -148,11 +164,14 @@ sub helo {
} elsif ($rc == DENYSOFT_DISCONNECT) {
$self->respond(450, $msg);
$self->disconnect;
} else {
}
elsif ($rc != DONE) {
my $conn = $self->connection;
$conn->hello("helo");
$conn->hello_host($hello_host);
$self->transaction;
$self->respond(250, $self->config('me') ." Hi " . $conn->remote_info . " [" . $conn->remote_ip ."]; I am so happy to meet you.");
$self->respond(250, $self->config('me') ." Hi " . $conn->remote_info .
" [" . $conn->remote_ip ."]; I am so happy to meet you.");
}
}
@ -164,11 +183,16 @@ sub ehlo {
return $self->respond (503, "but you already said HELO ...") if $conn->hello;
my ($rc, $msg) = $self->run_hooks("ehlo", $hello_host, @stuff);
if ($rc == DONE) {
# do nothing
} elsif ($rc == DENY) {
return $self->ehlo_respond($rc, $msg, $hello_host, @stuff) unless $rc == CONTINUATION;
return 1;
}
sub ehlo_respond {
my ($self, $rc, $msg, $hello_host) = @_;
if ($rc == DENY) {
$self->respond(550, $msg);
} elsif ($rc == DENYSOFT) {
}
elsif ($rc == DENYSOFT) {
$self->respond(450, $msg);
} elsif ($rc == DENY_DISCONNECT) {
$self->respond(550, $msg);
@ -176,7 +200,9 @@ sub ehlo {
} elsif ($rc == DENYSOFT_DISCONNECT) {
$self->respond(450, $msg);
$self->disconnect;
} else {
}
elsif ($rc != DONE) {
my $conn = $self->connection;
$conn->hello("ehlo");
$conn->hello_host($hello_host);
$self->transaction;
@ -241,7 +267,7 @@ sub mail {
unless ($self->connection->hello) {
return $self->respond(503, "please say hello first ...");
}
else {
my $from_parameter = join " ", @_;
$self->log(LOGINFO, "full from_parameter: $from_parameter");
@ -262,6 +288,12 @@ sub mail {
return $self->respond(501, "could not parse your mail from command") unless $from;
my ($rc, $msg) = $self->run_hooks("mail", $from);
return $self->mail_respond($rc, $msg, $from) unless $rc == CONTINUATION;
return 1;
}
sub mail_respond {
my ($self, $rc, $msg, $from) = @_;
if ($rc == DONE) {
return 1;
}
@ -284,7 +316,7 @@ sub mail {
elsif ($rc == DENYSOFT_DISCONNECT) {
$msg ||= $from->format . ', temporarily denied';
$self->log(LOGINFO, "denysoft mail from " . $from->format . " ($msg)");
$self->respond(421, $msg);
$self->respond(450, $msg);
$self->disconnect;
}
else { # includes OK
@ -292,7 +324,6 @@ sub mail {
$self->respond(250, $from->format . ", sender OK - how exciting to get mail from you!");
$self->transaction->sender($from);
}
}
}
sub rcpt {
@ -308,6 +339,12 @@ sub rcpt {
return $self->respond(501, "could not parse recipient") unless $rcpt;
my ($rc, $msg) = $self->run_hooks("rcpt", $rcpt);
return $self->rcpt_respond($rc, $msg, $rcpt) unless $rc == CONTINUATION;
return 1;
}
sub rcpt_respond {
my ($self, $rc, $msg, $rcpt) = @_;
if ($rc == DONE) {
return 1;
}
@ -342,7 +379,6 @@ sub rcpt {
}
sub help {
my $self = shift;
$self->respond(214,
@ -364,6 +400,12 @@ sub vrfy {
# I also don't think it provides all the proper result codes.
my ($rc, $msg) = $self->run_hooks("vrfy");
return $self->vrfy_respond($rc, $msg) unless $rc == CONTINUATION;
return 1;
}
sub vrfy_respond {
my ($self, $rc, $msg) = @_;
if ($rc == DONE) {
return 1;
}
@ -391,6 +433,12 @@ sub rset {
sub quit {
my $self = shift;
my ($rc, $msg) = $self->run_hooks("quit");
return $self->quit_respond($rc, $msg) unless $rc == CONTINUATION;
return 1;
}
sub quit_respond {
my ($self, $rc, $msg) = @_;
if ($rc != DONE) {
$self->respond(221, $self->config('me') . " closing connection. Have a wonderful day.");
}
@ -403,9 +451,17 @@ sub disconnect {
$self->reset_transaction;
}
sub disconnect_respond { }
sub data {
my $self = shift;
my ($rc, $msg) = $self->run_hooks("data");
return $self->data_respond($rc, $msg) unless $rc == CONTINUATION;
return 1;
}
sub data_respond {
my ($self, $rc, $msg) = @_;
if ($rc == DONE) {
return 1;
}
@ -523,6 +579,11 @@ sub data {
$self->respond(552, "Message too big!"),return 1 if $max_size and $size > $max_size;
($rc, $msg) = $self->run_hooks("data_post");
return $self->data_post_respond($rc, $msg) unless $rc == CONTINUATION;
}
sub data_post_respond {
my ($self, $rc, $msg) = @_;
if ($rc == DONE) {
return 1;
}
@ -538,7 +599,6 @@ sub data {
# DATA is always the end of a "transaction"
return $self->reset_transaction;
}
sub getline {
@ -554,6 +614,12 @@ sub queue {
my ($self, $transaction) = @_;
my ($rc, $msg) = $self->run_hooks("queue");
return $self->queue_respond($rc, $msg) unless $rc == CONTINUATION;
return 1;
}
sub queue_respond {
my ($self, $rc, $msg) = @_;
if ($rc == DONE) {
return 1;
}
@ -569,8 +635,6 @@ sub queue {
else {
$self->respond(451, $msg || "Queuing declined or disabled; try again later" );
}
}

View File

@ -1,320 +0,0 @@
package Qpsmtpd::SelectServer;
use Qpsmtpd::SMTP;
use Qpsmtpd::Constants;
use IO::Socket;
use IO::Select;
use POSIX qw(strftime);
use Socket qw(CRLF);
use Fcntl;
use Tie::RefHash;
use Net::DNS;
@ISA = qw(Qpsmtpd::SMTP);
use strict;
our %inbuffer = ();
our %outbuffer = ();
our %ready = ();
our %lookup = ();
our %qp = ();
our %indata = ();
tie %ready, 'Tie::RefHash';
my $server;
my $select;
our $QUIT = 0;
$SIG{INT} = $SIG{TERM} = sub { $QUIT++ };
sub log {
my ($self, $trace, @log) = @_;
my $level = Qpsmtpd::TRACE_LEVEL();
$level = $self->init_logger unless defined $level;
warn join(" ", fileno($self->client), @log), "\n"
if $trace <= $level;
}
sub main {
my $class = shift;
my %opts = (LocalPort => 25, Reuse => 1, Listen => SOMAXCONN, @_);
$server = IO::Socket::INET->new(%opts) or die "Server: $@";
print "Listening on $opts{LocalPort}\n";
nonblock($server);
$select = IO::Select->new($server);
my $res = Net::DNS::Resolver->new;
# TODO - make this more graceful - let all current SMTP sessions finish
# before quitting!
while (!$QUIT) {
foreach my $client ($select->can_read(1)) {
#print "Reading $client\n";
if ($client == $server) {
my $client_addr;
$client = $server->accept();
next unless $client;
my $ip = $client->peerhost;
my $bgsock = $res->bgsend($ip);
$select->add($bgsock);
$lookup{$bgsock} = $client;
}
elsif (my $qpclient = $lookup{$client}) {
my $packet = $res->bgread($client);
my $ip = $qpclient->peerhost;
my $hostname = $ip;
if ($packet) {
foreach my $rr ($packet->answer) {
if ($rr->type eq 'PTR') {
$hostname = $rr->rdatastr;
}
}
}
# $packet->print;
$select->remove($client);
delete($lookup{$client});
my $qp = Qpsmtpd::SelectServer->new();
$qp->client($qpclient);
$qp{$qpclient} = $qp;
$qp->log(LOGINFO, "Connection number " . keys(%qp));
$inbuffer{$qpclient} = '';
$outbuffer{$qpclient} = '';
$ready{$qpclient} = [];
$qp->start_connection($ip, $hostname);
$qp->load_plugins;
my $rc = $qp->start_conversation;
if ($rc != DONE) {
close($client);
next;
}
$select->add($qpclient);
nonblock($qpclient);
}
else {
my $data = '';
my $rv = $client->recv($data, POSIX::BUFSIZ(), 0);
unless (defined($rv) && length($data)) {
freeclient($client)
unless ($! == POSIX::EWOULDBLOCK() ||
$! == POSIX::EINPROGRESS() ||
$! == POSIX::EINTR());
next;
}
$inbuffer{$client} .= $data;
while ($inbuffer{$client} =~ s/^([^\r\n]*)\r?\n//) {
#print "<$1\n";
push @{$ready{$client}}, $1;
}
}
}
#print "Processing...\n";
foreach my $client (keys %ready) {
my $qp = $qp{$client};
#print "Processing $client = $qp\n";
foreach my $req (@{$ready{$client}}) {
if ($indata{$client}) {
$qp->data_line($req . CRLF);
}
else {
$qp->log(LOGINFO, "dispatching $req");
defined $qp->dispatch(split / +/, $req)
or $qp->respond(502, "command unrecognized: '$req'");
}
}
delete $ready{$client};
}
#print "Writing...\n";
foreach my $client ($select->can_write(1)) {
next unless $outbuffer{$client};
#print "Writing to $client\n";
my $rv = $client->send($outbuffer{$client}, 0);
unless (defined($rv)) {
warn("I was told to write, but I can't: $!\n");
next;
}
if ($rv == length($outbuffer{$client}) ||
$! == POSIX::EWOULDBLOCK())
{
#print "Sent all, or EWOULDBLOCK\n";
if ($qp{$client}->{__quitting}) {
freeclient($client);
next;
}
substr($outbuffer{$client}, 0, $rv, '');
delete($outbuffer{$client}) unless length($outbuffer{$client});
}
else {
print "Error: $!\n";
# Couldn't write all the data, and it wasn't because
# it would have blocked. Shut down and move on.
freeclient($client);
next;
}
}
}
}
sub freeclient {
my $client = shift;
#print "Freeing client: $client\n";
delete $inbuffer{$client};
delete $outbuffer{$client};
delete $ready{$client};
delete $qp{$client};
$select->remove($client);
close($client);
}
sub start_connection {
my $self = shift;
my $remote_ip = shift;
my $remote_host = shift;
$self->log(LOGNOTICE, "Connection from $remote_host [$remote_ip]");
my $remote_info = 'NOINFO';
# if the local dns resolver doesn't filter it out we might get
# ansi escape characters that could make a ps axw do "funny"
# things. So to be safe, cut them out.
$remote_host =~ tr/a-zA-Z\.\-0-9//cd;
$self->SUPER::connection->start(remote_info => $remote_info,
remote_ip => $remote_ip,
remote_host => $remote_host,
@_);
}
sub client {
my $self = shift;
@_ and $self->{_client} = shift;
$self->{_client};
}
sub nonblock {
my $socket = shift;
my $flags = fcntl($socket, F_GETFL, 0)
or die "Can't get flags for socket: $!";
fcntl($socket, F_SETFL, $flags | O_NONBLOCK)
or die "Can't set flags for socket: $!";
}
sub read_input {
my $self = shift;
die "read_input is disabled in SelectServer";
}
sub respond {
my ($self, $code, @messages) = @_;
my $client = $self->client || die "No client!";
while (my $msg = shift @messages) {
my $line = $code . (@messages?"-":" ").$msg;
$self->log(LOGINFO, ">$line");
$outbuffer{$client} .= "$line\r\n";
}
return 1;
}
sub disconnect {
my $self = shift;
#print "Disconnecting\n";
$self->{__quitting} = 1;
$self->SUPER::disconnect(@_);
}
sub data {
my $self = shift;
$self->respond(503, "MAIL first"), return 1 unless $self->transaction->sender;
$self->respond(503, "RCPT first"), return 1 unless $self->transaction->recipients;
$self->respond(354, "go ahead");
$indata{$self->client()} = 1;
$self->{__buffer} = '';
$self->{__size} = 0;
$self->{__blocked} = "";
$self->{__in_header} = 1;
$self->{__complete} = 0;
$self->{__max_size} = $self->config('databytes') || 0;
}
sub data_line {
my $self = shift;
local $_ = shift;
if ($_ eq ".\r\n") {
$self->log(LOGDEBUG, "max_size: $self->{__max_size} / size: $self->{__size}");
delete $indata{$self->client()};
my $smtp = $self->connection->hello eq "ehlo" ? "ESMTP" : "SMTP";
if (!$self->transaction->header) {
$self->transaction->header(Mail::Header->new(Modify => 0, MailFrom => "COERCE"));
}
$self->transaction->header->add("Received", "from ".$self->connection->remote_info
." (HELO ".$self->connection->hello_host . ") (".$self->connection->remote_ip
. ") by ".$self->config('me')." (qpsmtpd/".$self->version
.") with $smtp; ". (strftime('%a, %d %b %Y %H:%M:%S %z', localtime)),
0);
#$self->respond(550, $self->transaction->blocked),return 1 if ($self->transaction->blocked);
$self->respond(552, "Message too big!"),return 1 if $self->{__max_size} and $self->{__size} > $self->{__max_size};
my ($rc, $msg) = $self->run_hooks("data_post");
if ($rc == DONE) {
return 1;
}
elsif ($rc == DENY) {
$self->respond(552, $msg || "Message denied");
}
elsif ($rc == DENYSOFT) {
$self->respond(452, $msg || "Message denied temporarily");
}
else {
$self->queue($self->transaction);
}
# DATA is always the end of a "transaction"
return $self->reset_transaction;
}
elsif ($_ eq ".\n") {
$self->respond(451, "See http://develooper.com/code/qpsmtpd/barelf.html");
$self->{__quitting} = 1;
return;
}
# add a transaction->blocked check back here when we have line by line plugin access...
unless (($self->{__max_size} and $self->{__size} > $self->{__max_size})) {
s/\r\n$/\n/;
s/^\.\./\./;
if ($self->{__in_header} and m/^\s*$/) {
$self->{__in_header} = 0;
my @header = split /\n/, $self->{__buffer};
# ... need to check that we don't reformat any of the received lines.
#
# 3.8.2 Received Lines in Gatewaying
# When forwarding a message into or out of the Internet environment, a
# gateway MUST prepend a Received: line, but it MUST NOT alter in any
# way a Received: line that is already in the header.
my $header = Mail::Header->new(Modify => 0, MailFrom => "COERCE");
$header->extract(\@header);
$self->transaction->header($header);
$self->{__buffer} = "";
}
if ($self->{__in_header}) {
$self->{__buffer} .= $_;
}
else {
$self->transaction->body_write($_);
}
$self->{__size} += length $_;
}
}
1;

View File

@ -44,10 +44,7 @@ and terminating the SMTP connection.
=cut
use IO::Select;
use warnings;
use strict;
my $MSG = 'Connecting host started transmitting before SMTP greeting';
sub register {
my ($self, $qp, @args) = @_;
@ -65,17 +62,18 @@ sub register {
if ($qp->{conn} && $qp->{conn}->isa('Apache2::Connection')) {
require APR::Const;
APR::Const->import(qw(POLLIN SUCCESS));
$self->register_hook('connect', 'apr_connect_handler');
$self->register_hook('connect', 'hook_connect_apr');
}
else {
$self->register_hook('connect', 'connect_handler');
$self->register_hook('connect', 'hook_connect');
}
$self->register_hook('mail', 'mail_handler')
$self->register_hook('connect', 'hook_connect_post');
$self->register_hook('mail', 'hook_mail')
if $self->{_args}->{'defer-reject'};
1;
}
sub apr_connect_handler {
sub hook_connect_apr {
my ($self, $transaction) = @_;
return DECLINED if ($self->qp->connection->notes('whitelistclient'));
@ -92,47 +90,55 @@ sub apr_connect_handler {
$self->qp->connection->notes('earlytalker', 1);
}
else {
my $msg = 'Connecting host started transmitting before SMTP greeting';
return (DENY,$msg) if $self->{_args}->{'action'} eq 'deny';
return (DENYSOFT,$msg) if $self->{_args}->{'action'} eq 'denysoft';
return (DENY,$MSG) if $self->{_args}->{'action'} eq 'deny';
return (DENYSOFT,$MSG) if $self->{_args}->{'action'} eq 'denysoft';
}
}
else {
$self->log(LOGINFO, "remote host said nothing spontaneous, proceeding");
}
}
sub connect_handler {
my ($self, $transaction) = @_;
my $in = new IO::Select;
my $ip = $self->qp->connection->remote_ip;
return DECLINED
if ($self->qp->connection->notes('whitelistclient'));
$in->add(\*STDIN) || return DECLINED;
if ($in->can_read($self->{_args}->{'wait'})) {
$self->log(LOGNOTICE, "remote host started talking before we said hello [$ip]");
if ($self->{_args}->{'defer-reject'}) {
$self->qp->connection->notes('earlytalker', 1);
} else {
my $msg = 'Connecting host started transmitting before SMTP greeting';
return (DENY,$msg) if $self->{_args}->{'action'} eq 'deny';
return (DENYSOFT,$msg) if $self->{_args}->{'action'} eq 'denysoft';
}
} else {
$self->log(LOGINFO, 'remote host said nothing spontaneous, proceeding');
}
return DECLINED;
}
sub mail_handler {
my ($self, $txn) = @_;
my $msg = 'Connecting host started transmitting before SMTP greeting';
sub hook_connect {
my ($self, $transaction) = @_;
return DECLINED unless $self->qp->connection->notes('earlytalker');
return (DENY,$msg) if $self->{_args}->{'action'} eq 'deny';
return (DENYSOFT,$msg) if $self->{_args}->{'action'} eq 'denysoft';
my $qp = $self->qp;
my $conn = $qp->connection;
$qp->AddTimer($self->{_args}{'wait'}, sub { read_now($qp, $conn) });
return CONTINUATION;
}
sub read_now {
my ($qp, $conn) = @_;
if (my $data = $qp->read(1024)) {
if (length($$data)) {
$qp->log(LOGNOTICE, 'remote host started talking before we said hello');
$qp->push_back_read($data);
$conn->notes('earlytalker', 1);
}
}
$qp->finish_continuation;
}
sub hook_connect_post {
my ($self, $transaction) = @_;
my $conn = $self->qp->connection;
return DECLINED unless $conn->notes('earlytalker');
return DECLINED if $self->{'defer-reject'};
return (DENY,$MSG) if $self->{_args}->{'action'} eq 'deny';
return (DENYSOFT,$MSG) if $self->{_args}->{'action'} eq 'denysoft';
return DECLINED; # assume action eq 'log'
}
sub hook_mail {
my ($self, $txn) = @_;
return DECLINED unless $self->connection->notes('earlytalker');
return (DENY,$MSG) if $self->{_args}->{'action'} eq 'deny';
return (DENYSOFT,$MSG) if $self->{_args}->{'action'} eq 'denysoft';
return DECLINED;
}

View File

@ -1,20 +1,17 @@
#!perl -w
#!/usr/bin/perl -w
use Danga::DNS;
sub register {
my ($self, $qp, $denial ) = @_;
if ( defined $denial and $denial =~ /^disconnect$/i ) {
$self->{_dnsbl}->{DENY} = DENY_DISCONNECT;
}
else {
$self->{_dnsbl}->{DENY} = DENY;
}
my ($self) = @_;
$self->register_hook("connect", "connect_handler");
$self->register_hook("connect", "pickup_handler");
}
sub hook_connect {
sub connect_handler {
my ($self, $transaction) = @_;
my $remote_ip = $self->qp->connection->remote_ip;
my $remote_ip = $self->connection->remote_ip;
# perform RBLSMTPD checks to mimic Dan Bernstein's rblsmtpd
if (defined($ENV{'RBLSMTPD'})) {
@ -29,163 +26,91 @@ sub hook_connect {
$self->log(LOGDEBUG, "RBLSMTPD not set for $remote_ip");
}
my $allow = grep { s/\.?$/./; $_ eq substr($remote_ip . '.', 0, length $_) } $self->qp->config('dnsbl_allow');
my $allow = grep { s/\.?$/./; $_ eq substr($remote_ip . '.', 0, length $_) } $self->config('dnsbl_allow');
return DECLINED if $allow;
my %dnsbl_zones = map { (split /:/, $_, 2)[0,1] } $self->qp->config('dnsbl_zones');
my %dnsbl_zones = map { (split /:/, $_, 2)[0,1] } $self->config('dnsbl_zones');
return DECLINED unless %dnsbl_zones;
my $reversed_ip = join(".", reverse(split(/\./, $remote_ip)));
# we should queue these lookups in the background and just fetch the
# results in the first rcpt handler ... oh well.
my $res = new Net::DNS::Resolver;
$res->tcp_timeout(30);
$res->udp_timeout(30);
my $sel = IO::Select->new();
$self->transaction->notes('pending_dns_queries', scalar(keys(%dnsbl_zones)));
my $qp = $self->qp;
for my $dnsbl (keys %dnsbl_zones) {
# fix to find A records, if the dnsbl_zones line has a second field 20/1/04 ++msp
if (defined($dnsbl_zones{$dnsbl})) {
$self->log(LOGDEBUG, "Checking $reversed_ip.$dnsbl for A record in the background");
$sel->add($res->bgsend("$reversed_ip.$dnsbl"));
Danga::DNS->new(
callback => sub { process_a_result($qp, $dnsbl_zones{$dnsbl}, @_) },
host => "$reversed_ip.$dnsbl",
type => 'A',
client => $self->qp->input_sock,
);
} else {
$self->log(LOGDEBUG, "Checking $reversed_ip.$dnsbl for TXT record in the background");
$sel->add($res->bgsend("$reversed_ip.$dnsbl", "TXT"));
Danga::DNS->new(
callback => sub { process_txt_result($qp, @_) },
host => "$reversed_ip.$dnsbl",
type => 'TXT',
client => $self->qp->input_sock,
);
}
}
$self->qp->connection->notes('dnsbl_sockets', $sel);
return DECLINED;
return CONTINUATION;
}
sub process_sockets {
my ($self) = @_;
sub process_a_result {
my ($qp, $template, $result, $query) = @_;
my $conn = $self->qp->connection;
my $pending = $qp->transaction->notes('pending_dns_queries');
$qp->transaction->notes('pending_dns_queries', --$pending);
return $conn->notes('dnsbl')
if $conn->notes('dnsbl');
my %dnsbl_zones = map { (split /:/, $_, 2)[0,1] } $self->qp->config('dnsbl_zones');
my $res = new Net::DNS::Resolver;
$res->tcp_timeout(30);
$res->udp_timeout(30);
my $sel = $conn->notes('dnsbl_sockets') or return "";
my $remote_ip = $self->qp->connection->remote_ip;
my $result;
$self->log(LOGDEBUG, "waiting for dnsbl dns");
# don't wait more than 8 seconds here
my @ready = $sel->can_read(8);
$self->log(LOGDEBUG, "DONE waiting for dnsbl dns, got " , scalar @ready, " answers ...") ;
return '' unless @ready;
for my $socket (@ready) {
my $query = $res->bgread($socket);
$sel->remove($socket);
undef $socket;
my $dnsbl;
if ($query) {
my $a_record = 0;
foreach my $rr ($query->answer) {
$a_record = 1 if $rr->type eq "A";
my $name = $rr->name;
($dnsbl) = ($name =~ m/(?:\d+\.){4}(.*)/) unless $dnsbl;
$dnsbl = $name unless $dnsbl;
$self->log(LOGDEBUG, "name ", $rr->name);
next unless $rr->type eq "TXT";
$self->log(LOGDEBUG, "got txt record");
$result = $rr->txtdata and last;
}
#$a_record and $result = "Blocked by $dnsbl";
if ($a_record) {
if (defined $dnsbl_zones{$dnsbl}) {
$result = $dnsbl_zones{$dnsbl};
#$result =~ s/%IP%/$ENV{'TCPREMOTEIP'}/g;
$result =~ s/%IP%/$remote_ip/g;
} else {
# shouldn't get here?
$result = "Blocked by $dnsbl";
}
}
}
else {
$self->log(LOGERROR, "$dnsbl query failed: ", $res->errorstring)
unless $res->errorstring eq "NXDOMAIN";
warn("Result for A $query: $result\n");
if ($result !~ /^\d+\.\d+\.\d+\.\d+$/) {
# NXDOMAIN or ERROR possibly...
$qp->finish_continuation unless $pending;
return;
}
if ($result) {
#kill any other pending I/O
$conn->notes('dnsbl_sockets', undef);
$result = join("\n", $self->qp->config('dnsbl_rejectmsg'), $result);
return $conn->notes('dnsbl', $result);
}
}
if ($sel->count) {
# loop around if we have dns blacklists left to see results from
return $self->process_sockets();
}
# er, the following code doesn't make much sense anymore...
# if there was more to read; then forget it
$conn->notes('dnsbl_sockets', undef);
return $conn->notes('dnsbl', $result);
my $conn = $qp->connection;
my $ip = $conn->remote_ip;
$template =~ s/%IP%/$ip/g;
$conn->notes('dnsbl', $template) unless $conn->notes('dnsbl');
$qp->finish_continuation unless $pending;
}
sub hook_rcpt {
sub process_txt_result {
my ($qp, $result, $query) = @_;
my $pending = $qp->transaction->notes('pending_dns_queries');
$qp->transaction->notes('pending_dns_queries', --$pending);
warn("Result for TXT $query: $result\n");
if ($result !~ /[a-z]/) {
# NXDOMAIN or ERROR probably...
$qp->finish_continuation unless $pending;
return;
}
my $conn = $qp->connection;
$conn->notes('dnsbl', $result) unless $conn->notes('dnsbl');
$qp->finish_continuation unless $pending;
}
sub pickup_handler {
my ($self, $transaction, $rcpt) = @_;
my $connection = $self->qp->connection;
# RBLSMTPD being non-empty means it contains the failure message to return
if (defined ($ENV{'RBLSMTPD'}) && $ENV{'RBLSMTPD'} ne '') {
my $result = $ENV{'RBLSMTPD'};
my $remote_ip = $connection->remote_ip;
my $remote_ip = $self->connection->remote_ip;
$result =~ s/%IP%/$remote_ip/g;
return ($self->{_dnsbl}->{DENY},
join(" ", $self->qp->config('dnsbl_rejectmsg'), $result));
return (DENY, join(" ", $self->config('dnsbl_rejectmsg'), $result));
}
my $note = $self->process_sockets;
my $whitelist = $connection->notes('whitelisthost');
if ( $note ) {
if ( $rcpt->user =~ /^(?:postmaster|abuse|mailer-daemon|root)$/i ) {
$self->log(LOGWARN, "Don't blacklist special account: ".$rcpt->user);
}
elsif ( $whitelist ) {
$self->log(LOGWARN, "Whitelist overrode blacklist: $whitelist");
}
elsif ( $connection->relay_client() ) {
$self->log(LOGWARN, "Don't blacklist relay/auth clients");
}
else {
return ($self->{_dnsbl}->{DENY}, $note);
}
}
return DECLINED;
}
sub hook_disconnect {
my ($self, $transaction) = @_;
$self->qp->connection->notes('dnsbl_sockets', undef);
my $note = $self->connection->notes('dnsbl');
return (DENY, $note) if $note;
return DECLINED;
}
@ -200,19 +125,6 @@ dnsbl - handle DNS BlackList lookups
Plugin that checks the IP address of the incoming connection against
a configurable set of RBL services.
=head1 Usage
Add the following line to the config/plugins file:
dnsbl [disconnect]
If you want to immediately drop the connection (since some blacklisted
servers attempt multiple sends per session), add the optional keyword
"disconnect" (case insensitive) to the config line. In most cases, an
IP address that is listed should not be given the opportunity to begin
a new transaction, since even the most volatile blacklists will return
the same answer for a short period of time (the minimum DNS cache period).
=head1 Configuration files
This plugin uses the following configuration files. All of these are optional.

View File

@ -39,12 +39,12 @@ sub hook_queue {
my ($self, $transaction) = @_;
# these bits inspired by Peter Samuels "qmail-queue wrapper"
pipe(MESSAGE_READER, MESSAGE_WRITER) or fault("Could not create message pipe"), exit;
pipe(ENVELOPE_READER, ENVELOPE_WRITER) or fault("Could not create envelope pipe"), exit;
pipe(MESSAGE_READER, MESSAGE_WRITER) or die("Could not create message pipe");
pipe(ENVELOPE_READER, ENVELOPE_WRITER) or die("Could not create envelope pipe");
my $child = fork();
not defined $child and fault(451, "Could not fork"), exit;
not defined $child and die("Could not fork");
if ($child) {
# Parent
@ -52,9 +52,13 @@ sub hook_queue {
select(ENVELOPE_WRITER); $| = 1;
select($oldfh);
close MESSAGE_READER or fault("close msg reader fault"),exit;
close ENVELOPE_READER or fault("close envelope reader fault"), exit;
close MESSAGE_READER or die("close msg reader fault");
close ENVELOPE_READER or die("close envelope reader fault");
# Note - technically there's a race here because if the exec() below
# fails and the writes to MESSAGE_WRITER block we get a deadlocked process.
# This check to see if(eof(PIPE)) will catch "most" of these problems.
die "Message pipe has been closed" if eof(MESSAGE_WRITER);
$transaction->header->print(\*MESSAGE_WRITER);
$transaction->body_resetpos;
while (my $line = $transaction->body_getline) {
@ -64,6 +68,7 @@ sub hook_queue {
my @rcpt = map { "T" . $_->address } $transaction->recipients;
my $from = "F".($transaction->sender->address|| "" );
die "Envelope pipe has been closed" if eof(ENVELOPE_WRITER);
print ENVELOPE_WRITER "$from\0", join("\0",@rcpt), "\0\0"
or return(DECLINED,"Could not print addresses to queue");
@ -104,6 +109,10 @@ sub hook_queue {
my $rc = exec $queue_exec;
# close the pipe
close(MESSAGE_READER);
close(MESSAGE_WRITER);
exit 6; # we'll only get here if the exec fails
}
}

View File

@ -1,21 +1,19 @@
use Net::DNS qw(mx);
#!/usr/bin/perl
sub hook_mail {
use Danga::DNS;
sub register {
my ($self) = @_;
$self->register_hook("mail", "mail_handler");
$self->register_hook("rcpt", "rcpt_handler");
}
sub mail_handler {
my ($self, $transaction, $sender) = @_;
return DECLINED
if ($self->qp->connection->notes('whitelistclient'));
$sender->format ne "<>"
and $self->qp->config("require_resolvable_fromhost")
and !$self->check_dns($sender->host)
and return (DENYSOFT,
($sender->host
? "Could not resolve ". $sender->host
: "FQDN required in the envelope sender"));
return DECLINED;
$self->transaction->notes('resolvable', 1);
return DECLINED if $sender->format eq "<>";
return $self->check_dns($sender->host);
}
@ -23,24 +21,61 @@ sub check_dns {
my ($self, $host) = @_;
# for stuff where we can't even parse a hostname out of the address
return 0 unless $host;
return DECLINED unless $host;
return 1 if $host =~ m/^\[(\d{1,3}\.){3}\d{1,3}\]$/;
if( $host =~ m/^\[(\d{1,3}\.){3}\d{1,3}\]$/ ) {
$self->transaction->notes('resolvable', 1);
return DECLINED;
}
my $res = new Net::DNS::Resolver;
$res->tcp_timeout(30);
$res->udp_timeout(30);
return 1 if mx($res, $host);
my $query = $res->search($host);
if ($query) {
foreach my $rr ($query->answer) {
return 1 if $rr->type eq "A" or $rr->type eq "MX";
}
}
else {
$self->log(LOGWARN, "$$ query for $host failed: ", $res->errorstring)
unless $res->errorstring eq "NXDOMAIN";
}
return 0;
$self->transaction->notes('pending_dns_queries', 2);
my $qp = $self->qp;
$self->log(LOGDEBUG, "Checking $host for MX record in the background");
Danga::DNS->new(
callback => sub { dns_result($qp, @_) },
host => $host,
type => "MX",
client => $qp->input_sock,
);
$self->log(LOGDEBUG, "Checking $host for A record in the background");
Danga::DNS->new(
callback => sub { dns_result($qp, @_) },
host => $host,
client => $qp->input_sock,
);
return CONTINUATION;
}
sub dns_result {
my ($qp, $result, $query) = @_;
my $pending = $qp->transaction->notes('pending_dns_queries');
$qp->transaction->notes('pending_dns_queries', --$pending);
if ($result =~ /^[A-Z]+$/) {
# probably an error
$qp->log(LOGDEBUG, "DNS error: $result looking up $query");
} else {
$qp->transaction->notes('resolvable', 1);
$qp->log(LOGDEBUG, "DNS lookup $query returned: $result");
}
$qp->finish_continuation unless $pending;
}
sub rcpt_handler {
my ($self, $transaction) = @_;
if (!$transaction->notes('resolvable')) {
my $sender = $transaction->sender;
$self->log(LOGDEBUG, "Could not resolve " .$sender->host) if $sender->host;
return (DENYSOFT,
($sender->host
? "Could not resolve ". $sender->host
: "FQDN required in the envelope sender"));
}
return DECLINED;
}

View File

@ -1,31 +1,39 @@
#!/usr/bin/perl
sub hook_mail {
use Danga::DNS;
sub register {
my ($self) = @_;
$self->register_hook('mail', 'mail_handler');
$self->register_hook('rcpt', 'rcpt_handler');
}
sub mail_handler {
my ($self, $transaction, $sender) = @_;
my $res = new Net::DNS::Resolver;
my $sel = IO::Select->new();
my %rhsbl_zones_map = ();
# Perform any RHS lookups in the background. We just send the query packets here
# and pick up any results in the RCPT handler.
# MTAs gets confused when you reject mail during MAIL FROM:
my %rhsbl_zones = map { (split /\s+/, $_, 2)[0,1] } $self->qp->config('rhsbl_zones');
my %rhsbl_zones = map { (split /\s+/, $_, 2)[0,1] } $self->config('rhsbl_zones');
if ($sender->format ne '<>' and %rhsbl_zones) {
my $helo = $self->connection->hello_host;
push(my @hosts, $sender->host);
#my $helo = $self->qp->connection->hello_host;
#push(@hosts, $helo) if $helo && $helo ne $sender->host;
push(@hosts, $helo) if $helo && $helo ne $sender->host;
for my $host (@hosts) {
for my $rhsbl (keys %rhsbl_zones) {
$self->log(LOGDEBUG, "Checking $host.$rhsbl for A record in the background");
$sel->add($res->bgsend("$host.$rhsbl"));
$rhsbl_zones_map{"$host.$rhsbl"} = $rhsbl_zones{$rhsbl};
Danga::DNS->new(
callback => sub { $self->process_result($host, $rhsbl_zones{$rhsbl}, @_) },
host => "$host.$rhsbl",
client => $self->qp->input_sock,
);
}
}
%{$self->{_rhsbl_zones_map}} = %rhsbl_zones_map;
$transaction->notes('rhsbl_sockets', $sel);
} else {
$self->log(LOGDEBUG, 'no RHS checks necessary');
}
@ -33,80 +41,28 @@ sub hook_mail {
return DECLINED;
}
sub hook_rcpt {
my ($self, $transaction, $rcpt) = @_;
my $host = $transaction->sender->host;
my $hello = $self->qp->connection->hello_host;
sub process_result {
my ($self, $host, $template, $result, $query) = @_;
my $result = $self->process_sockets;
if ($result && defined($self->{_rhsbl_zones_map}{$result})) {
if ($result =~ /^$host\./ ) {
return (DENY, "Mail from $host rejected because it " . $self->{_rhsbl_zones_map}{$result});
} else {
return (DENY, "Mail from HELO $hello rejected because it " . $self->{_rhsbl_zones_map}{$result});
if ($result !~ /^\d+\.\d+\.\d+\.\d+$/) {
# NXDOMAIN or error
return;
}
my $tran = $self->transaction;
return if $tran->notes('rhsbl');
if ($host eq $tran->sender->host) {
$tran->notes('rhsbl', "Mail from $host rejected because it $template");
}
else {
$tran->notes('rhsbl', "Mail from HELO $host rejected because it $template");
}
}
sub rcpt_handler {
my ($self, $transaction, $rcpt) = @_;
my $result = $transaction->notes('rhsbl');
return (DENY, $result) if $result;
return DECLINED;
}
sub process_sockets {
my ($self) = @_;
my $trans = $self->transaction;
my $result = '';
return $trans->notes('rhsbl') if $trans->notes('rhsbl');
my $res = new Net::DNS::Resolver;
my $sel = $trans->notes('rhsbl_sockets') or return '';
$self->log(LOGDEBUG, 'waiting for rhsbl dns');
# don't wait more than 8 seconds here
my @ready = $sel->can_read(8);
$self->log(LOGDEBUG, 'DONE waiting for rhsbl dns, got ' , scalar @ready, ' answers ...') ;
return '' unless @ready;
for my $socket (@ready) {
my $query = $res->bgread($socket);
$sel->remove($socket);
undef $socket;
if ($query) {
foreach my $rr ($query->answer) {
$self->log(LOGDEBUG, 'got an ' . $rr->type . ' record ' . $rr->name);
if ($rr->type eq 'A') {
$result = $rr->name;
$self->log(LOGDEBUG, "A record found for $result with IP " . $rr->address);
last;
}
}
} else {
$self->log(LOGCRIT, "query failed: ", $res->errorstring) unless $res->errorstring eq 'NXDOMAIN';
}
if ($result) {
#kill any other pending I/O
$trans->notes('rhsbl_sockets', undef);
return $trans->notes('rhsbl', $result);
}
}
if ($sel->count) {
# loop around if we have dns results left
return $self->process_sockets();
}
# if there was more to read; then forget it
$trans->notes('rhsbl_sockets', undef);
return $trans->notes('rhsbl', $result);
}
sub hook_disconnect {
my ($self, $transaction) = @_;
$transaction->notes('rhsbl_sockets', undef);
return DECLINED;
}

74
plugins/stats Normal file
View File

@ -0,0 +1,74 @@
#!/usr/bin/perl -w
use Time::HiRes qw(time);
my $START_TIME = time;
our $MAILS_RECEIVED = 0;
our $MAILS_REJECTED = 0;
our $MAILS_TEMPFAIL = 0;
sub register {
my ($self) = @_;
$self->register_hook('deny', 'increment_deny');
$self->register_hook('queue', 'increment_mails');
}
sub get_stats {
my $class = shift;
my $uptime = $class->uptime;
my $recvd = $class->mails_received;
my $reject = $class->mails_rejected;
my $soft = $class->mails_tempfailed;
my $rate = $class->mails_per_sec;
return sprintf(" Uptime: %0.2f sec\n".
" Mails Received: % 10d\n".
" 5xx: % 10d\n".
" 4xx: % 10d\n".
"Mails per second: %0.2f\n",
$uptime, $recvd, $reject, $soft, $rate);
}
sub increment_deny {
my ($self, $tran, $plugin, $level) = @_;
if ($level == DENY or $level == DENY_DISCONNECT) {
$MAILS_REJECTED++;
}
elsif ($level == DENYSOFT or $level == DENYSOFT_DISCONNECT) {
$MAILS_TEMPFAIL++;
}
return DECLINED;
}
sub increment_mails {
my $self = shift;
$MAILS_RECEIVED++;
return DECLINED;
}
sub uptime {
return (time() - $START_TIME);
}
sub mails_received {
return $MAILS_RECEIVED;
}
sub mails_rejected {
return $MAILS_REJECTED;
}
sub mails_tempfailed {
return $MAILS_TEMPFAIL;
}
sub mails_per_sec {
my $class = shift;
return ($MAILS_RECEIVED / $class->uptime());
}

View File

@ -21,7 +21,7 @@ MAIL FROM onwards.
=cut
use IO::Socket::SSL qw(debug1 debug2 debug3 debug4);
use IO::Socket::SSL; # qw(debug1 debug2 debug3 debug4);
sub init {
my ($self, $qp, $cert, $key) = @_;
@ -38,7 +38,6 @@ sub init {
SSL_cipher_list => 'HIGH',
SSL_server => 1
) or die "Could not create SSL context: $!";
# now extract the password...
$self->ssl_context($ssl_ctx);
}
@ -66,7 +65,20 @@ sub hook_unrecognized_command {
$self->qp->respond (220, "Go ahead with TLS");
eval {
my $tlssocket = IO::Socket::SSL->new_from_fd(
my $tlssocket;
if ($self->qp->isa('Danga::Socket')) {
# high_perf
$tlssocket = IO::Socket::SSL->start_SSL($self->qp->sock,
SSL_use_cert => 1,
SSL_cert_file => $self->tls_cert,
SSL_key_file => $self->tls_key,
SSL_cipher_list => 'HIGH',
SSL_server => 1,
SSL_reuse_ctx => $self->ssl_context,
) or die "Could not convert SSL socket: $!";
}
else {
$tlssocket = IO::Socket::SSL->new_from_fd(
fileno(STDIN), '+>',
SSL_use_cert => 1,
SSL_cert_file => $self->tls_cert,
@ -75,22 +87,22 @@ sub hook_unrecognized_command {
SSL_server => 1,
SSL_reuse_ctx => $self->ssl_context,
) or die "Could not create SSL socket: $!";
}
my $conn = $self->connection;
# Create a new connection object with subset of information collected thus far
$self->qp->connection(Qpsmtpd::Connection->new(
map { $_ => $conn->$_ }
qw(
local_ip
local_port
remote_ip
remote_port
remote_host
remote_info
),
));
my $newconn = Qpsmtpd::Connection->new();
for (qw(local_ip local_port remote_ip remote_port remote_host remote_info)) {
$newconn->$_($conn->$_());
}
$self->qp->connection($newconn);
$self->qp->reset_transaction;
if ($self->qp->isa('Danga::Socket')) {
$self->connection->notes('tls_socket', $tlssocket);
}
else {
*STDIN = *STDOUT = $self->connection->notes('tls_socket', $tlssocket);
}
$self->connection->notes('tls_enabled', 1);
};
if ($@) {
@ -131,5 +143,6 @@ sub ssl_context {
sub bad_ssl_hook {
my ($self, $transaction) = @_;
return DENY, "Command refused due to lack of security" if $transaction->notes('ssl_failed');
return DECLINED;
}
*hook_helo = *hook_data = *hook_rcpt = *hook_mail = *hook_auth = \&bad_ssl_hook;

467
qpsmtpd
View File

@ -1,30 +1,455 @@
#!/usr/bin/perl -Tw
# Copyright (c) 2001 Ask Bjoern Hansen. See the LICENSE file for details.
# The "command dispatch" system is taken from colobus - http://trainedmonkey.com/colobus/
#
# this is designed to be run under tcpserver (http://cr.yp.to/ucspi-tcp.html)
# or inetd if you're into that sort of thing
#
#
# For more information see http://develooper.com/code/qpsmtpd/
#
#
#!/usr/bin/perl
use lib "./lib";
BEGIN {
delete $ENV{ENV};
delete $ENV{BASH_ENV};
$ENV{PATH} = '/bin:/usr/bin:/var/qmail/bin:/usr/local/bin';
}
use lib 'lib';
use Qpsmtpd::TcpServer;
use strict;
$| = 1;
use vars qw($DEBUG);
use FindBin qw();
# TODO: need to make this taint friendly
use lib "$FindBin::Bin/lib";
use Danga::Socket;
use Danga::Client;
use Qpsmtpd::PollServer;
use Qpsmtpd::ConfigServer;
use Qpsmtpd::Constants;
use IO::Socket;
use Carp;
use POSIX qw(WNOHANG);
use Getopt::Long;
delete $ENV{ENV};
$ENV{PATH} = '/bin:/usr/bin:/var/qmail/bin';
$|++;
my $qpsmtpd = Qpsmtpd::TcpServer->new();
$qpsmtpd->start_connection();
$qpsmtpd->run();
# For debugging
# $SIG{USR1} = sub { Carp::confess("USR1") };
__END__
use Socket qw(SOMAXCONN IPPROTO_TCP SO_KEEPALIVE TCP_NODELAY SOL_SOCKET);
$SIG{'PIPE'} = "IGNORE"; # handled manually
$DEBUG = 0;
my $CONFIG_PORT = 20025;
my $CONFIG_LOCALADDR = '127.0.0.1';
1;
my $PORT = 2525;
my $LOCALADDR = '0.0.0.0';
my $LineMode = 0;
my $PROCS = 1;
my $MAXCONN = 15; # max simultaneous connections
my $USER = 'smtpd'; # user to suid to
my $MAXCONNIP = 5; # max simultaneous connections from one IP
my $PAUSED = 0;
my $NUMACCEPT = 20;
sub help {
print <<EOT;
Usage:
qpsmtpd [OPTIONS]
Options:
-l, --listen-address addr : listen on a specific address; default 0.0.0.0
-p, --port P : listen on a specific port; default 2525
-c, --limit-connections N : limit concurrent connections to N; default 15
-u, --user U : run as a particular user; defualt 'smtpd'
-m, --max-from-ip M : limit connections from a single IP; default 5
-f, --forkmode : fork a child for each connection
-j, --procs J : spawn J processes; default 1
-a, --accept K : accept up to K conns per loop; default 20
-h, --help : this page
NB: -f and -j are mutually exclusive. If -f flag is not used the server uses
poll() style loops running inside J child processes. Set J to the number of
CPUs you have at your disposal.
EOT
exit(0);
}
GetOptions(
'p|port=i' => \$PORT,
'l|listen-address=s' => \$LOCALADDR,
'j|procs=i' => \$PROCS,
'd|debug+' => \$DEBUG,
'f|forkmode' => \$LineMode,
'c|limit-connections=i' => \$MAXCONN,
'm|max-from-ip=i' => \$MAXCONNIP,
'u|user=s' => \$USER,
'a|accept=i' => \$NUMACCEPT,
'h|help' => \&help,
'use-poll' => \&force_poll,
) || help();
# detaint the commandline
if ($PORT =~ /^(\d+)$/) { $PORT = $1 } else { &help }
if ($LOCALADDR =~ /^([\d\w\-.]+)$/) { $LOCALADDR = $1 } else { &help }
if ($USER =~ /^([\w\-]+)$/) { $USER = $1 } else { &help }
if ($MAXCONN =~ /^(\d+)$/) { $MAXCONN = $1 } else { &help }
if ($PROCS =~ /^(\d+)$/) { $PROCS = $1 } else { &help }
if ($NUMACCEPT =~ /^(\d+)$/) { $NUMACCEPT = $1 } else { &help }
my $_NUMACCEPT = $NUMACCEPT;
$::LineMode = $LineMode;
$PROCS = 1 if $LineMode;
# This is a bit of a hack, but we get to approximate MAXCONN stuff when we
# have multiple children listening on the same socket.
$MAXCONN /= $PROCS;
$MAXCONNIP /= $PROCS;
sub force_poll {
$Danga::Socket::HaveEpoll = 0;
$Danga::Socket::HaveKQueue = 0;
}
Danga::Socket::init_poller();
my $POLL = "with " . ($Danga::Socket::HaveEpoll ? "epoll()" :
$Danga::Socket::HaveKQueue ? "kqueue()" : "poll()");
my $SERVER;
my $CONFIG_SERVER;
# Code for inetd/tcpserver mode
if ($ENV{REMOTE_HOST} or $ENV{TCPREMOTEHOST}) {
run_as_inetd();
exit(0);
}
my %childstatus = ();
run_as_server();
exit(0);
sub _fork {
my $pid = fork;
if (!defined($pid)) { die "Cannot fork: $!" }
return $pid if $pid;
# Fixup Net::DNS randomness after fork
srand($$ ^ time);
local $^W;
delete $INC{'Net/DNS/Header.pm'};
require Net::DNS::Header;
# cope with different versions of Net::DNS
eval {
$Net::DNS::Resolver::global{id} = 1;
$Net::DNS::Resolver::global{id} = int(rand(Net::DNS::Resolver::MAX_ID()));
# print "Next DNS ID: $Net::DNS::Resolver::global{id}\n";
};
if ($@) {
# print "Next DNS ID: " . Net::DNS::Header::nextid() . "\n";
}
# Fixup lost kqueue after fork
$Danga::Socket::HaveKQueue = undef;
Danga::Socket::init_poller();
}
sub spawn_child {
_fork and return;
$SIG{CHLD} = "DEFAULT";
Qpsmtpd::PollServer->OtherFds(fileno($SERVER) => \&accept_handler);
Qpsmtpd::PollServer->EventLoop();
exit;
}
sub sig_chld {
$SIG{CHLD} = 'IGNORE';
while ( (my $child = waitpid(-1,WNOHANG)) > 0) {
last unless $child > 0;
print "child $child died\n";
delete $childstatus{$child};
}
return if $LineMode;
# restart a new child if in poll server mode
spawn_child();
$SIG{CHLD} = \&sig_chld;
}
sub HUNTSMAN {
$SIG{CHLD} = 'DEFAULT';
kill 'INT' => keys %childstatus;
exit(0);
}
sub run_as_inetd {
$LineMode = $::LineMode = 1;
my $insock = IO::Handle->new_from_fd(0, "r");
IO::Handle::blocking($insock, 0);
my $outsock = IO::Handle->new_from_fd(1, "w");
IO::Handle::blocking($outsock, 0);
my $client = Danga::Client->new($insock);
my $out = Qpsmtpd::PollServer->new($outsock);
$out->load_plugins;
$out->init_logger;
$out->input_sock($client);
$client->push_back_read("Connect\n");
# Cause poll/kevent/epoll to end quickly in first iteration
Qpsmtpd::PollServer->AddTimer(1, sub { });
while (1) {
$client->enable_read;
my $line = $client->get_line;
last if !defined($line);
my $output = $out->process_line($line);
$out->write($output) if $output;
}
}
sub run_as_server {
local $::MAXconn = $MAXCONN;
# establish SERVER socket, bind and listen.
$SERVER = IO::Socket::INET->new(LocalPort => $PORT,
LocalAddr => $LOCALADDR,
Type => SOCK_STREAM,
Proto => IPPROTO_TCP,
Blocking => 0,
Reuse => 1,
Listen => SOMAXCONN )
or die "Error creating server $LOCALADDR:$PORT : $@\n";
IO::Handle::blocking($SERVER, 0);
binmode($SERVER, ':raw');
$CONFIG_SERVER = IO::Socket::INET->new(LocalPort => $CONFIG_PORT,
LocalAddr => $CONFIG_LOCALADDR,
Type => SOCK_STREAM,
Proto => IPPROTO_TCP,
Blocking => 0,
Reuse => 1,
Listen => 1 )
or die "Error creating server $CONFIG_LOCALADDR:$CONFIG_PORT : $@\n";
IO::Handle::blocking($CONFIG_SERVER, 0);
binmode($CONFIG_SERVER, ':raw');
# Drop priviledges
my (undef, undef, $quid, $qgid) = getpwnam $USER or
die "unable to determine uid/gid for $USER\n";
$) = "";
POSIX::setgid($qgid) or
die "unable to change gid: $!\n";
POSIX::setuid($quid) or
die "unable to change uid: $!\n";
$> = $quid;
::log(LOGINFO, 'Running as user '.
(getpwuid($>) || $>) .
', group '.
(getgrgid($)) || $)));
# Load plugins here
my $plugin_loader = Qpsmtpd::SMTP->new();
$plugin_loader->load_plugins;
if ($PROCS > 1) {
$SIG{'CHLD'} = \&sig_chld;
my @kids;
for (1..$PROCS) {
push @kids, spawn_child();
}
$SIG{INT} = $SIG{TERM} = sub { $SIG{CHLD} = "IGNORE"; kill 2 => @kids; exit };
$plugin_loader->log(LOGDEBUG, "Listening on $PORT with $PROCS children $POLL");
sleep while (1);
}
else {
if ($LineMode) {
$SIG{INT} = $SIG{TERM} = \&HUNTSMAN;
}
$plugin_loader->log(LOGDEBUG, "Listening on $PORT with single process $POLL" .
($LineMode ? " (forking server)" : ""));
Qpsmtpd::PollServer->OtherFds(fileno($SERVER) => \&accept_handler,
fileno($CONFIG_SERVER) => \&config_handler,
);
while (1) {
Qpsmtpd::PollServer->EventLoop();
}
exit;
}
}
sub config_handler {
my $csock = $CONFIG_SERVER->accept();
if (!$csock) {
# warn("accept failed on config server: $!");
return;
}
binmode($csock, ':raw');
printf("Config server connection\n") if $DEBUG;
IO::Handle::blocking($csock, 0);
setsockopt($csock, IPPROTO_TCP, TCP_NODELAY, pack("l", 1)) or die;
my $client = Qpsmtpd::ConfigServer->new($csock);
$client->watch_read(1);
return;
}
# Accept all new connections
sub accept_handler {
my $running;
if( $LineMode ) {
$running = scalar keys %childstatus;
}
else {
my $descriptors = Danga::Client->DescriptorMap;
$running = scalar keys %$descriptors;
}
for (1 .. $NUMACCEPT) {
if ($running >= $MAXCONN) {
::log(LOGINFO,"Too many connections: $running >= $MAXCONN.");
return;
}
$running++;
if (! _accept_handler($running)) {
# got here because we have too many accepts.
$NUMACCEPT = $_NUMACCEPT;
return;
}
}
# got here because we have accept's left.
# So double the number we accept next time.
$NUMACCEPT *= 2;
}
use Errno qw(EAGAIN EWOULDBLOCK);
sub _accept_handler {
my $running = shift;
my $csock = $SERVER->accept();
if (!$csock) {
# warn("accept() failed: $!");
return;
if ($! == EAGAIN || $! == EWOULDBLOCK) {
return;
}
else {
warn("accept() failed: $!");
return 1;
}
}
binmode($csock, ':raw');
printf("Listen child making a Qpsmtpd::PollServer for %d.\n", fileno($csock))
if $DEBUG;
IO::Handle::blocking($csock, 0);
setsockopt($csock, IPPROTO_TCP, TCP_NODELAY, pack("l", 1)) or die;
if (!$LineMode) {
# multiplex mode
my $client = Qpsmtpd::PollServer->new($csock);
my $rem_ip = $client->peer_ip_string;
if ($PAUSED) {
$client->write("451 Sorry, this server is currently paused\r\n");
$client->close;
return 1;
}
if ($MAXCONNIP) {
my $num_conn = 1; # seed with current value
# If we for-loop directly over values %childstatus, a SIGCHLD
# can call REAPER and slip $rip out from under us. Causes
# "Use of freed value in iteration" under perl 5.8.4.
my $descriptors = Danga::Client->DescriptorMap;
my @obj = values %$descriptors;
foreach my $obj (@obj) {
local $^W;
# This is a bit of a slow way to do this. Wish I could cache the method call.
++$num_conn if ($obj->peer_ip_string eq $rem_ip);
}
if ($num_conn > $MAXCONNIP) {
$client->log(LOGINFO,"Too many connections from $rem_ip: "
."$num_conn > $MAXCONNIP. Denying connection.");
$client->write("451 Sorry, too many connections from $rem_ip, try again later\r\n");
$client->close;
return 1;
}
$client->log(LOGINFO, "accepted connection $running/$MAXCONN ($num_conn/$MAXCONNIP) from $rem_ip");
}
$client->push_back_read("Connect\n");
$client->watch_read(1);
return 1;
}
# fork-per-connection mode
my $rem_ip = $csock->sockhost();
if ($MAXCONNIP) {
my $num_conn = 1; # seed with current value
my @rip = values %childstatus;
foreach my $rip (@rip) {
++$num_conn if (defined $rip && $rip eq $rem_ip);
}
if ($num_conn > $MAXCONNIP) {
::log(LOGINFO,"Too many connections from $rem_ip: "
."$num_conn > $MAXCONNIP. Denying connection.");
print $csock "451 Sorry, too many connections from $rem_ip, try again later\r\n";
close $csock;
return 1;
}
}
if (my $pid = _fork) {
$childstatus{$pid} = $rem_ip;
return $csock->close();
}
$SERVER->close(); # make sure the child doesn't accept() new connections
$SIG{$_} = 'DEFAULT' for keys %SIG;
my $client = Qpsmtpd::PollServer->new($csock);
$client->push_back_read("Connect\n");
# Cause poll/kevent/epoll to end quickly in first iteration
Qpsmtpd::PollServer->AddTimer(0.1, sub { });
while (1) {
$client->enable_read;
my $line = $client->get_line;
last if !defined($line);
my $resp = $client->process_line($line);
$client->write($resp) if $resp;
}
$client->log(LOGDEBUG, "Finished with child %d.\n", fileno($csock))
if $DEBUG;
$client->close();
exit;
}
########################################################################
sub log {
my ($level,$message) = @_;
# $level not used yet. this is reimplemented from elsewhere anyway
warn("$$ fd:? $message\n");
}
sub pause {
my ($pause) = @_;
$PAUSED = $pause;
}

View File

@ -11,6 +11,7 @@ use Qpsmtpd::TcpServer;
use Qpsmtpd::Constants;
use IO::Socket;
use IO::Select;
use Qpsmtpd::PollServer;
use Socket;
use Getopt::Long;
use POSIX qw(:sys_wait_h :errno_h :signal_h);
@ -24,6 +25,7 @@ my @LOCALADDR; # ip address(es) to bind to
my $USER = 'smtpd'; # user to suid to
my $MAXCONNIP = 5; # max simultaneous connections from one IP
my $PID_FILE = ''; # file to which server PID will be written
our $DEBUG = 0;
sub usage {
print <<"EOT";
@ -47,6 +49,7 @@ GetOptions('h|help' => \&usage,
'p|port=i' => \$PORT,
'u|user=s' => \$USER,
'pid-file=s' => \$PID_FILE,
'd|debug+' => \$DEBUG,
) || &usage;
# detaint the commandline
@ -68,6 +71,12 @@ $ENV{PATH} = '/bin:/usr/bin:/var/qmail/bin';
my %childstatus = ();
sub REAPER {
# foreach my $chld (keys %childstatus) {
# if (defined(waitpid($chld, WNOHANG))) {
# ::log(LOGINFO,"cleaning up after $chld");
# delete $childstatus{$chld};
# }
# }
while ( defined(my $chld = waitpid(-1, WNOHANG)) ){
last unless $chld > 0;
::log(LOGINFO,"cleaning up after $chld");
@ -212,30 +221,22 @@ while (1) {
::log(LOGINFO, "Connection Timed Out");
exit; };
my $localsockaddr = getsockname($client);
my ($lport, $laddr) = sockaddr_in($localsockaddr);
$ENV{TCPLOCALIP} = inet_ntoa($laddr);
# my ($port, $iaddr) = sockaddr_in($hisaddr);
$ENV{TCPREMOTEIP} = inet_ntoa($iaddr);
$ENV{TCPREMOTEHOST} = gethostbyaddr($iaddr, AF_INET) || "Unknown";
::log(LOGINFO, "Accepted connection $running/$MAXCONN");
# don't do this!
#$0 = "qpsmtpd-forkserver: $ENV{TCPREMOTEIP} / $ENV{TCPREMOTEHOST}";
$::LineMode = 1;
::log(LOGINFO, "Accepted connection $running/$MAXCONN from $ENV{TCPREMOTEIP} / $ENV{TCPREMOTEHOST}");
# dup to STDIN/STDOUT
POSIX::dup2(fileno($client), 0);
POSIX::dup2(fileno($client), 1);
$qpsmtpd->start_connection
(
local_ip => $ENV{TCPLOCALIP},
local_port => $lport,
remote_ip => $ENV{TCPREMOTEIP},
remote_port => $port,
);
$qpsmtpd->run();
my $qp = Qpsmtpd::PollServer->new($client);
$qp->load_plugins;
$qp->init_logger;
$qp->push_back_read("Connect\n");
Qpsmtpd::PollServer->AddTimer(0.1, sub { });
while (1) {
$qp->enable_read;
my $line = $qp->get_line;
last if !defined($line);
my $output = $qp->process_line($line);
$qp->write($output) if $output;
}
exit; # child leaves
}

View File

@ -1,28 +0,0 @@
#!/usr/bin/perl -Tw
# Copyright (c) 2001 Ask Bjoern Hansen. See the LICENSE file for details.
# The "command dispatch" system is taken from colobus - http://trainedmonkey.com/colobus/
#
# this is designed to be run under tcpserver (http://cr.yp.to/ucspi-tcp.html)
# or inetd if you're into that sort of thing
#
#
# For more information see http://develooper.com/code/qpsmtpd/
#
#
use lib 'lib';
use Qpsmtpd::SelectServer;
use strict;
$| = 1;
delete $ENV{ENV};
$ENV{PATH} = '/bin:/usr/bin:/var/qmail/bin';
Qpsmtpd::SelectServer->main();
__END__
1;