qpsmtpd/qpsmtpd-async
2006-12-08 19:46:18 +00:00

312 lines
8.3 KiB
Perl
Executable File

#!/usr/bin/perl
use lib "./lib";
BEGIN {
delete $ENV{ENV};
delete $ENV{BASH_ENV};
$ENV{PATH} = '/bin:/usr/bin:/var/qmail/bin:/usr/local/bin';
}
use strict;
use vars qw($DEBUG);
use FindBin qw();
# TODO: need to make this taint friendly
use lib "$FindBin::Bin/lib";
use Danga::Socket;
use Danga::Client;
use Qpsmtpd::PollServer;
use Qpsmtpd::ConfigServer;
use Qpsmtpd::Constants;
use IO::Socket;
use Carp;
use POSIX qw(WNOHANG);
use Getopt::Long;
$|++;
use Socket qw(SOMAXCONN IPPROTO_TCP SO_KEEPALIVE TCP_NODELAY SOL_SOCKET);
$SIG{'PIPE'} = "IGNORE"; # handled manually
$DEBUG = 0;
my $CONFIG_PORT = 20025;
my $CONFIG_LOCALADDR = '127.0.0.1';
my $PORT = 2525;
my $LOCALADDR = '0.0.0.0';
my $PROCS = 1;
my $USER = 'smtpd'; # user to suid to
my $PAUSED = 0;
my $NUMACCEPT = 20;
my $ACCEPT_RSET = Danga::Socket->AddTimer(30, \&reset_num_accept);
# make sure we don't spend forever doing accept()
use constant ACCEPT_MAX => 1000;
sub reset_num_accept {
$NUMACCEPT = 20;
}
sub help {
print <<EOT;
Usage:
qpsmtpd [OPTIONS]
Options:
-l, --listen-address addr : listen on a specific address; default 0.0.0.0
-p, --port P : listen on a specific port; default 2525
-u, --user U : run as a particular user; defualt 'smtpd'
-j, --procs J : spawn J processes; default 1
-h, --help : this page
--use-poll : force use of poll() instead of epoll()/kqueue()
EOT
exit(0);
}
GetOptions(
'p|port=i' => \$PORT,
'l|listen-address=s' => \$LOCALADDR,
'j|procs=i' => \$PROCS,
'd|debug+' => \$DEBUG,
'u|user=s' => \$USER,
'h|help' => \&help,
) || help();
# detaint the commandline
if ($PORT =~ /^(\d+)$/) { $PORT = $1 } else { &help }
if ($LOCALADDR =~ /^([\d\w\-.]+)$/) { $LOCALADDR = $1 } else { &help }
if ($USER =~ /^([\w\-]+)$/) { $USER = $1 } else { &help }
if ($PROCS =~ /^(\d+)$/) { $PROCS = $1 } else { &help }
sub force_poll {
$Danga::Socket::HaveEpoll = 0;
$Danga::Socket::HaveKQueue = 0;
}
my $POLL = "with " . ($Danga::Socket::HaveEpoll ? "epoll()" :
$Danga::Socket::HaveKQueue ? "kqueue()" : "poll()");
my $SERVER;
my $CONFIG_SERVER;
my %childstatus = ();
run_as_server();
exit(0);
sub _fork {
my $pid = fork;
if (!defined($pid)) { die "Cannot fork: $!" }
return $pid if $pid;
# Fixup Net::DNS randomness after fork
srand($$ ^ time);
local $^W;
delete $INC{'Net/DNS/Header.pm'};
require Net::DNS::Header;
# cope with different versions of Net::DNS
eval {
$Net::DNS::Resolver::global{id} = 1;
$Net::DNS::Resolver::global{id} = int(rand(Net::DNS::Resolver::MAX_ID()));
# print "Next DNS ID: $Net::DNS::Resolver::global{id}\n";
};
if ($@) {
# print "Next DNS ID: " . Net::DNS::Header::nextid() . "\n";
}
# Fixup lost kqueue after fork
$Danga::Socket::HaveKQueue = undef;
}
sub spawn_child {
my $plugin_loader = shift || Qpsmtpd::SMTP->new;
if (my $pid = _fork) {
return $pid;
}
$SIG{HUP} = $SIG{CHLD} = $SIG{INT} = $SIG{TERM} = 'DEFAULT';
$SIG{PIPE} = 'IGNORE';
Qpsmtpd::PollServer->OtherFds(fileno($SERVER) => \&accept_handler);
$plugin_loader->run_hooks('post-fork');
Qpsmtpd::PollServer->EventLoop();
exit;
}
sub sig_chld {
my $spawn_count = 0;
while ( (my $child = waitpid(-1,WNOHANG)) > 0) {
if (!defined $childstatus{$child}) {
next;
}
last unless $child > 0;
print "SIGCHLD: child $child died\n";
delete $childstatus{$child};
$spawn_count++;
}
if ($spawn_count) {
for (1..$spawn_count) {
# restart a new child if in poll server mode
my $pid = spawn_child();
$childstatus{$pid} = 1;
}
}
$SIG{CHLD} = \&sig_chld;
}
sub HUNTSMAN {
$SIG{CHLD} = 'DEFAULT';
kill 'INT' => keys %childstatus;
exit(0);
}
sub run_as_server {
# establish SERVER socket, bind and listen.
$SERVER = IO::Socket::INET->new(LocalPort => $PORT,
LocalAddr => $LOCALADDR,
Type => SOCK_STREAM,
Proto => IPPROTO_TCP,
Blocking => 0,
Reuse => 1,
Listen => SOMAXCONN )
or die "Error creating server $LOCALADDR:$PORT : $@\n";
IO::Handle::blocking($SERVER, 0);
binmode($SERVER, ':raw');
$CONFIG_SERVER = IO::Socket::INET->new(LocalPort => $CONFIG_PORT,
LocalAddr => $CONFIG_LOCALADDR,
Type => SOCK_STREAM,
Proto => IPPROTO_TCP,
Blocking => 0,
Reuse => 1,
Listen => 1 )
or die "Error creating server $CONFIG_LOCALADDR:$CONFIG_PORT : $@\n";
IO::Handle::blocking($CONFIG_SERVER, 0);
binmode($CONFIG_SERVER, ':raw');
# Drop priviledges
my (undef, undef, $quid, $qgid) = getpwnam $USER or
die "unable to determine uid/gid for $USER\n";
$) = "";
POSIX::setgid($qgid) or
die "unable to change gid: $!\n";
POSIX::setuid($quid) or
die "unable to change uid: $!\n";
$> = $quid;
# Load plugins here
my $plugin_loader = Qpsmtpd::SMTP->new();
$plugin_loader->load_plugins;
$plugin_loader->log(LOGINFO, 'Running as user '.
(getpwuid($>) || $>) .
', group '.
(getgrgid($)) || $)));
$SIG{INT} = $SIG{TERM} = \&HUNTSMAN;
if ($PROCS > 1) {
for (1..$PROCS) {
my $pid = spawn_child($plugin_loader);
$childstatus{$pid} = 1;
}
$plugin_loader->log(LOGDEBUG, "Listening on $PORT with $PROCS children $POLL");
$SIG{'CHLD'} = \&sig_chld;
sleep while (1);
}
else {
$plugin_loader->log(LOGDEBUG, "Listening on $PORT with single process $POLL");
Qpsmtpd::PollServer->OtherFds(fileno($SERVER) => \&accept_handler,
fileno($CONFIG_SERVER) => \&config_handler,
);
$plugin_loader->run_hooks('post-fork');
while (1) {
Qpsmtpd::PollServer->EventLoop();
}
exit;
}
}
sub config_handler {
my $csock = $CONFIG_SERVER->accept();
if (!$csock) {
# warn("accept failed on config server: $!");
return;
}
binmode($csock, ':raw');
printf("Config server connection\n") if $DEBUG;
IO::Handle::blocking($csock, 0);
setsockopt($csock, IPPROTO_TCP, TCP_NODELAY, pack("l", 1)) or die;
my $client = Qpsmtpd::ConfigServer->new($csock);
$client->watch_read(1);
return;
}
# Accept all new connections
sub accept_handler {
for (1 .. $NUMACCEPT) {
return unless _accept_handler();
}
# got here because we have accept's left.
# So double the number we accept next time.
$NUMACCEPT *= 2;
$NUMACCEPT = ACCEPT_MAX if $NUMACCEPT > ACCEPT_MAX;
$ACCEPT_RSET->cancel;
$ACCEPT_RSET = Danga::Socket->AddTimer(30, \&reset_num_accept);
}
use Errno qw(EAGAIN EWOULDBLOCK);
sub _accept_handler {
my $csock = $SERVER->accept();
if (!$csock) {
# warn("accept() failed: $!");
return;
}
binmode($csock, ':raw');
printf("Listen child making a Qpsmtpd::PollServer for %d.\n", fileno($csock))
if $DEBUG;
IO::Handle::blocking($csock, 0);
#setsockopt($csock, IPPROTO_TCP, TCP_NODELAY, pack("l", 1)) or die;
my $client = Qpsmtpd::PollServer->new($csock);
if ($PAUSED) {
$client->write("451 Sorry, this server is currently paused\r\n");
$client->close;
return 1;
}
$client->push_back_read("Connect\n");
$client->watch_read(1);
return 1;
}
########################################################################
sub log {
my ($level,$message) = @_;
# $level not used yet. this is reimplemented from elsewhere anyway
warn("$$ fd:? $message\n");
}
sub pause {
my ($pause) = @_;
$PAUSED = $pause;
}