qpsmtpd/qpsmtpd
Matt Sergeant 9683016276 MERGE r386:r480 FROM https://svn.perl.org/qpsmtpd/branches/high_perf
High perf branch merge and fixes


git-svn-id: https://svn.perl.org/qpsmtpd/trunk@496 958fd67b-6ff1-0310-b445-bb7760255be9
2005-07-11 19:10:49 +00:00

456 lines
13 KiB
Perl
Executable File

#!/usr/bin/perl
use lib "./lib";
# Scrub the environment at compile time, before any module is loaded:
# remove ENV and BASH_ENV and pin PATH to a fixed list of directories.
BEGIN {
delete $ENV{ENV};
delete $ENV{BASH_ENV};
$ENV{PATH} = '/bin:/usr/bin:/var/qmail/bin:/usr/local/bin';
}
use strict;
use vars qw($DEBUG);
use FindBin qw();
# TODO: need to make this taint friendly
use lib "$FindBin::Bin/lib";
use Danga::Socket;
use Danga::Client;
use Qpsmtpd::PollServer;
use Qpsmtpd::ConfigServer;
use Qpsmtpd::Constants;
use IO::Socket;
use Carp;
use POSIX qw(WNOHANG);
use Getopt::Long;
$|++;
# For debugging
# $SIG{USR1} = sub { Carp::confess("USR1") };
use Socket qw(SOMAXCONN IPPROTO_TCP SO_KEEPALIVE TCP_NODELAY SOL_SOCKET);
# SIGPIPE is ignored for the whole process.
$SIG{'PIPE'} = "IGNORE"; # handled manually
# Debug level; each -d/--debug on the command line increments it.
$DEBUG = 0;
# Address/port of the config server socket opened in run_as_server().
# These two have no command line option.
my $CONFIG_PORT = 20025;
my $CONFIG_LOCALADDR = '127.0.0.1';
# Address/port of the SMTP listening socket (-l / -p).
my $PORT = 2525;
my $LOCALADDR = '0.0.0.0';
# True when -f/--forkmode is given (fork one child per connection); also
# forced on by run_as_inetd().
my $LineMode = 0;
# Number of poll-loop child processes to spawn (-j); forced to 1 in fork mode.
my $PROCS = 1;
my $MAXCONN = 15; # max simultaneous connections
my $USER = 'smtpd'; # user to suid to
my $MAXCONNIP = 5; # max simultaneous connections from one IP
# Set through pause(); while true, multiplex-mode connections are refused
# with a 451 reply in _accept_handler().
my $PAUSED = 0;
# Upper bound on accept() calls per accept_handler() run (-a); the handler
# doubles it or resets it to the configured value as it goes.
my $NUMACCEPT = 20;
# Print the usage text to STDOUT and terminate the process with status 0.
# Used both as the -h/--help Getopt callback (arguments are ignored) and as
# the bail-out for invalid command line values; it never returns.
sub help {
    print <<EOT;
Usage:
qpsmtpd [OPTIONS]
Options:
-l, --listen-address addr : listen on a specific address; default 0.0.0.0
-p, --port P : listen on a specific port; default 2525
-c, --limit-connections N : limit concurrent connections to N; default 15
-u, --user U : run as a particular user; default 'smtpd'
-m, --max-from-ip M : limit connections from a single IP; default 5
-f, --forkmode : fork a child for each connection
-j, --procs J : spawn J processes; default 1
-a, --accept K : accept up to K conns per loop; default 20
-d, --debug : increase debug output; may be repeated
--use-poll : force poll() even if epoll()/kqueue() is available
-h, --help : this page
NB: -f and -j are mutually exclusive. If -f flag is not used the server uses
poll() style loops running inside J child processes. Set J to the number of
CPUs you have at your disposal.
EOT
    exit(0);
}
# Parse the command line.  A parse failure prints the usage text and exits
# through help().
GetOptions(
    'p|port=i'              => \$PORT,
    'l|listen-address=s'    => \$LOCALADDR,
    'j|procs=i'             => \$PROCS,
    'd|debug+'              => \$DEBUG,
    'f|forkmode'            => \$LineMode,
    'c|limit-connections=i' => \$MAXCONN,
    'm|max-from-ip=i'       => \$MAXCONNIP,
    'u|user=s'              => \$USER,
    'a|accept=i'            => \$NUMACCEPT,
    'h|help'                => \&help,
    'use-poll'              => \&force_poll,
) || help();

