#!/usr/bin/perl use lib "./lib"; BEGIN { delete $ENV{ENV}; delete $ENV{BASH_ENV}; $ENV{PATH} = '/bin:/usr/bin:/var/qmail/bin:/usr/local/bin'; } # Profiling - requires Devel::Profiler 0.05 #BEGIN { $Devel::Profiler::NO_INIT = 1; } #use Devel::Profiler; use strict; use vars qw($DEBUG); use FindBin qw(); # TODO: need to make this taint friendly use lib "$FindBin::Bin/lib"; use Danga::Socket; use Danga::Client; use Qpsmtpd::PollServer; use Qpsmtpd::ConfigServer; use Qpsmtpd::Constants; use IO::Socket; use Carp; use POSIX qw(WNOHANG); use Getopt::Long; use List::Util qw(shuffle); $|++; use Socket qw(SOMAXCONN IPPROTO_TCP SO_KEEPALIVE TCP_NODELAY SOL_SOCKET AF_UNIX SOCK_STREAM PF_UNSPEC); $SIG{'PIPE'} = "IGNORE"; # handled manually $DEBUG = 0; my $CONFIG_PORT = 20025; my $CONFIG_LOCALADDR = '127.0.0.1'; my $PORT = 2525; my $LOCALADDR = '0.0.0.0'; my $PROCS = 1; my $USER = (getpwuid $>)[0]; # user to suid to $USER = "smtpd" if $USER eq "root"; my $PAUSED = 0; my $NUMACCEPT = 20; my $PID_FILE = ''; my $ACCEPT_RSET; my $DETACH; # daemonize on startup # make sure we don't spend forever doing accept() use constant ACCEPT_MAX => 1000; sub reset_num_accept { $NUMACCEPT = 20; } sub help { print <<EOT; Usage: qpsmtpd [OPTIONS] Options: -l, --listen-address addr : listen on a specific address; default 0.0.0.0 -p, --port P : listen on a specific port; default 2525 --config-port : config server port; default 20025 -u, --user U : run as a particular user; defualt 'smtpd' -j, --procs J : spawn J processes; default 1 -d, --detach : detach from controlling terminal (daemonize) --pid-file P : print main servers PID to file P -h, --help : this page --use-poll : force use of poll() instead of epoll()/kqueue() EOT exit(0); } GetOptions( 'p|port=i' => \$PORT, 'l|listen-address=s' => \$LOCALADDR, 'j|procs=i' => \$PROCS, 'v|verbose+' => \$DEBUG, 'u|user=s' => \$USER, 'pid-file=s' => \$PID_FILE, 'd|detach' => \$DETACH, 'h|help' => \&help, 'config-port=i' => \$CONFIG_PORT, ) || help(); # detaint the commandline if ($PORT =~ /^(\d+)$/) { $PORT = $1 } else { &help } if ($LOCALADDR =~ /^([\d\w\-.]+)$/) { $LOCALADDR = $1 } else { &help } if ($USER =~ /^([\w\-]+)$/) { $USER = $1 } else { &help } if ($PROCS =~ /^(\d+)$/) { $PROCS = $1 } else { &help } sub force_poll { $Danga::Socket::HaveEpoll = 0; $Danga::Socket::HaveKQueue = 0; } my $POLL = "with " . ($Danga::Socket::HaveEpoll ? "epoll()" : $Danga::Socket::HaveKQueue ? "kqueue()" : "poll()"); my $SERVER; my $CONFIG_SERVER; use constant READY => 1; use constant ACCEPTING => 2; use constant RESTARTING => 999; my %childstatus = (); if ($PID_FILE && -r $PID_FILE) { open PID, "<$PID_FILE" or die "open_pidfile $PID_FILE: $!\n"; my $running_pid = <PID> || ''; chomp $running_pid; if ($running_pid =~ /^(\d+)/) { if (kill 0, $running_pid) { die "Found an already running qpsmtpd with pid $running_pid.\n"; } } close(PID); } run_as_server(); exit(0); sub _fork { my $pid = fork; if (!defined($pid)) { die "Cannot fork: $!" } return $pid if $pid; # Fixup Net::DNS randomness after fork srand($$ ^ time); local $^W; delete $INC{'Net/DNS/Header.pm'}; require Net::DNS::Header; # cope with different versions of Net::DNS eval { $Net::DNS::Resolver::global{id} = 1; $Net::DNS::Resolver::global{id} = int(rand(Net::DNS::Resolver::MAX_ID())); # print "Next DNS ID: $Net::DNS::Resolver::global{id}\n"; }; if ($@) { # print "Next DNS ID: " . Net::DNS::Header::nextid() . "\n"; } # Fixup lost kqueue after fork $Danga::Socket::HaveKQueue = undef; } sub spawn_child { my $plugin_loader = shift || Qpsmtpd::SMTP->new; socketpair(my $reader, my $writer, AF_UNIX, SOCK_STREAM, PF_UNSPEC) || die "Unable to create a pipe"; $writer->autoflush(1); $reader->autoflush(1); if (my $pid = _fork) { $childstatus{$pid} = $writer; return $pid; } $SIG{CHLD} = $SIG{INT} = $SIG{TERM} = 'DEFAULT'; $SIG{PIPE} = 'IGNORE'; $SIG{HUP} = 'IGNORE'; close $CONFIG_SERVER; Qpsmtpd::PollServer->Reset; Qpsmtpd::PollServer->OtherFds( fileno($reader) => sub { command_handler($reader) }, fileno($SERVER) => \&accept_handler, ); $ACCEPT_RSET = Danga::Socket->AddTimer(30, \&reset_num_accept); $plugin_loader->run_hooks('post-fork'); Qpsmtpd::PollServer->EventLoop(); exit; } # Note this is broken on KQueue because it requires that it handle signals itself or it breaks the event loop. sub sig_hup { for my $writer (values %childstatus) { print $writer "hup\n"; } } sub sig_chld { my $spawn_count = 0; while ( (my $child = waitpid(-1,WNOHANG)) > 0) { if (!defined $childstatus{$child}) { next; } last unless $child > 0; print "SIGCHLD: child $child died\n"; delete $childstatus{$child}; $spawn_count++; } if ($spawn_count) { for (1..$spawn_count) { # restart a new child if in poll server mode my $pid = spawn_child(); } } $SIG{CHLD} = \&sig_chld; } sub HUNTSMAN { $SIG{CHLD} = 'DEFAULT'; kill 'INT' => keys %childstatus; if ($PID_FILE && -e $PID_FILE) { unlink $PID_FILE or ::log(LOGERROR, "unlink: $PID_FILE: $!"); } exit(0); } sub run_as_server { # establish SERVER socket, bind and listen. $SERVER = IO::Socket::INET->new(LocalPort => $PORT, LocalAddr => $LOCALADDR, Type => SOCK_STREAM, Proto => IPPROTO_TCP, Blocking => 0, Reuse => 1, Listen => SOMAXCONN ) or die "Error creating server $LOCALADDR:$PORT : $@\n"; IO::Handle::blocking($SERVER, 0); binmode($SERVER, ':raw'); $CONFIG_SERVER = IO::Socket::INET->new(LocalPort => $CONFIG_PORT, LocalAddr => $CONFIG_LOCALADDR, Type => SOCK_STREAM, Proto => IPPROTO_TCP, Blocking => 0, Reuse => 1, Listen => 1 ) or die "Error creating server $CONFIG_LOCALADDR:$CONFIG_PORT : $@\n"; IO::Handle::blocking($CONFIG_SERVER, 0); binmode($CONFIG_SERVER, ':raw'); # Drop priviledges my (undef, undef, $quid, $qgid) = getpwnam $USER or die "unable to determine uid/gid for $USER\n"; my $groups = "$qgid $qgid"; while (my (undef, undef, $gid, $members) = getgrent) { my @m = split(/ /, $members); if (grep { $_ eq $USER } @m) { $groups .= " $gid"; } } endgrent; $) = $groups; POSIX::setgid($qgid) or die "unable to change gid: $!\n"; POSIX::setuid($quid) or die "unable to change uid: $!\n"; $> = $quid; # Load plugins here my $plugin_loader = Qpsmtpd::SMTP->new(); $plugin_loader->load_plugins; if ($DETACH) { open STDIN, '/dev/null' or die "/dev/null: $!"; open STDOUT, '>/dev/null' or die "/dev/null: $!"; open STDERR, '>&STDOUT' or die "open(stderr): $!"; defined (my $pid = fork) or die "fork: $!"; exit 0 if $pid; POSIX::setsid or die "setsid: $!"; } if ($PID_FILE) { open PID, ">$PID_FILE" || die "$PID_FILE: $!"; print PID $$,"\n"; close PID; } $plugin_loader->log(LOGINFO, 'Running as user '. (getpwuid($>) || $>) . ', group '. (getgrgid($)) || $))); $SIG{INT} = $SIG{TERM} = \&HUNTSMAN; ###################### # more Profiling code =pod $plugin_loader->run_hooks('post-fork'); Devel::Profiler->set_options( bad_subs => [qw(Danga::Socket::EventLoop)], sub_filter => sub { my ($pkg, $sub) = @_; return 0 if $sub eq 'AUTOLOAD'; return 0 if $pkg =~ /ParaDNS::XS/; return 1; }, ); Devel::Profiler->init(); Qpsmtpd::PollServer->OtherFds( fileno($SERVER) => \&accept_handler, fileno($CONFIG_SERVER) => \&config_handler, ); Qpsmtpd::PollServer->EventLoop; exit; =cut ##################### for (1..$PROCS) { my $pid = spawn_child($plugin_loader); } $plugin_loader->log(LOGDEBUG, "Listening on $PORT with $PROCS children $POLL"); $SIG{CHLD} = \&sig_chld; $SIG{HUP} = \&sig_hup; Qpsmtpd::PollServer->OtherFds( fileno($CONFIG_SERVER) => \&config_handler, ); Qpsmtpd::PollServer->EventLoop; exit; } sub config_handler { my $csock = $CONFIG_SERVER->accept(); if (!$csock) { # warn("accept failed on config server: $!"); return; } binmode($csock, ':raw'); printf("Config server connection\n") if $DEBUG; IO::Handle::blocking($csock, 0); setsockopt($csock, IPPROTO_TCP, TCP_NODELAY, pack("l", 1)) or die; my $client = Qpsmtpd::ConfigServer->new($csock); $client->watch_read(1); return; } sub command_handler { my $reader = shift; chomp(my $command = <$reader>); #print "Got command: $command\n"; my $real_command = "cmd_$command"; no strict 'refs'; $real_command->(); } sub cmd_hup { # clear cache print "Clearing cache\n"; Qpsmtpd::clear_config_cache(); # should also reload modules... but can't do that yet. } # Accept all new connections sub accept_handler { for (1 .. $NUMACCEPT) { return unless _accept_handler(); } # got here because we have accept's left. # So double the number we accept next time. $NUMACCEPT *= 2; $NUMACCEPT = ACCEPT_MAX if $NUMACCEPT > ACCEPT_MAX; $ACCEPT_RSET->cancel if defined $ACCEPT_RSET; $ACCEPT_RSET = Danga::Socket->AddTimer(30, \&reset_num_accept); } use Errno qw(EAGAIN EWOULDBLOCK); sub _accept_handler { my $csock = $SERVER->accept(); if (!$csock) { # warn("accept() failed: $!"); return; } binmode($csock, ':raw'); printf("Listen child making a Qpsmtpd::PollServer for %d.\n", fileno($csock)) if $DEBUG; IO::Handle::blocking($csock, 0); #setsockopt($csock, IPPROTO_TCP, TCP_NODELAY, pack("l", 1)) or die; #print "Got connection\n"; my $client = Qpsmtpd::PollServer->new($csock); if ($PAUSED) { $client->write("451 Sorry, this server is currently paused\r\n"); $client->close; return 1; } $client->process_line("Connect\n"); $client->watch_read(1); $client->pause_read(); return 1; } ######################################################################## sub log { my ($level,$message) = @_; # $level not used yet. this is reimplemented from elsewhere anyway warn("$$ fd:? $message\n"); } sub pause { my ($pause) = @_; $PAUSED = $pause; }