qpsmtpd/qpsmtpd-async

465 lines
12 KiB
Perl
Executable File

#!/usr/bin/perl
use lib "./lib";
# Sanitise the environment at compile time: remove the ENV and BASH_ENV
# variables and pin PATH to a fixed list of directories, so nothing inherited
# from the invoking shell influences programs this daemon may run.
BEGIN {
delete $ENV{ENV};
delete $ENV{BASH_ENV};
$ENV{PATH} = '/bin:/usr/bin:/var/qmail/bin:/usr/local/bin';
}
# Profiling - requires Devel::Profiler 0.05
#BEGIN { $Devel::Profiler::NO_INIT = 1; }
#use Devel::Profiler;
use strict;
use vars qw($DEBUG);
use FindBin qw();
# TODO: need to make this taint friendly
use lib "$FindBin::Bin/lib";
use Danga::Socket;
use Danga::Client;
use Qpsmtpd::PollServer;
use Qpsmtpd::ConfigServer;
use Qpsmtpd::Constants;
use IO::Socket;
use Carp;
use POSIX qw(WNOHANG);
use Getopt::Long;
use List::Util qw(shuffle);
$|++;
use Socket
qw(SOMAXCONN IPPROTO_TCP SO_KEEPALIVE TCP_NODELAY SOL_SOCKET AF_UNIX SOCK_STREAM PF_UNSPEC);
# Process-wide settings and state.  $PORT, $LOCALADDR, $PROCS, $USER,
# $PID_FILE, $DETACH, $CONFIG_PORT and $DEBUG are defaults that the
# GetOptions() call further down may overwrite from the command line.
$SIG{'PIPE'} = "IGNORE"; # handled manually
$DEBUG = 0; # bumped once per -v/--verbose; gates the debug printf calls
my $CONFIG_PORT = 20025; # port the config server listens on
my $CONFIG_LOCALADDR = '127.0.0.1'; # config server address (no command-line option for it)
my $PORT = 2525; # SMTP listen port
my $LOCALADDR = '0.0.0.0'; # SMTP listen address
my $PROCS = 1; # number of children run_as_server() spawns
my $USER = (getpwuid $>)[0]; # user to suid to
$USER = "smtpd" if $USER eq "root"; # never default to running as root
my $PAUSED = 0; # set through pause(); when true, new connections get a 451 and are closed
my $NUMACCEPT = 20; # accept() attempts per accept_handler() call; doubled when all succeed
my $PID_FILE = ''; # empty string means "no PID file"
my $ACCEPT_RSET; # timer returned by Danga::Socket->AddTimer that resets $NUMACCEPT
my $DETACH; # daemonize on startup
# make sure we don't spend forever doing accept()
use constant ACCEPT_MAX => 1000; # upper bound applied to $NUMACCEPT
# Timer callback: bring the accept() budget used by accept_handler() back
# down to its initial value of 20.  Returns the new value.
sub reset_num_accept {
    return $NUMACCEPT = 20;
}
# Print the usage summary to STDOUT and terminate with exit status 0.
# Used directly as the Getopt::Long handler for -h/--help and as the
# fallback whenever option parsing or detainting rejects the command line;
# any arguments passed in are ignored.  Never returns.
sub help {
    print <<EOT;
Usage:
qpsmtpd [OPTIONS]
Options:
-l, --listen-address addr : listen on a specific address; default 0.0.0.0
-p, --port P : listen on a specific port; default 2525
--config-port : config server port; default 20025
-u, --user U : run as a particular user; default 'smtpd'
-j, --procs J : spawn J processes; default 1
-d, --detach : detach from controlling terminal (daemonize)
--pid-file P : print main servers PID to file P
-v, --verbose : increase debug output; may be repeated
-h, --help : this page
--use-poll : force use of poll() instead of epoll()/kqueue()
EOT
    exit(0);
}
# Parse the command line into the file-level settings.  A parse failure
# prints the usage text and exits (help() never returns).
GetOptions(
    'p|port=i'           => \$PORT,
    'l|listen-address=s' => \$LOCALADDR,
    'j|procs=i'          => \$PROCS,
    'v|verbose+'         => \$DEBUG,
    'u|user=s'           => \$USER,
    'pid-file=s'         => \$PID_FILE,
    'd|detach'           => \$DETACH,
    'h|help'             => \&help,
    'config-port=i'      => \$CONFIG_PORT,

    # Advertised in the usage text; without this entry Getopt::Long rejects
    # the option as unknown and the script only prints the usage.
    'use-poll'           => \&force_poll,
  )
  || help();