# detaint the commandline: each value is re-assigned from a regex capture and
# anything that does not match bails out through help().
if ($PORT =~ /^(\d+)$/) { $PORT = $1 } else { help() }
if ($LOCALADDR =~ /^([\d\w\-.]+)$/) { $LOCALADDR = $1 } else { help() }
if ($USER =~ /^([\w\-]+)$/) { $USER = $1 } else { help() }
if ($MAXCONN =~ /^(\d+)$/) { $MAXCONN = $1 } else { help() }
if ($MAXCONNIP =~ /^(\d+)$/) { $MAXCONNIP = $1 } else { help() }
if ($PROCS =~ /^(\d+)$/) { $PROCS = $1 } else { help() }
if ($NUMACCEPT =~ /^(\d+)$/) { $NUMACCEPT = $1 } else { help() }

# Remember the configured accept batch size; accept_handler() resets
# $NUMACCEPT to it after growing the batch.
my $_NUMACCEPT = $NUMACCEPT;

$::LineMode = $LineMode;
$PROCS = 1 if $LineMode;

# "-j 0" would make the divisions below die with "Illegal division by zero".
help() if $PROCS < 1;

# This is a bit of a hack, but we get to approximate MAXCONN stuff when we
# have multiple children listening on the same socket.
$MAXCONN /= $PROCS;

# 0 means "no per-IP limit" and must stay 0.  A non-zero limit is kept at a
# whole number of at least 1: the per-IP counter in _accept_handler() starts
# at 1, so a fractional share below 1 (e.g. 5 / 8) would refuse every single
# connection.
if ($MAXCONNIP) {
    $MAXCONNIP = int($MAXCONNIP / $PROCS) || 1;
}
# Getopt callback for --use-poll: switch off both the epoll and the kqueue
# flag of Danga::Socket so that neither is reported as available.
# Arguments passed by Getopt::Long are ignored.  Returns 0.
sub force_poll {
    ($Danga::Socket::HaveEpoll, $Danga::Socket::HaveKQueue) = (0, 0);
    return 0;
}
# Initialise the poller first so the start-up log line can name the event
# mechanism in use; epoll wins over kqueue, plain poll() is the fallback.
Danga::Socket::init_poller();
my $POLL = "with poll()";
$POLL = "with kqueue()" if $Danga::Socket::HaveKQueue;
$POLL = "with epoll()"  if $Danga::Socket::HaveEpoll;

# Listening sockets; both are created in run_as_server().
my ($SERVER, $CONFIG_SERVER);

# Code for inetd/tcpserver mode: taken when either REMOTE_HOST or
# TCPREMOTEHOST is set in the environment.
if ($ENV{REMOTE_HOST} || $ENV{TCPREMOTEHOST}) {
    run_as_inetd();
    exit(0);
}

# pid => remote address of the children forked in fork-per-connection mode.
my %childstatus = ();

