diff --git a/lib/Danga/Socket.pm b/lib/Danga/Socket.pm index e94220f..dfaf785 100644 --- a/lib/Danga/Socket.pm +++ b/lib/Danga/Socket.pm @@ -193,16 +193,16 @@ sub KQueueEventLoop { next; } - push @objs, [$pob, $fd, $filter, $flags, $fflags]; + DebugLevel >= 1 && $class->DebugMsg("Event: fd=%d (%s), flags=%d \@ %s\n", + $fd, ref($pob), $flags, time); + + push @objs, [$pob, $filter, $flags, $fflags]; } # TODO - prioritize the objects foreach (@objs) { - my ($pob, $fd, $filter, $flags, $fflags) = @$_; - - DebugLevel >= 1 && $class->DebugMsg("Event: fd=%d (%s), flags=%d \@ %s\n", - $fd, ref($pob), $flags, time); + my ($pob, $filter, $flags, $fflags) = @$_; $pob->event_read if $filter == IO::KQueue::EVFILT_READ() && !$pob->{closed}; $pob->event_write if $filter == IO::KQueue::EVFILT_WRITE() && !$pob->{closed}; @@ -236,6 +236,7 @@ sub EpollEventLoop { my $evcount; # get up to 1000 events, 1000ms timeout while ($evcount = epoll_wait($Epoll, 1000, 1000, \@events)) { + my @objs; EVENT: for ($i=0; $i<$evcount; $i++) { my $ev = $events[$i]; @@ -260,11 +261,17 @@ sub EpollEventLoop { DebugLevel >= 1 && $class->DebugMsg("Event: fd=%d (%s), state=%d \@ %s\n", $ev->[0], ref($pob), $ev->[1], time); + push @objs, [$pob, $state]; + } + + foreach (@objs) { + my ($pob, $state) = @$_; $pob->event_read if $state & EPOLLIN && ! $pob->{closed}; $pob->event_write if $state & EPOLLOUT && ! $pob->{closed}; $pob->event_err if $state & EPOLLERR && ! $pob->{closed}; $pob->event_hup if $state & EPOLLHUP && ! $pob->{closed}; } + return unless PostEventLoop(); } diff --git a/lib/Qpsmtpd/Plugin.pm b/lib/Qpsmtpd/Plugin.pm index 84482ce..25836a4 100644 --- a/lib/Qpsmtpd/Plugin.pm +++ b/lib/Qpsmtpd/Plugin.pm @@ -53,6 +53,10 @@ sub connection { shift->qp->connection; } +sub config { + shift->qp->config(@_); +} + sub spool_dir { shift->qp->spool_dir; } diff --git a/lib/Qpsmtpd/PollServer.pm b/lib/Qpsmtpd/PollServer.pm new file mode 100644 index 0000000..73429a2 --- /dev/null +++ b/lib/Qpsmtpd/PollServer.pm @@ -0,0 +1,332 @@ +# $Id: Server.pm,v 1.10 2005/02/14 22:04:48 msergeant Exp $ + +package Qpsmtpd::PollServer; + +use base ('Danga::Client', 'Qpsmtpd::SMTP'); +# use fields required to be a subclass of Danga::Client. Have to include +# all fields used by Qpsmtpd.pm here too. +use fields qw( + input_sock + mode + header_lines + in_header + data_size + max_size + hooks + _auth + _commands + _config_cache + _connection + _transaction + _test_mode + _extras +); +use Qpsmtpd::Constants; +use Qpsmtpd::Auth; +use Qpsmtpd::Address; +use Danga::DNS; +use Mail::Header; +use POSIX qw(strftime); +use Socket qw(inet_aton AF_INET CRLF); + +sub input_sock { + my $self = shift; + @_ and $self->{input_sock} = shift; + $self->{input_sock} || $self; +} + +sub new { + my Qpsmtpd::PollServer $self = shift; + + $self = fields::new($self) unless ref $self; + $self->SUPER::new( @_ ); + $self->load_plugins; + return $self; +} + +sub reset_for_next_message { + my $self = shift; + $self->SUPER::reset_for_next_message(@_); + + $self->{_commands} = { + ehlo => 1, + helo => 1, + rset => 1, + mail => 1, + rcpt => 1, + data => 1, + help => 1, + vrfy => 1, + noop => 1, + quit => 1, + auth => 0, # disabled by default + }; + $self->{mode} = 'cmd'; + $self->{_extras} = {}; +} + +sub respond { + my $self = shift; + my ($code, @messages) = @_; + while (my $msg = shift @messages) { + my $line = $code . (@messages ? "-" : " ") . $msg; + $self->write("$line\r\n"); + } + return 1; +} + +sub process_line { + my $self = shift; + my $line = shift || return; + if ($::DEBUG > 1) { print "$$:".($self+0)."C($self->{mode}): $line"; } + local $SIG{ALRM} = sub { + my ($pkg, $file, $line) = caller(); + die "ALARM: $pkg, $file, $line"; + }; + my $prev = alarm(2); # must process a command in < 2 seconds + eval { $self->_process_line($line) }; + alarm($prev); + if ($@) { + print STDERR "Error: $@\n"; + return $self->fault("command failed unexpectedly") if $self->{mode} eq 'cmd'; + return $self->fault("error processing data lines") if $self->{mode} eq 'data'; + return $self->fault("unknown error"); + } + return; +} + +sub _process_line { + my $self = shift; + my $line = shift; + + if ($self->{mode} eq 'cmd') { + $line =~ s/\r?\n//; + return $self->process_cmd($line); + } + elsif ($self->{mode} eq 'data') { + return $self->data_line($line); + } + else { + die "Unknown mode"; + } +} + +sub process_cmd { + my $self = shift; + my $line = shift; + my ($cmd, @params) = split(/ +/, $line); + my $meth = lc($cmd); + if (my $lookup = $self->{_commands}->{$meth} && $self->can($meth)) { + my $resp = eval { + $lookup->($self, @params); + }; + if ($@) { + my $error = $@; + chomp($error); + $self->log(LOGERROR, "Command Error: $error"); + return $self->fault("command '$cmd' failed unexpectedly"); + } + return $resp; + } + else { + # No such method - i.e. unrecognized command + my ($rc, $msg) = $self->run_hooks("unrecognized_command", $cmd); + if ($rc == DENY) { + $self->respond(521, $msg); + $self->disconnect; + return; + } + elsif ($rc == DONE) { + return; # TODO - this isn't right. + } + else { + return $self->respond(500, "Unrecognized command"); + } + } +} + +sub disconnect { + my $self = shift; + $self->SUPER::disconnect(@_); + $self->close; +} + +sub start_conversation { + my $self = shift; + + my $conn = $self->connection; + # set remote_host, remote_ip and remote_port + my ($ip, $port) = split(':', $self->peer_addr_string); + $conn->remote_ip($ip); + $conn->remote_port($port); + Danga::DNS->new( + client => $self, + # NB: Setting remote_info to the same as remote_host + callback => sub { $conn->remote_info($conn->remote_host($_[0])) }, + host => $ip, + ); + + my ($rc, $msg) = $self->run_hooks("connect"); + if ($rc == DENY) { + $self->respond(550, ($msg || 'Connection from you denied, bye bye.')); + return $rc; + } + elsif ($rc == DENYSOFT) { + $self->respond(450, ($msg || 'Connection from you temporarily denied, bye bye.')); + return $rc; + } + elsif ($rc == DONE) { + $self->respond(220, $msg); + return $rc; + } + else { + $self->respond(220, $self->config('me') ." ESMTP qpsmtpd " + . $self->version ." ready; send us your mail, but not your spam."); + return DONE; + } +} + +sub data { + my $self = shift; + + my ($rc, $msg) = $self->run_hooks("data"); + if ($rc == DONE) { + return; + } + elsif ($rc == DENY) { + $self->respond(554, $msg || "Message denied"); + $self->reset_transaction(); + return; + } + elsif ($rc == DENYSOFT) { + $self->respond(451, $msg || "Message denied temporarily"); + $self->reset_transaction(); + return; + } + elsif ($rc == DENY_DISCONNECT) { + $self->respond(554, $msg || "Message denied"); + $self->disconnect; + return; + } + elsif ($rc == DENYSOFT_DISCONNECT) { + $self->respond(451, $msg || "Message denied temporarily"); + $self->disconnect; + return; + } + return $self->respond(503, "MAIL first") unless $self->transaction->sender; + return $self->respond(503, "RCPT first") unless $self->transaction->recipients; + + $self->{mode} = 'data'; + + $self->{header_lines} = []; + $self->{data_size} = 0; + $self->{in_header} = 1; + $self->{max_size} = ($self->config('databytes'))[0] || 0; # this should work in scalar context + + $self->log(LOGDEBUG, "max_size: $self->{max_size} / size: $self->{data_size}"); + + return $self->respond(354, "go ahead"); +} + +sub data_line { + my $self = shift; + + my $line = shift; + + if ($line eq ".\r\n") { + # add received etc. + $self->{mode} = 'cmd'; + $self->end_of_data; + return; + } + + # Reject messages that have either bare LF or CR. rjkaes noticed a + # lot of spam that is malformed in the header. + if ($line eq ".\n" or $line eq ".\r") { + $self->respond(421, "See http://smtpd.develooper.com/barelf.html"); + $self->disconnect; + return; + } + + # add a transaction->blocked check back here when we have line by line plugin access... + unless (($self->{max_size} and $self->{data_size} > $self->{max_size})) { + $line =~ s/\r\n$/\n/; + $line =~ s/^\.\./\./; + + if ($self->{in_header} and $line =~ m/^\s*$/) { + # end of headers + $self->{in_header} = 0; + + # ... need to check that we don't reformat any of the received lines. + # + # 3.8.2 Received Lines in Gatewaying + # When forwarding a message into or out of the Internet environment, a + # gateway MUST prepend a Received: line, but it MUST NOT alter in any + # way a Received: line that is already in the header. + + my $header = Mail::Header->new($self->{header_lines}, + Modify => 0, MailFrom => "COERCE"); + $self->transaction->header($header); + + #$header->add("X-SMTPD", "qpsmtpd/".$self->version.", http://smtpd.develooper.com/"); + + # FIXME - call plugins to work on just the header here; can + # save us buffering the mail content. + } + + if ($self->{in_header}) { + push @{ $self->{header_lines} }, $line; + } + else { + $self->transaction->body_write($line); + } + + $self->{data_size} += length $line; + } + + return; +} + +sub end_of_data { + my $self = shift; + + #$self->log(LOGDEBUG, "size is at $size\n") unless ($i % 300); + + $self->log(LOGDEBUG, "max_size: $self->{max_size} / size: $size"); + + my $smtp = $self->connection->hello eq "ehlo" ? "ESMTP" : "SMTP"; + + # only true if client authenticated + if ( defined $self->{_auth} and $self->{_auth} == OK ) { + $header->add("X-Qpsmtpd-Auth","True"); + } + + $self->transaction->header->add("Received", "from ".$self->connection->remote_info + ." (HELO ".$self->connection->hello_host . ") (".$self->connection->remote_ip + . ")\n by ".$self->config('me')." (qpsmtpd/".$self->version + .") with $smtp; ". (strftime('%a, %d %b %Y %H:%M:%S %z', localtime)), + 0); + + return $self->respond(552, "Message too big!") if $self->{max_size} and $self->{data_size} > $self->{max_size}; + + ($rc, $msg) = $self->run_hooks("data_post"); + if ($rc == DONE) { + return; + } + elsif ($rc == DENY) { + $self->respond(552, $msg || "Message denied"); + } + elsif ($rc == DENYSOFT) { + $self->respond(452, $msg || "Message denied temporarily"); + } + else { + $self->queue($self->transaction); + } + + # DATA is always the end of a "transaction" + $self->reset_transaction; + return; +} + +1; + diff --git a/lib/Qpsmtpd/SelectServer.pm b/lib/Qpsmtpd/SelectServer.pm deleted file mode 100644 index 07e5c56..0000000 --- a/lib/Qpsmtpd/SelectServer.pm +++ /dev/null @@ -1,320 +0,0 @@ -package Qpsmtpd::SelectServer; -use Qpsmtpd::SMTP; -use Qpsmtpd::Constants; -use IO::Socket; -use IO::Select; -use POSIX qw(strftime); -use Socket qw(CRLF); -use Fcntl; -use Tie::RefHash; -use Net::DNS; - -@ISA = qw(Qpsmtpd::SMTP); -use strict; - -our %inbuffer = (); -our %outbuffer = (); -our %ready = (); -our %lookup = (); -our %qp = (); -our %indata = (); - -tie %ready, 'Tie::RefHash'; -my $server; -my $select; - -our $QUIT = 0; - -$SIG{INT} = $SIG{TERM} = sub { $QUIT++ }; - -sub log { - my ($self, $trace, @log) = @_; - my $level = Qpsmtpd::TRACE_LEVEL(); - $level = $self->init_logger unless defined $level; - warn join(" ", fileno($self->client), @log), "\n" - if $trace <= $level; -} - -sub main { - my $class = shift; - my %opts = (LocalPort => 25, Reuse => 1, Listen => SOMAXCONN, @_); - $server = IO::Socket::INET->new(%opts) or die "Server: $@"; - print "Listening on $opts{LocalPort}\n"; - - nonblock($server); - - $select = IO::Select->new($server); - my $res = Net::DNS::Resolver->new; - - # TODO - make this more graceful - let all current SMTP sessions finish - # before quitting! - while (!$QUIT) { - foreach my $client ($select->can_read(1)) { - #print "Reading $client\n"; - if ($client == $server) { - my $client_addr; - $client = $server->accept(); - next unless $client; - my $ip = $client->peerhost; - my $bgsock = $res->bgsend($ip); - $select->add($bgsock); - $lookup{$bgsock} = $client; - } - elsif (my $qpclient = $lookup{$client}) { - my $packet = $res->bgread($client); - my $ip = $qpclient->peerhost; - my $hostname = $ip; - if ($packet) { - foreach my $rr ($packet->answer) { - if ($rr->type eq 'PTR') { - $hostname = $rr->rdatastr; - } - } - } - # $packet->print; - $select->remove($client); - delete($lookup{$client}); - my $qp = Qpsmtpd::SelectServer->new(); - $qp->client($qpclient); - $qp{$qpclient} = $qp; - $qp->log(LOGINFO, "Connection number " . keys(%qp)); - $inbuffer{$qpclient} = ''; - $outbuffer{$qpclient} = ''; - $ready{$qpclient} = []; - $qp->start_connection($ip, $hostname); - $qp->load_plugins; - my $rc = $qp->start_conversation; - if ($rc != DONE) { - close($client); - next; - } - $select->add($qpclient); - nonblock($qpclient); - } - else { - my $data = ''; - my $rv = $client->recv($data, POSIX::BUFSIZ(), 0); - - unless (defined($rv) && length($data)) { - freeclient($client) - unless ($! == POSIX::EWOULDBLOCK() || - $! == POSIX::EINPROGRESS() || - $! == POSIX::EINTR()); - next; - } - $inbuffer{$client} .= $data; - - while ($inbuffer{$client} =~ s/^([^\r\n]*)\r?\n//) { - #print "<$1\n"; - push @{$ready{$client}}, $1; - } - } - } - - #print "Processing...\n"; - foreach my $client (keys %ready) { - my $qp = $qp{$client}; - #print "Processing $client = $qp\n"; - foreach my $req (@{$ready{$client}}) { - if ($indata{$client}) { - $qp->data_line($req . CRLF); - } - else { - $qp->log(LOGINFO, "dispatching $req"); - defined $qp->dispatch(split / +/, $req) - or $qp->respond(502, "command unrecognized: '$req'"); - } - } - delete $ready{$client}; - } - - #print "Writing...\n"; - foreach my $client ($select->can_write(1)) { - next unless $outbuffer{$client}; - #print "Writing to $client\n"; - - my $rv = $client->send($outbuffer{$client}, 0); - unless (defined($rv)) { - warn("I was told to write, but I can't: $!\n"); - next; - } - if ($rv == length($outbuffer{$client}) || - $! == POSIX::EWOULDBLOCK()) - { - #print "Sent all, or EWOULDBLOCK\n"; - if ($qp{$client}->{__quitting}) { - freeclient($client); - next; - } - substr($outbuffer{$client}, 0, $rv, ''); - delete($outbuffer{$client}) unless length($outbuffer{$client}); - } - else { - print "Error: $!\n"; - # Couldn't write all the data, and it wasn't because - # it would have blocked. Shut down and move on. - freeclient($client); - next; - } - } - } -} - -sub freeclient { - my $client = shift; - #print "Freeing client: $client\n"; - delete $inbuffer{$client}; - delete $outbuffer{$client}; - delete $ready{$client}; - delete $qp{$client}; - $select->remove($client); - close($client); -} - -sub start_connection { - my $self = shift; - my $remote_ip = shift; - my $remote_host = shift; - - $self->log(LOGNOTICE, "Connection from $remote_host [$remote_ip]"); - my $remote_info = 'NOINFO'; - - # if the local dns resolver doesn't filter it out we might get - # ansi escape characters that could make a ps axw do "funny" - # things. So to be safe, cut them out. - $remote_host =~ tr/a-zA-Z\.\-0-9//cd; - - $self->SUPER::connection->start(remote_info => $remote_info, - remote_ip => $remote_ip, - remote_host => $remote_host, - @_); -} - -sub client { - my $self = shift; - @_ and $self->{_client} = shift; - $self->{_client}; -} - -sub nonblock { - my $socket = shift; - my $flags = fcntl($socket, F_GETFL, 0) - or die "Can't get flags for socket: $!"; - fcntl($socket, F_SETFL, $flags | O_NONBLOCK) - or die "Can't set flags for socket: $!"; -} - -sub read_input { - my $self = shift; - die "read_input is disabled in SelectServer"; -} - -sub respond { - my ($self, $code, @messages) = @_; - my $client = $self->client || die "No client!"; - while (my $msg = shift @messages) { - my $line = $code . (@messages?"-":" ").$msg; - $self->log(LOGINFO, ">$line"); - $outbuffer{$client} .= "$line\r\n"; - } - return 1; -} - -sub disconnect { - my $self = shift; - #print "Disconnecting\n"; - $self->{__quitting} = 1; - $self->SUPER::disconnect(@_); -} - -sub data { - my $self = shift; - $self->respond(503, "MAIL first"), return 1 unless $self->transaction->sender; - $self->respond(503, "RCPT first"), return 1 unless $self->transaction->recipients; - $self->respond(354, "go ahead"); - $indata{$self->client()} = 1; - $self->{__buffer} = ''; - $self->{__size} = 0; - $self->{__blocked} = ""; - $self->{__in_header} = 1; - $self->{__complete} = 0; - $self->{__max_size} = $self->config('databytes') || 0; -} - -sub data_line { - my $self = shift; - local $_ = shift; - - if ($_ eq ".\r\n") { - $self->log(LOGDEBUG, "max_size: $self->{__max_size} / size: $self->{__size}"); - delete $indata{$self->client()}; - - my $smtp = $self->connection->hello eq "ehlo" ? "ESMTP" : "SMTP"; - - if (!$self->transaction->header) { - $self->transaction->header(Mail::Header->new(Modify => 0, MailFrom => "COERCE")); - } - $self->transaction->header->add("Received", "from ".$self->connection->remote_info - ." (HELO ".$self->connection->hello_host . ") (".$self->connection->remote_ip - . ") by ".$self->config('me')." (qpsmtpd/".$self->version - .") with $smtp; ". (strftime('%a, %d %b %Y %H:%M:%S %z', localtime)), - 0); - - #$self->respond(550, $self->transaction->blocked),return 1 if ($self->transaction->blocked); - $self->respond(552, "Message too big!"),return 1 if $self->{__max_size} and $self->{__size} > $self->{__max_size}; - - my ($rc, $msg) = $self->run_hooks("data_post"); - if ($rc == DONE) { - return 1; - } - elsif ($rc == DENY) { - $self->respond(552, $msg || "Message denied"); - } - elsif ($rc == DENYSOFT) { - $self->respond(452, $msg || "Message denied temporarily"); - } - else { - $self->queue($self->transaction); - } - - # DATA is always the end of a "transaction" - return $self->reset_transaction; - } - elsif ($_ eq ".\n") { - $self->respond(451, "See http://develooper.com/code/qpsmtpd/barelf.html"); - $self->{__quitting} = 1; - return; - } - - # add a transaction->blocked check back here when we have line by line plugin access... - unless (($self->{__max_size} and $self->{__size} > $self->{__max_size})) { - s/\r\n$/\n/; - s/^\.\./\./; - if ($self->{__in_header} and m/^\s*$/) { - $self->{__in_header} = 0; - my @header = split /\n/, $self->{__buffer}; - - # ... need to check that we don't reformat any of the received lines. - # - # 3.8.2 Received Lines in Gatewaying - # When forwarding a message into or out of the Internet environment, a - # gateway MUST prepend a Received: line, but it MUST NOT alter in any - # way a Received: line that is already in the header. - - my $header = Mail::Header->new(Modify => 0, MailFrom => "COERCE"); - $header->extract(\@header); - $self->transaction->header($header); - $self->{__buffer} = ""; - } - - if ($self->{__in_header}) { - $self->{__buffer} .= $_; - } - else { - $self->transaction->body_write($_); - } - $self->{__size} += length $_; - } -} - -1; diff --git a/qpsmtpd b/qpsmtpd index 254458e..5296717 100755 --- a/qpsmtpd +++ b/qpsmtpd @@ -1,30 +1,369 @@ -#!/usr/bin/perl -Tw -# Copyright (c) 2001 Ask Bjoern Hansen. See the LICENSE file for details. -# The "command dispatch" system is taken from colobus - http://trainedmonkey.com/colobus/ -# -# this is designed to be run under tcpserver (http://cr.yp.to/ucspi-tcp.html) -# or inetd if you're into that sort of thing -# -# -# For more information see http://develooper.com/code/qpsmtpd/ -# -# +#!/usr/bin/perl -w + +use lib "./lib"; +BEGIN { + delete $ENV{ENV}; + delete $ENV{BASH_ENV}; + $ENV{PATH} = '/bin:/usr/bin:/var/qmail/bin:/usr/local/bin'; +} -use lib 'lib'; -use Qpsmtpd::TcpServer; use strict; -$| = 1; +use vars qw($DEBUG); +use FindBin; +use lib "$FindBin::Bin/lib"; +use Danga::Socket; +use Danga::Client; +use Qpsmtpd::PollServer; +use Qpsmtpd::Constants; +use IO::Socket; +use Carp; +use POSIX qw(WNOHANG); +use Getopt::Long; -delete $ENV{ENV}; -$ENV{PATH} = '/bin:/usr/bin:/var/qmail/bin'; +$|++; -my $qpsmtpd = Qpsmtpd::TcpServer->new(); -$qpsmtpd->start_connection(); -$qpsmtpd->run(); +# For debugging +# $SIG{USR1} = sub { Carp::confess("USR1") }; -__END__ +use Socket qw(IPPROTO_TCP SO_KEEPALIVE TCP_NODELAY SOL_SOCKET); +$SIG{'PIPE'} = "IGNORE"; # handled manually +$DEBUG = 0; +my $PORT = 2525; +my $LOCALADDR = '0.0.0.0'; +my $LineMode = 0; +my $PROCS = 1; +my $MAXCONN = 15; # max simultaneous connections +my $USER = 'smtpd'; # user to suid to +my $MAXCONNIP = 5; # max simultaneous connections from one IP +sub help { + print < \$PORT, + 'l|listen-address=s' => \$LOCALADDR, + 'j|procs=i' => \$PROCS, + 'd|debug+' => \$DEBUG, + 'f|forkmode' => \$LineMode, + 'c|limit-connections=i' => \$MAXCONN, + 'm|max-from-ip=i' => \$MAXCONNIP, + 'u|user=s' => \$USER, + 'h|help' => \&help, +) || help(); + +# detaint the commandline +if ($PORT =~ /^(\d+)$/) { $PORT = $1 } else { &help } +if ($LOCALADDR =~ /^([\d\w\-.]+)$/) { $LOCALADDR = $1 } else { &help } +if ($USER =~ /^([\w\-]+)$/) { $USER = $1 } else { &help } +if ($MAXCONN =~ /^(\d+)$/) { $MAXCONN = $1 } else { &help } +if ($PROCS =~ /^(\d+)$/) { $PROCS = $1 } else { &help } + +$PROCS = 1 if $LineMode; +# This is a bit of a hack, but we get to approximate MAXCONN stuff when we +# have multiple children listening on the same socket. +$MAXCONN /= $PROCS; +$MAXCONNIP /= $PROCS; + +Danga::Socket::init_poller(); + +my $POLL = "with " . ($Danga::Socket::HaveEpoll ? "epoll()" : + $Danga::Socket::HaveKQueue ? "kqueue()" : "poll()"); + +my $server; + +# Code for inetd/tcpserver mode +if ($ENV{REMOTE_HOST}) { + run_as_inetd(); + exit(0); +} + +my %childstatus = (); + +run_as_server(); +exit(0); + +sub _fork { + my $pid = fork; + if (!defined($pid)) { die "Cannot fork: $!" } + return $pid if $pid; + + # Fixup Net::DNS randomness after fork + srand($$ ^ time); + + local $^W; + delete $INC{'Net/DNS/Header.pm'}; + require Net::DNS::Header; + + # cope with different versions of Net::DNS + eval { + $Net::DNS::Resolver::global{id} = 1; + $Net::DNS::Resolver::global{id} = int(rand(Net::DNS::Resolver::MAX_ID())); + # print "Next DNS ID: $Net::DNS::Resolver::global{id}\n"; + }; + if ($@) { + # print "Next DNS ID: " . Net::DNS::Header::nextid() . "\n"; + } + + # Fixup lost kqueue after fork + $Danga::Socket::HaveKQueue = undef; + Danga::Socket::init_poller(); +} + +sub spawn_child { + _fork and return; + + $SIG{CHLD} = "DEFAULT"; + + Qpsmtpd::PollServer->OtherFds(fileno($server) => \&accept_handler); + Qpsmtpd::PollServer->EventLoop(); + exit; +} + +sub sig_chld { + $SIG{CHLD} = 'IGNORE'; + while ( (my $child = waitpid(-1,WNOHANG)) > 0) { + last unless $child > 0; + print "child $child died\n"; + delete $childstatus{$child}; + } + return if $LineMode; + # restart a new child if in poll server mode + spawn_child(); + $SIG{CHLD} = \&sig_chld; +} + +sub HUNTSMAN { + $SIG{CHLD} = 'DEFAULT'; + kill 'INT' => keys %childstatus; + exit(0); +} + +sub run_as_inetd { + $LineMode = 1; + + my $insock = IO::Handle->new_from_fd(0, "r"); + IO::Handle::blocking($insock, 0); + + my $outsock = IO::Handle->new_from_fd(1, "w"); + IO::Handle::blocking($outsock, 0); + + my $client = Danga::Client->new($insock); + + my $out = Qpsmtpd::PollServer->new($outsock); + $out->load_plugins; + $out->init_logger; + $out->input_sock($client); + my $rc = $out->start_conversation; + if ($rc != DONE) { + return; + } + + $client->watch_read(1); + while (1) { + my $line = $client->get_line; + last if !defined($line); + my $output = $out->process_line($line); + $out->write($output) if $output; + $client->watch_read(1); + } +} + +sub run_as_server { + # establish SERVER socket, bind and listen. + $server = IO::Socket::INET->new(LocalPort => $PORT, + LocalAddr => $LOCALADDR, + Type => SOCK_STREAM, + Proto => IPPROTO_TCP, + Blocking => 0, + Reuse => 1, + Listen => 10 ) + or die "Error creating server $LOCALADDR:$PORT : $@\n"; + + IO::Handle::blocking($server, 0); + binmode($server, ':raw'); + + # Drop priviledges + my (undef, undef, $quid, $qgid) = getpwnam $USER or + die "unable to determine uid/gid for $USER\n"; + $) = ""; + POSIX::setgid($qgid) or + die "unable to change gid: $!\n"; + POSIX::setuid($quid) or + die "unable to change uid: $!\n"; + $> = $quid; + + ::log(LOGINFO, 'Running as user '. + (getpwuid($>) || $>) . + ', group '. + (getgrgid($)) || $))); + + # Load plugins here + my $plugin_loader = Qpsmtpd::SMTP->new(); + $plugin_loader->load_plugins; + + if ($PROCS > 1) { + $SIG{'CHLD'} = \&sig_chld; + my @kids; + for (1..$PROCS) { + push @kids, spawn_child(); + } + $SIG{INT} = $SIG{TERM} = sub { $SIG{CHLD} = "IGNORE"; kill 2 => @kids; exit }; + ::log(LOGDEBUG, "Listening on $PORT with $PROCS children $POLL"); + sleep while (1); + } + else { + if ($LineMode) { + $SIG{INT} = $SIG{TERM} = \&HUNTSMAN; + } + ::log(LOGDEBUG, "Listening on $PORT with single process $POLL" . + ($LineMode ? " (forking server)" : "")); + Qpsmtpd::PollServer->OtherFds(fileno($server) => \&accept_handler); + while (1) { + Qpsmtpd::PollServer->EventLoop(); + } + exit; + } + +} + +# Accept a new connection +sub accept_handler { + my $running = scalar keys %childstatus; + while ($running >= $MAXCONN) { + ::log(LOGINFO,"Too many connections: $running >= $MAXCONN."); + return; + } + + my $csock = $server->accept(); + if (!$csock) { + # warn("accept() failed: $!"); + } + return unless $csock; + binmode($csock, ':raw'); + + printf("Listen child making a Qpsmtpd::PollServer for %d.\n", fileno($csock)) + if $DEBUG; + + IO::Handle::blocking($csock, 0); + setsockopt($csock, IPPROTO_TCP, TCP_NODELAY, pack("l", 1)) or die; + + if (!$LineMode) { + # multiplex mode + my $client = Qpsmtpd::PollServer->new($csock); + my $rem_ip = $client->peer_ip_string; + + if ($MAXCONNIP) { + my $num_conn = 1; # seed with current value + + # If we for-loop directly over values %childstatus, a SIGCHLD + # can call REAPER and slip $rip out from under us. Causes + # "Use of freed value in iteration" under perl 5.8.4. + my $descriptors = Danga::Client->DescriptorMap; + my @obj = values %$descriptors; + foreach my $obj (@obj) { + local $^W; + # This is a bit of a slow way to do this. Wish I could cache the method call. + ++$num_conn if ($obj->peer_ip_string eq $rem_ip); + } + + if ($num_conn > $MAXCONNIP) { + ::log(LOGINFO,"Too many connections from $rem_ip: " + ."$num_conn > $MAXCONNIP. Denying connection."); + $client->write("451 Sorry, too many connections from $rem_ip, try again later\r\n"); + $client->close; + return; + } + } + + my $rc = $client->start_conversation; + if ($rc != DONE) { + $client->close; + return; + } + $client->watch_read(1); + return; + } + + # fork-per-connection mode + my $rem_ip = $csock->sockhost(); + + if ($MAXCONNIP) { + my $num_conn = 1; # seed with current value + + my @rip = values %childstatus; + foreach my $rip (@rip) { + ++$num_conn if (defined $rip && $rip eq $rem_ip); + } + + if ($num_conn > $MAXCONNIP) { + ::log(LOGINFO,"Too many connections from $rem_ip: " + ."$num_conn > $MAXCONNIP. Denying connection."); + print $csock "451 Sorry, too many connections from $rem_ip, try again later\r\n"; + close $csock; + return; + } + } + + if (my $pid = _fork) { + $childstatus{$pid} = $rem_ip; + return $csock->close(); + } + + $server->close(); # make sure the child doesn't accept() new connections + + $SIG{$_} = 'DEFAULT' for keys %SIG; + + my $client = Qpsmtpd::PollServer->new($csock); + my $rc = $client->start_conversation; + if ($rc != DONE) { + $client->close; + exit; + } + $client->watch_read(1); + + while (1) { + my $line = $client->get_line; + last if !defined($line); + my $resp = $client->process_line($line); + # if ($resp) { print "S: $_\n" for split(/\n/, $resp) } + $client->write($resp) if $resp; + $client->watch_read(1); + } + + ::log(LOGDEBUG, "Finished with child %d.\n", fileno($csock)) + if $DEBUG; + $client->close(); + + exit; +} + +######################################################################## + +sub log { + my ($level,$message) = @_; + # $level not used yet. this is reimplemented from elsewhere anyway + warn("$$ $message\n"); +} -1; diff --git a/qpsmtpd-forkserver b/qpsmtpd-forkserver deleted file mode 100755 index a9e8ab6..0000000 --- a/qpsmtpd-forkserver +++ /dev/null @@ -1,198 +0,0 @@ -#!/usr/bin/perl -Tw -# Copyright (c) 2001 Ask Bjoern Hansen. See the LICENSE file for details. -# The "command dispatch" system is taken from colobus - http://trainedmonkey.com/colobus/ -# -# For more information see http://develooper.com/code/qpsmtpd/ -# -# - -use lib 'lib'; -use Qpsmtpd::TcpServer; -use Qpsmtpd::Constants; -use IO::Socket; -use Socket; -use Getopt::Long; -use POSIX qw(:sys_wait_h :errno_h :signal_h); -use strict; -$| = 1; - -# Configuration -my $MAXCONN = 15; # max simultaneous connections -my $PORT = 2525; # port number -my $LOCALADDR = '0.0.0.0'; # ip address to bind to -my $USER = 'smtpd'; # user to suid to -my $MAXCONNIP = 5; # max simultaneous connections from one IP - -sub usage { - print <<"EOT"; -usage: qpsmtpd-forkserver [ options ] - -l, --listen-address addr : listen on a specific address; default 0.0.0.0 - -p, --port P : listen on a specific port; default 2525 - -c, --limit-connections N : limit concurrent connections to N; default 15 - -u, --user U : run as a particular user (defualt 'smtpd') - -m, --max-from-ip M : limit connections from a single IP; default 5 -EOT - exit 0; -} - -GetOptions('h|help' => \&usage, - 'l|listen-address=s' => \$LOCALADDR, - 'c|limit-connections=i' => \$MAXCONN, - 'm|max-from-ip=i' => \$MAXCONNIP, - 'p|port=i' => \$PORT, - 'u|user=s' => \$USER) || &usage; - -# detaint the commandline -if ($PORT =~ /^(\d+)$/) { $PORT = $1 } else { &usage } -if ($LOCALADDR =~ /^([\d\w\-.]+)$/) { $LOCALADDR = $1 } else { &usage } -if ($USER =~ /^([\w\-]+)$/) { $USER = $1 } else { &usage } -if ($MAXCONN =~ /^(\d+)$/) { $MAXCONN = $1 } else { &usage } - -delete $ENV{ENV}; -$ENV{PATH} = '/bin:/usr/bin:/var/qmail/bin'; - -my %childstatus = (); - -sub REAPER { - $SIG{CHLD} = \&REAPER; - while ( defined(my $chld = waitpid(-1, WNOHANG)) ){ - last unless $chld > 0; - warn("$$ cleaning up after $chld\n"); - delete $childstatus{$chld}; - } -} - -sub HUNTSMAN { - $SIG{CHLD} = 'DEFAULT'; - kill 'INT' => keys %childstatus; - exit(0); -} - -$SIG{CHLD} = \&REAPER; -$SIG{INT} = \&HUNTSMAN; -$SIG{TERM} = \&HUNTSMAN; - -# establish SERVER socket, bind and listen. -my $server = IO::Socket::INET->new(LocalPort => $PORT, - LocalAddr => $LOCALADDR, - Proto => 'tcp', - Reuse => 1, - Listen => SOMAXCONN ) - or die "Creating TCP socket $LOCALADDR:$PORT: $!\n"; -::log(LOGINFO,"Listening on port $PORT"); - -# Drop priviledges -my (undef, undef, $quid, $qgid) = getpwnam $USER or - die "unable to determine uid/gid for $USER\n"; -$) = ""; -POSIX::setgid($qgid) or - die "unable to change gid: $!\n"; -POSIX::setuid($quid) or - die "unable to change uid: $!\n"; -$> = $quid; - -::log(LOGINFO, 'Running as user '. - (getpwuid($>) || $>) . - ', group '. - (getgrgid($)) || $))); - -# Load plugins here -my $plugin_loader = Qpsmtpd::TcpServer->new(); -$plugin_loader->load_plugins; - - -while (1) { - my $running = scalar keys %childstatus; - while ($running >= $MAXCONN) { - ::log(LOGINFO,"Too many connections: $running >= $MAXCONN. Waiting one second."); - sleep(1) ; - $running = scalar keys %childstatus; - } - my $hisaddr = accept(my $client, $server); - if (!$hisaddr) { - # possible something condition... - next; - } - my ($port, $iaddr) = sockaddr_in($hisaddr); - if ($MAXCONNIP) { - my $num_conn = 1; # seed with current value - - # If we for-loop directly over values %childstatus, a SIGCHLD - # can call REAPER and slip $rip out from under us. Causes - # "Use of freed value in iteration" under perl 5.8.4. - my @rip = values %childstatus; - foreach my $rip (@rip) { - ++$num_conn if (defined $rip && $rip eq $iaddr); - } - - if ($num_conn > $MAXCONNIP) { - my $rem_ip = inet_ntoa($iaddr); - ::log(LOGINFO,"Too many connections from $rem_ip: " - ."$num_conn > $MAXCONNIP. Denying connection."); - $client->autoflush(1); - print $client "451 Sorry, too many connections from $rem_ip, try again later\r\n"; - close $client; - next; - } - } - my $pid = fork; - if ($pid) { - # parent - $childstatus{$pid} = $iaddr; # add to table - # $childstatus{$pid} = 1; # add to table - $running++; - close($client); - next; - } - die "fork: $!" unless defined $pid; # failure - # otherwise child - - # all children should have different seeds, to prevent conflicts - srand( time ^ ($$ + ($$ << 15)) ); - - close($server); - - $SIG{$_} = 'DEFAULT' for keys %SIG; - $SIG{ALRM} = sub { - print $client "421 Connection Timed Out\n"; - ::log(LOGINFO, "Connection Timed Out"); - exit; }; - - my $localsockaddr = getsockname($client); - my ($lport, $laddr) = sockaddr_in($localsockaddr); - $ENV{TCPLOCALIP} = inet_ntoa($laddr); - # my ($port, $iaddr) = sockaddr_in($hisaddr); - $ENV{TCPREMOTEIP} = inet_ntoa($iaddr); - $ENV{TCPREMOTEHOST} = gethostbyaddr($iaddr, AF_INET) || "Unknown"; - - # don't do this! - #$0 = "qpsmtpd-forkserver: $ENV{TCPREMOTEIP} / $ENV{TCPREMOTEHOST}"; - - ::log(LOGINFO, "Accepted connection $running/$MAXCONN from $ENV{TCPREMOTEIP} / $ENV{TCPREMOTEHOST}"); - - # dup to STDIN/STDOUT - POSIX::dup2(fileno($client), 0); - POSIX::dup2(fileno($client), 1); - - my $qpsmtpd = Qpsmtpd::TcpServer->new(); - $qpsmtpd->start_connection - ( - local_ip => $ENV{TCPLOCALIP}, - local_port => $lport, - remote_ip => $ENV{TCPREMOTEIP}, - remote_port => $port, - ); - $qpsmtpd->run(); - - exit; # child leaves -} - -sub log { - my ($level,$message) = @_; - # $level not used yet. this is reimplemented from elsewhere anyway - warn("$$ $message\n"); -} - -__END__ - -1; diff --git a/qpsmtpd-server b/qpsmtpd-server deleted file mode 100755 index 248c472..0000000 --- a/qpsmtpd-server +++ /dev/null @@ -1,28 +0,0 @@ -#!/usr/bin/perl -Tw -# Copyright (c) 2001 Ask Bjoern Hansen. See the LICENSE file for details. -# The "command dispatch" system is taken from colobus - http://trainedmonkey.com/colobus/ -# -# this is designed to be run under tcpserver (http://cr.yp.to/ucspi-tcp.html) -# or inetd if you're into that sort of thing -# -# -# For more information see http://develooper.com/code/qpsmtpd/ -# -# - -use lib 'lib'; -use Qpsmtpd::SelectServer; -use strict; -$| = 1; - -delete $ENV{ENV}; -$ENV{PATH} = '/bin:/usr/bin:/var/qmail/bin'; - -Qpsmtpd::SelectServer->main(); - -__END__ - - - - -1;