# detaint the commandline: every value is re-assigned from a regex capture,
# and anything that does not match falls through to the usage text.
if ($PORT =~ /^(\d+)$/) {
    $PORT = $1;
}
else {
    help();
}

if ($CONFIG_PORT =~ /^(\d+)$/) {
    $CONFIG_PORT = $1;
}
else {
    help();
}

if ($LOCALADDR =~ /^([\d\w\-.]+)$/) {
    $LOCALADDR = $1;
}
else {
    help();
}

if ($USER =~ /^([\w\-]+)$/) {
    $USER = $1;
}
else {
    help();
}

if ($PROCS =~ /^(\d+)$/) {
    $PROCS = $1;
}
else {
    help();
}
# Clear both of Danga::Socket's backend flags (epoll and kqueue) by setting
# them to 0.  Arguments, if any, are ignored.
sub force_poll {
    $Danga::Socket::HaveEpoll = $Danga::Socket::HaveKQueue = 0;
}
# Label for the event backend Danga::Socket reports at this point; only used
# in the start-up log line written by run_as_server().
my $POLL = "with "
  . (
      $Danga::Socket::HaveEpoll  ? "epoll()"
    : $Danga::Socket::HaveKQueue ? "kqueue()"
    :                              "poll()"
  );

my $SERVER;           # SMTP listening socket, created in run_as_server()
my $CONFIG_SERVER;    # config-server listening socket, created in run_as_server()

use constant READY      => 1;
use constant ACCEPTING  => 2;
use constant RESTARTING => 999;

# child pid => the parent's end of the socketpair shared with that child
my %childstatus = ();

# Refuse to start when the PID file names a process that still exists.
# The path comes from the command line, so it is opened with the three-argument
# form and a lexical handle; the handle is closed before any die.
if ($PID_FILE && -r $PID_FILE) {
    open my $pid_fh, '<', $PID_FILE
      or die "open_pidfile $PID_FILE: $!\n";
    my $running_pid = <$pid_fh> || '';
    close($pid_fh);
    chomp $running_pid;
    if ($running_pid =~ /^(\d+)/) {
        $running_pid = $1;    # keep only the leading digits

        # signal 0 delivers nothing; it only asks whether the pid can be signalled
        if (kill 0, $running_pid) {
            die "Found an already running qpsmtpd with pid $running_pid.\n";
        }
    }
}