run_as_server();
exit(0);
# fork() wrapper.
#
# Returns the child's PID in the parent and 0 in the child; dies if fork()
# fails.  In the child it additionally re-seeds the random number generator,
# reloads Net::DNS::Header, randomises the Net::DNS query id and
# re-initialises the Danga::Socket poller.
sub _fork {
    my $pid = fork;
    if (!defined($pid)) { die "Cannot fork: $!" }
    return $pid if $pid;

    # ---- child only from here on ----

    # Fixup Net::DNS randomness after fork
    srand($$ ^ time);
    local $^W;
    delete $INC{'Net/DNS/Header.pm'};
    require Net::DNS::Header;
    # cope with different versions of Net::DNS
    eval {
        $Net::DNS::Resolver::global{id} = 1;
        $Net::DNS::Resolver::global{id} = int(rand(Net::DNS::Resolver::MAX_ID()));
        # print "Next DNS ID: $Net::DNS::Resolver::global{id}\n";
    };
    if ($@) {
        # print "Next DNS ID: " . Net::DNS::Header::nextid() . "\n";
    }

    # Fixup lost kqueue after fork
    $Danga::Socket::HaveKQueue = undef;
    Danga::Socket::init_poller();

    # Callers treat any true value as "I am the parent", so do not let the
    # return value of init_poller() leak out as ours.
    return 0;
}
# Fork one poll-loop worker.
#
# Parent: returns the PID of the new child so the caller can keep track of
# it and signal it later.
# Child: never returns; it resets SIGCHLD to its default, registers
# accept_handler() for the listening socket, runs the event loop and exits.
sub spawn_child {
    my $pid = _fork();
    return $pid if $pid;

    $SIG{CHLD} = "DEFAULT";
    Qpsmtpd::PollServer->OtherFds(fileno($SERVER) => \&accept_handler);
    Qpsmtpd::PollServer->EventLoop();
    exit;
}
# SIGCHLD handler.
#
# Reaps every child that has exited, drops it from %childstatus and, in poll
# server mode, starts one replacement worker per reaped child.  The handler
# is re-installed before returning in every mode.
sub sig_chld {
    # Use the default disposition while we work rather than 'IGNORE': with
    # SIGCHLD ignored most Unix platforms reap exiting children on their own,
    # so a child dying during this handler would vanish without ever being
    # seen by waitpid() below or by a later run of this handler.
    $SIG{CHLD} = 'DEFAULT';

    my $reaped = 0;
    while ((my $child = waitpid(-1, WNOHANG)) > 0) {
        print "child $child died\n";
        delete $childstatus{$child};
        $reaped++;
    }

    # restart a new child for each one lost, if in poll server mode.  Several
    # exits can be reported by a single signal, and a signal may arrive with
    # nothing left to reap.
    if (!$LineMode) {
        spawn_child() for 1 .. $reaped;
    }

    $SIG{CHLD} = \&sig_chld;
    return;
}
# INT/TERM handler used in fork-per-connection mode: put SIGCHLD back to its
# default, send SIGINT to every PID recorded in %childstatus, then exit 0.
sub HUNTSMAN {
    $SIG{CHLD} = 'DEFAULT';
    my @children = keys %childstatus;
    kill('INT', @children);
    exit(0);
}
# Serve a single connection handed to us on file descriptors 0 and 1.
#
# Switches the process into line mode, wraps fd 0 in a Danga::Client for
# reading and fd 1 in a Qpsmtpd::PollServer for writing, then feeds lines
# from the former into the latter until get_line() returns undef.
sub run_as_inetd {
    $::LineMode = 1;
    $LineMode   = $::LineMode;

    my $stdin = IO::Handle->new_from_fd(0, "r");
    IO::Handle::blocking($stdin, 0);
    my $stdout = IO::Handle->new_from_fd(1, "w");
    IO::Handle::blocking($stdout, 0);

    my $reader = Danga::Client->new($stdin);
    my $smtp   = Qpsmtpd::PollServer->new($stdout);
    $smtp->load_plugins;
    $smtp->init_logger;
    $smtp->input_sock($reader);

    # Queue a synthetic first line ahead of anything the peer sends.
    $reader->push_back_read("Connect\n");

    # Cause poll/kevent/epoll to end quickly in first iteration
    Qpsmtpd::PollServer->AddTimer(1, sub { });

    for (;;) {
        $reader->enable_read;
        my $line = $reader->get_line;
        defined $line or last;
        my $reply = $smtp->process_line($line);
        $smtp->write($reply) if $reply;
    }
}
# Run as a stand-alone daemon.
#
# Opens the SMTP and config listening sockets, drops privileges to $USER,
# loads the plugins once, and then either
#   * spawns $PROCS poll-loop children and sleeps forever ($PROCS > 1), or
#   * runs the event loop in this process, which also serves the config
#     socket and, in fork mode, forks one child per connection.
# Dies if a socket cannot be created or privileges cannot be dropped.
# Does not return.
sub run_as_server {
    local $::MAXconn = $MAXCONN;

    # establish SERVER socket, bind and listen.
    $SERVER = IO::Socket::INET->new(LocalPort => $PORT,
                                    LocalAddr => $LOCALADDR,
                                    Type      => SOCK_STREAM,
                                    Proto     => IPPROTO_TCP,
                                    Blocking  => 0,
                                    Reuse     => 1,
                                    Listen    => SOMAXCONN )
      or die "Error creating server $LOCALADDR:$PORT : $@\n";
    IO::Handle::blocking($SERVER, 0);
    binmode($SERVER, ':raw');

    $CONFIG_SERVER = IO::Socket::INET->new(LocalPort => $CONFIG_PORT,
                                           LocalAddr => $CONFIG_LOCALADDR,
                                           Type      => SOCK_STREAM,
                                           Proto     => IPPROTO_TCP,
                                           Blocking  => 0,
                                           Reuse     => 1,
                                           Listen    => 1 )
      or die "Error creating server $CONFIG_LOCALADDR:$CONFIG_PORT : $@\n";
    IO::Handle::blocking($CONFIG_SERVER, 0);
    binmode($CONFIG_SERVER, ':raw');

    # Drop privileges
    my (undef, undef, $quid, $qgid) = getpwnam $USER or
      die "unable to determine uid/gid for $USER\n";
    $) = "";
    POSIX::setgid($qgid) or
      die "unable to change gid: $!\n";
    POSIX::setuid($quid) or
      die "unable to change uid: $!\n";
    $> = $quid;
    ::log(LOGINFO, 'Running as user '.
          (getpwuid($>) || $>) .
          ', group '.
          (getgrgid($)) || $)));

    # Load plugins here
    my $plugin_loader = Qpsmtpd::SMTP->new();
    $plugin_loader->load_plugins;

    if ($PROCS > 1) {
        $SIG{'CHLD'} = \&sig_chld;
        # @kids holds whatever spawn_child() hands back in the parent; these
        # are the processes signalled on INT/TERM below.
        my @kids;
        for (1..$PROCS) {
            push @kids, spawn_child();
        }
        $SIG{INT} = $SIG{TERM} = sub { $SIG{CHLD} = "IGNORE"; kill 2 => @kids; exit };
        $plugin_loader->log(LOGDEBUG, "Listening on $PORT with $PROCS children $POLL");
        sleep while (1);
    }
    else {
        if ($LineMode) {
            $SIG{INT} = $SIG{TERM} = \&HUNTSMAN;

            # Reap finished per-connection children.  Without a SIGCHLD
            # handler their entries never leave %childstatus, and once
            # $MAXCONN of them have piled up accept_handler() refuses every
            # further connection.  The handler is re-armed explicitly after
            # each run instead of relying on sig_chld() to do it in fork
            # mode.
            my $reaper;
            $reaper = sub { sig_chld(@_); $SIG{CHLD} = $reaper; };
            $SIG{CHLD} = $reaper;
        }
        $plugin_loader->log(LOGDEBUG, "Listening on $PORT with single process $POLL" .
                            ($LineMode ? " (forking server)" : ""));
        Qpsmtpd::PollServer->OtherFds(fileno($SERVER) => \&accept_handler,
                                      fileno($CONFIG_SERVER) => \&config_handler,
                                      );
        # Re-enter the event loop whenever it returns.
        while (1) {
            Qpsmtpd::PollServer->EventLoop();
        }
        exit;
    }
}
# Event-loop callback for the config listening socket: accept one pending
# connection, make it raw, non-blocking and TCP_NODELAY, and hand it to a
# Qpsmtpd::ConfigServer object that is then watched for readability.
# Returns nothing; a failed accept() is ignored, a failed setsockopt() dies.
sub config_handler {
    my $conn = $CONFIG_SERVER->accept();
    # warn("accept failed on config server: $!");
    return unless $conn;

    binmode($conn, ':raw');
    if ($DEBUG) {
        printf("Config server connection\n");
    }
    IO::Handle::blocking($conn, 0);
    setsockopt($conn, IPPROTO_TCP, TCP_NODELAY, pack("l", 1)) or die;

    my $config_client = Qpsmtpd::ConfigServer->new($conn);
    $config_client->watch_read(1);
    return;
}
# Event-loop callback for the SMTP listening socket.
#
# Accepts up to $NUMACCEPT connections in one go, stopping early when the
# connection count reaches $MAXCONN or when _accept_handler() reports that
# it accepted nothing.  The batch size adapts: it is doubled after a batch
# that was used up completely and reset to the configured value as soon as
# _accept_handler() comes back empty-handed.
sub accept_handler {
    # Current load: forked children in fork mode, otherwise the number of
    # entries in the Danga::Client descriptor map.
    my $running;
    if ($LineMode) {
        $running = scalar keys %childstatus;
    }
    else {
        my $map = Danga::Client->DescriptorMap;
        $running = scalar keys %$map;
    }

    for my $slot (1 .. $NUMACCEPT) {
        if ($running >= $MAXCONN) {
            ::log(LOGINFO,"Too many connections: $running >= $MAXCONN.");
            return;
        }

        $running++;
        next if _accept_handler($running);

        # _accept_handler() returned false: go back to the configured batch
        # size and wait for the next readiness event.
        $NUMACCEPT = $_NUMACCEPT;
        return;
    }

    # Every accept in this batch succeeded, so try twice as many next time.
    $NUMACCEPT *= 2;
}
use Errno qw(EAGAIN EWOULDBLOCK);
# Accept and set up a single connection from the SMTP listening socket.
#
#   $running - number of connections including this one; used for logging.
#
# Returns false when accept() produced no connection (the caller then stops
# its accept loop) and true when a connection was taken, whether it was kept
# or refused.  In fork-per-connection mode the forked child never returns:
# it serves the connection and exits.
sub _accept_handler {
    my $running = shift;

    my $csock = $SERVER->accept();
    if (!$csock) {
        # On the non-blocking listener EAGAIN/EWOULDBLOCK is the normal way
        # out of the accept loop and is not worth a message; anything else
        # is a real error and is reported instead of being dropped silently.
        warn("accept() failed: $!")
            unless $! == EAGAIN || $! == EWOULDBLOCK;
        return;
    }

    binmode($csock, ':raw');
    printf("Listen child making a Qpsmtpd::PollServer for %d.\n", fileno($csock))
        if $DEBUG;
    IO::Handle::blocking($csock, 0);
    setsockopt($csock, IPPROTO_TCP, TCP_NODELAY, pack("l", 1)) or die;

    if (!$LineMode) {
        # multiplex mode
        my $client = Qpsmtpd::PollServer->new($csock);
        my $rem_ip = $client->peer_ip_string;

        if ($PAUSED) {
            $client->write("451 Sorry, this server is currently paused\r\n");
            $client->close;
            return 1;
        }

        if ($MAXCONNIP) {
            my $num_conn = 1; # seed with current value
            # NOTE(review): if the freshly created $client is already present
            # in DescriptorMap it is counted below on top of the seed of 1 --
            # confirm against Danga::Socket.
            my $descriptors = Danga::Client->DescriptorMap;
            my @obj = values %$descriptors;
            foreach my $obj (@obj) {
                local $^W;
                # This is a bit of a slow way to do this. Wish I could cache the method call.
                ++$num_conn if ($obj->peer_ip_string eq $rem_ip);
            }
            if ($num_conn > $MAXCONNIP) {
                $client->log(LOGINFO,"Too many connections from $rem_ip: "
                             ."$num_conn > $MAXCONNIP. Denying connection.");
                $client->write("451 Sorry, too many connections from $rem_ip, try again later\r\n");
                $client->close;
                return 1;
            }
            $client->log(LOGINFO, "accepted connection $running/$MAXCONN ($num_conn/$MAXCONNIP) from $rem_ip");
        }

        # Queue a synthetic first line, then start watching the socket.
        $client->push_back_read("Connect\n");
        $client->watch_read(1);
        return 1;
    }

    # fork-per-connection mode

    # The per-IP limit must be keyed on the address of the remote end.
    # sockhost() would give the local address of the accepted socket, which
    # lumps every client together under the server's own address.
    my $rem_ip = $csock->peerhost();

    if ($MAXCONNIP) {
        my $num_conn = 1; # seed with current value
        # Iterate over a copy: if we for-loop directly over values
        # %childstatus, a SIGCHLD handler can delete an entry and slip $rip
        # out from under us.  Causes "Use of freed value in iteration" under
        # perl 5.8.4.
        my @rip = values %childstatus;
        foreach my $rip (@rip) {
            ++$num_conn if (defined $rip && $rip eq $rem_ip);
        }
        if ($num_conn > $MAXCONNIP) {
            ::log(LOGINFO,"Too many connections from $rem_ip: "
                  ."$num_conn > $MAXCONNIP. Denying connection.");
            print $csock "451 Sorry, too many connections from $rem_ip, try again later\r\n";
            close $csock;
            return 1;
        }
    }

    if (my $pid = _fork) {
        # Parent: remember whose connection the child is serving and let go
        # of our copy of the socket.  The connection was accepted no matter
        # what close() reports, so always return true.
        $childstatus{$pid} = $rem_ip;
        $csock->close();
        return 1;
    }

    # Child from here on.
    $SERVER->close(); # make sure the child doesn't accept() new connections
    $SIG{$_} = 'DEFAULT' for keys %SIG;

    my $client = Qpsmtpd::PollServer->new($csock);
    $client->push_back_read("Connect\n");
    # Cause poll/kevent/epoll to end quickly in first iteration
    Qpsmtpd::PollServer->AddTimer(0.1, sub { });
    while (1) {
        $client->enable_read;
        my $line = $client->get_line;
        last if !defined($line);
        my $resp = $client->process_line($line);
        $client->write($resp) if $resp;
    }

    # Format the message ourselves: every other log call in this file passes
    # one ready-made string, not a printf-style format plus arguments.
    $client->log(LOGDEBUG, sprintf("Finished with child %d.\n", fileno($csock)))
        if $DEBUG;
    $client->close();
    exit;
}
########################################################################
# Minimal stand-in logger for code that has no Qpsmtpd object at hand.
#
#   $_[0] - log level; accepted but not used yet
#   $_[1] - message text
#
# Emits "<pid> fd:? <message>" followed by a newline through warn().
# (This is reimplemented from elsewhere anyway.)
sub log {
    my $text = $_[1];
    warn($$ . " fd:? " . $text . "\n");
}
# Set the server-wide pause flag to the first argument.  While the flag is
# true, new multiplex-mode connections are turned away with a 451 reply.
# Returns the value that was stored.
sub pause {
    my $flag = $_[0];
    return $PAUSED = $flag;
}