run_as_server();
exit(0);
# fork() wrapper.
# Parent: returns the child's pid.  Dies with "Cannot fork: ..." when fork fails.
# Child: re-seeds the random number generator, reloads Net::DNS::Header,
# picks a fresh Net::DNS query id, clears Danga::Socket's kqueue flag and
# then falls off the end, which yields a false value to the caller.
sub _fork {
    my $pid = fork;
    die "Cannot fork: $!" unless defined $pid;
    if ($pid) {
        return $pid;
    }

    # Fixup Net::DNS randomness after fork
    srand($$ ^ time);
    local $^W;
    delete $INC{'Net/DNS/Header.pm'};
    require Net::DNS::Header;

    # cope with different versions of Net::DNS; whatever the eval leaves
    # in $@ is not acted upon
    eval {
        $Net::DNS::Resolver::global{id} = 1;
        $Net::DNS::Resolver::global{id} =
          int(rand(Net::DNS::Resolver::MAX_ID()));

        # print "Next DNS ID: $Net::DNS::Resolver::global{id}\n";
    };

    # print "Next DNS ID: " . Net::DNS::Header::nextid() . "\n" if $@;

    # Fixup lost kqueue after fork
    $Danga::Socket::HaveKQueue = undef;
}
# Fork one worker process.
#   $_[0]  optional plugin loader object; a new Qpsmtpd::SMTP is created
#          when none is given.
# Parent: records its end of a fresh socketpair in %childstatus under the
# child's pid and returns that pid.
# Child: resets signal handlers, closes the config listener, registers the
# control socket and the SMTP listener with the poll server, runs the
# 'post-fork' hooks and enters the event loop; it exits instead of returning.
sub spawn_child {
    my ($plugin_loader) = @_;
    $plugin_loader ||= Qpsmtpd::SMTP->new;

    socketpair(my $reader, my $writer, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
      || die "Unable to create a pipe";
    $writer->autoflush(1);
    $reader->autoflush(1);

    my $pid = _fork;
    if ($pid) {
        # parent: keep the writing end so sig_hup() can reach this child
        $childstatus{$pid} = $writer;
        return $pid;
    }

    # child from here on
    $SIG{$_} = 'DEFAULT' for qw(TERM INT CHLD);
    $SIG{$_} = 'IGNORE'  for qw(PIPE HUP);

    close $CONFIG_SERVER;

    Qpsmtpd::PollServer->Reset;
    Qpsmtpd::PollServer->OtherFds(
        fileno($reader) => sub { command_handler($reader) },
        fileno($SERVER) => \&accept_handler,
    );

    $ACCEPT_RSET = Danga::Socket->AddTimer(30, \&reset_num_accept);

    $plugin_loader->run_hooks('post-fork');

    Qpsmtpd::PollServer->EventLoop();
    exit;
}
# Note this is broken on KQueue because it requires that it handle signals itself or it breaks the event loop.
#
# SIGHUP handler for the parent: writes the line "hup" to the control socket
# of every child recorded in %childstatus.
sub sig_hup {
    print {$_} "hup\n" for values %childstatus;
}
# SIGCHLD handler for the parent.  Reaps every exited child without blocking,
# forgets the ones recorded in %childstatus, starts one replacement per
# forgotten child, and finally re-installs itself as the handler.
sub sig_chld {
    my $spawn_count = 0;

    # waitpid() returns a value <= 0 once nothing is left to reap, so the
    # loop condition alone guarantees $child is a real pid inside the body.
    while ((my $child = waitpid(-1, WNOHANG)) > 0) {

        # not one of ours (e.g. the intermediate process from daemonizing)
        next unless defined $childstatus{$child};

        print "SIGCHLD: child $child died\n";
        delete $childstatus{$child};
        $spawn_count++;
    }

    # restart a new child for each one lost, if in poll server mode
    spawn_child() for 1 .. $spawn_count;

    $SIG{CHLD} = \&sig_chld;
}
# INT/TERM handler for the parent: stop reacting to child exits, send INT to
# every known child, remove the PID file if one exists (logging a failed
# unlink), then exit with status 0.
sub HUNTSMAN {
    $SIG{CHLD} = 'DEFAULT';

    my @children = keys %childstatus;
    kill 'INT' => @children;

    if ($PID_FILE && -e $PID_FILE) {
        unless (unlink $PID_FILE) {
            ::log(LOGERROR, "unlink: $PID_FILE: $!");
        }
    }

    exit(0);
}
# Main entry point of the parent process.  In order: binds the SMTP and
# config listeners, switches to $USER's uid/gid, loads plugins, optionally
# daemonizes, writes the PID file, spawns $PROCS children and then serves the
# config port from its own event loop.  Dies on any set-up failure; does not
# return (it ends in exit).
sub run_as_server {

    # establish SERVER socket, bind and listen.
    $SERVER = IO::Socket::INET->new(
        LocalPort => $PORT,
        LocalAddr => $LOCALADDR,
        Type      => SOCK_STREAM,
        Proto     => IPPROTO_TCP,
        Blocking  => 0,
        Reuse     => 1,
        Listen    => SOMAXCONN
      )
      or die "Error creating server $LOCALADDR:$PORT : $@\n";

    IO::Handle::blocking($SERVER, 0);
    binmode($SERVER, ':raw');

    $CONFIG_SERVER =
      IO::Socket::INET->new(
        LocalPort => $CONFIG_PORT,
        LocalAddr => $CONFIG_LOCALADDR,
        Type      => SOCK_STREAM,
        Proto     => IPPROTO_TCP,
        Blocking  => 0,
        Reuse     => 1,
        Listen    => 1
      )
      or die "Error creating server $CONFIG_LOCALADDR:$CONFIG_PORT : $@\n";

    IO::Handle::blocking($CONFIG_SERVER, 0);
    binmode($CONFIG_SERVER, ':raw');

    # Drop privileges
    my (undef, undef, $quid, $qgid) = getpwnam $USER
      or die "unable to determine uid/gid for $USER\n";

    # $) takes "egid gid gid ...": the first number is the effective gid, the
    # rest become the supplementary groups.  Add every group listing $USER.
    my $groups = "$qgid $qgid";
    while (my (undef, undef, $gid, $members) = getgrent) {
        my @m = split(/ /, $members);
        if (grep { $_ eq $USER } @m) {
            $groups .= " $gid";
        }
    }
    endgrent;
    $) = $groups;
    POSIX::setgid($qgid)
      or die "unable to change gid: $!\n";
    POSIX::setuid($quid)
      or die "unable to change uid: $!\n";
    $> = $quid;

    # Load plugins here
    my $plugin_loader = Qpsmtpd::SMTP->new();
    $plugin_loader->load_plugins;

    if ($DETACH) {
        open STDIN,  '/dev/null'  or die "/dev/null: $!";
        open STDOUT, '>/dev/null' or die "/dev/null: $!";
        open STDERR, '>&STDOUT'   or die "open(stderr): $!";
        defined(my $pid = fork) or die "fork: $!";
        exit 0 if $pid;
        POSIX::setsid or die "setsid: $!";
    }

    if ($PID_FILE) {

        # Written after the optional fork above so the file holds the pid of
        # the process that keeps running.  "or" (not "||") so that a failed
        # open is actually detected instead of the test binding to the
        # file-name string.
        open my $pid_fh, '>', $PID_FILE
          or die "$PID_FILE: $!";
        print {$pid_fh} $$, "\n";
        close $pid_fh;
    }

    $plugin_loader->log(LOGINFO,
            'Running as user '
          . (getpwuid($>) || $>)
          . ', group '
          . (getgrgid($)) || $))
    );

    $SIG{INT} = $SIG{TERM} = \&HUNTSMAN;

    ######################
    # more Profiling code

=pod
$plugin_loader->run_hooks('post-fork');
Devel::Profiler->set_options(
bad_subs => [qw(Danga::Socket::EventLoop)],
sub_filter => sub {
my ($pkg, $sub) = @_;
return 0 if $sub eq 'AUTOLOAD';
return 0 if $pkg =~ /ParaDNS::XS/;
return 1;
},
);
Devel::Profiler->init();
Qpsmtpd::PollServer->OtherFds(
fileno($SERVER) => \&accept_handler,
fileno($CONFIG_SERVER) => \&config_handler, );
Qpsmtpd::PollServer->EventLoop;
exit;
=cut

    #####################

    for (1 .. $PROCS) {
        my $pid = spawn_child($plugin_loader);
    }

    $plugin_loader->log(LOGDEBUG,
        "Listening on $PORT with $PROCS children $POLL");

    # Installed only after the children exist; the parent itself serves
    # nothing but the config port.
    $SIG{CHLD} = \&sig_chld;
    $SIG{HUP}  = \&sig_hup;

    Qpsmtpd::PollServer->OtherFds(fileno($CONFIG_SERVER) => \&config_handler,);
    Qpsmtpd::PollServer->EventLoop;
    exit;
}
# Readable-event callback for the config listener: accepts one connection,
# makes it raw, non-blocking and TCP_NODELAY, and hands it to a new
# Qpsmtpd::ConfigServer with read-watching enabled.  Returns nothing; a
# failed accept() is silently ignored, a failed setsockopt() dies.
sub config_handler {
    my $csock = $CONFIG_SERVER->accept()
      or return;    # warn("accept failed on config server: $!");

    binmode($csock, ':raw');
    if ($DEBUG) {
        printf("Config server connection\n");
    }

    IO::Handle::blocking($csock, 0);
    setsockopt($csock, IPPROTO_TCP, TCP_NODELAY, pack("l", 1)) or die;

    Qpsmtpd::ConfigServer->new($csock)->watch_read(1);
    return;
}
# Readable-event callback for a child's control socket.
#   $_[0]  handle to read from
# Reads one line, strips the line ending and calls the sub named
# "cmd_<line>" with no arguments, returning whatever that sub returns.
# Dies if no sub of that name exists.
sub command_handler {
    my ($reader) = @_;

    my $command = <$reader>;
    chomp $command;

    #print "Got command: $command\n";
    my $handler_name = "cmd_$command";

    # the handler is looked up by name, which strict refs would forbid
    no strict 'refs';
    return $handler_name->();
}
# Handler for the "hup" control command: announces the action on STDOUT and
# calls Qpsmtpd::clear_config_cache(), returning its result.
# Reloading modules is not done here ("can't do that yet").
sub cmd_hup {
    print "Clearing cache\n";

    # clear cache
    return Qpsmtpd::clear_config_cache();
}
# Accept all new connections
#
# Readable-event callback for the SMTP listener.  Tries up to $NUMACCEPT
# accepts; stops at the first one that yields nothing.  If every attempt
# succeeded there may be more waiting, so the budget is doubled (capped at
# ACCEPT_MAX) and the 30-second reset timer is restarted.
sub accept_handler {
    for (1 .. $NUMACCEPT) {
        _accept_handler() or return;
    }

    # The whole budget was used: allow twice as many next time.
    my $doubled = $NUMACCEPT * 2;
    $NUMACCEPT = $doubled > ACCEPT_MAX ? ACCEPT_MAX : $doubled;

    if (defined $ACCEPT_RSET) {
        $ACCEPT_RSET->cancel;
    }
    $ACCEPT_RSET = Danga::Socket->AddTimer(30, \&reset_num_accept);
}
use Errno qw(EAGAIN EWOULDBLOCK);
# Accept a single SMTP connection.
# Returns nothing when accept() yields no socket, 1 otherwise.
# While $PAUSED is true the client is sent a 451 and closed; otherwise the
# new Qpsmtpd::PollServer client is fed the "Connect" line, set to watch
# reads, and then has reading paused.
sub _accept_handler {
    my $csock = $SERVER->accept()
      or return;    # warn("accept() failed: $!");

    binmode($csock, ':raw');
    if ($DEBUG) {
        printf("Listen child making a Qpsmtpd::PollServer for %d.\n",
            fileno($csock));
    }
    IO::Handle::blocking($csock, 0);

    #setsockopt($csock, IPPROTO_TCP, TCP_NODELAY, pack("l", 1)) or die;
    #print "Got connection\n";
    my $client = Qpsmtpd::PollServer->new($csock);

    if ($PAUSED) {
        $client->write("451 Sorry, this server is currently paused\r\n");
        $client->close;
    }
    else {
        $client->process_line("Connect\n");
        $client->watch_read(1);
        $client->pause_read();
    }
    return 1;
}
########################################################################
# Minimal logger: emits "<pid> fd:? <message>" through warn().
#   $_[0]  level   - accepted but not used yet
#   $_[1]  message - text to emit
# this is reimplemented from elsewhere anyway
sub log {
    my (undef, $message) = @_;
    return warn("$$ fd:? $message\n");
}
# Set the pause flag that _accept_handler() consults.
#   $_[0]  new value; true makes new connections get a 451 and be closed.
# Returns the value just stored.
sub pause {
    return $PAUSED = $_[0];
}