Simplify qpsmtpd script (remove inetd and forking server)

Greatly simplify Danga::Client due to no more need for line mode client
Update to latest Danga::Socket
Fix check_earlytalker to use new API
Fix Danga::DNS to use new API


git-svn-id: https://svn.perl.org/qpsmtpd/trunk@643 958fd67b-6ff1-0310-b445-bb7760255be9
This commit is contained in:
Matt Sergeant 2006-06-20 13:51:32 +00:00
parent a8c4a3c5e1
commit f31d18c6cd
6 changed files with 725 additions and 592 deletions

View File

@ -2,7 +2,7 @@
package Danga::Client; package Danga::Client;
use base 'Danga::TimeoutSocket'; use base 'Danga::TimeoutSocket';
use fields qw(line closing disable_read can_read_mode); use fields qw(line pause_count);
use Time::HiRes (); use Time::HiRes ();
# 30 seconds max timeout! # 30 seconds max timeout!
@ -21,68 +21,14 @@ sub new {
sub reset_for_next_message { sub reset_for_next_message {
my Danga::Client $self = shift; my Danga::Client $self = shift;
$self->{line} = ''; $self->{line} = '';
$self->{disable_read} = 0; $self->{pause_count} = 0;
$self->{can_read_mode} = 0;
return $self; return $self;
} }
sub get_line {
my Danga::Client $self = shift;
if (!$self->have_line) {
$self->SetPostLoopCallback(sub { $self->have_line ? 0 : 1 });
#warn("get_line PRE\n");
$self->EventLoop();
#warn("get_line POST\n");
$self->disable_read();
}
return if $self->{closing};
# now have a line.
$self->{alive_time} = time;
$self->{line} =~ s/^(.*?\n)//;
return $1;
}
sub can_read {
my Danga::Client $self = shift;
my ($timeout) = @_;
my $end = Time::HiRes::time() + $timeout;
# warn("Calling can-read\n");
$self->{can_read_mode} = 1;
if (!length($self->{line})) {
$self->disable_read();
# loop because any callback, not just ours, can make EventLoop return
while( !(length($self->{line}) || (Time::HiRes::time > $end)) ) {
$self->SetPostLoopCallback(sub { (length($self->{line}) ||
(Time::HiRes::time > $end)) ? 0 : 1 });
#warn("get_line PRE\n");
$self->EventLoop();
#warn("get_line POST\n");
}
$self->enable_read();
}
$self->{can_read_mode} = 0;
$self->SetPostLoopCallback(undef);
return if $self->{closing};
$self->{alive_time} = time;
# warn("can_read returning for '$self->{line}'\n");
return 1 if length($self->{line});
return;
}
sub have_line {
my Danga::Client $self = shift;
return 1 if $self->{closing};
if ($self->{line} =~ /\n/) {
return 1;
}
return 0;
}
sub event_read { sub event_read {
my Danga::Client $self = shift; my Danga::Client $self = shift;
my $bref = $self->read(8192); my $bref = $self->read(8192);
return $self->close($!) unless defined $bref; return $self->close($!) unless defined $bref;
# $self->watch_read(0);
$self->process_read_buf($bref); $self->process_read_buf($bref);
} }
@ -90,8 +36,7 @@ sub process_read_buf {
my Danga::Client $self = shift; my Danga::Client $self = shift;
my $bref = shift; my $bref = shift;
$self->{line} .= $$bref; $self->{line} .= $$bref;
return if ! $self->readable(); return if $self->paused();
return if $::LineMode;
while ($self->{line} =~ s/^(.*?\n)//) { while ($self->{line} =~ s/^(.*?\n)//) {
my $line = $1; my $line = $1;
@ -99,34 +44,40 @@ sub process_read_buf {
my $resp = $self->process_line($line); my $resp = $self->process_line($line);
if ($::DEBUG > 1 and $resp) { print "$$:".($self+0)."S: $_\n" for split(/\n/, $resp) } if ($::DEBUG > 1 and $resp) { print "$$:".($self+0)."S: $_\n" for split(/\n/, $resp) }
$self->write($resp) if $resp; $self->write($resp) if $resp;
$self->watch_read(0) if $self->{disable_read}; # $self->watch_read(0) if $self->{pause_count};
last if ! $self->readable(); last if $self->paused();
} }
if($self->have_line) { }
$self->shift_back_read($self->{line});
sub has_data {
my Danga::Client $self = shift;
return length($self->{line}) ? 1 : 0;
}
sub clear_data {
my Danga::Client $self = shift;
$self->{line} = ''; $self->{line} = '';
} }
sub paused {
my Danga::Client $self = shift;
return 1 if $self->{pause_count};
return 1 if $self->{closed};
return 0;
} }
sub readable { sub pause_read {
my Danga::Client $self = shift; my Danga::Client $self = shift;
return 0 if $self->{disable_read} > 0; $self->{pause_count}++;
return 0 if $self->{closed} > 0; # $self->watch_read(0);
return 1;
} }
sub disable_read { sub continue_read {
my Danga::Client $self = shift; my Danga::Client $self = shift;
$self->{disable_read}++; $self->{pause_count}--;
$self->watch_read(0); if ($self->{pause_count} <= 0) {
} $self->{pause_count} = 0;
# $self->watch_read(1);
sub enable_read {
my Danga::Client $self = shift;
$self->{disable_read}--;
if ($self->{disable_read} <= 0) {
$self->{disable_read} = 0;
$self->watch_read(1);
} }
} }
@ -137,7 +88,6 @@ sub process_line {
sub close { sub close {
my Danga::Client $self = shift; my Danga::Client $self = shift;
$self->{closing} = 1;
print "closing @_\n" if $::DEBUG; print "closing @_\n" if $::DEBUG;
$self->SUPER::close(@_); $self->SUPER::close(@_);
} }

View File

@ -25,7 +25,7 @@ sub new {
$resolver ||= Danga::DNS::Resolver->new(); $resolver ||= Danga::DNS::Resolver->new();
my $client = $options{client}; my $client = $options{client};
$client->disable_read if $client; $client->pause_read() if $client;
$self = fields::new($self) unless ref $self; $self = fields::new($self) unless ref $self;
@ -40,13 +40,13 @@ sub new {
if ($options{type}) { if ($options{type}) {
if ( ($options{type} eq 'A') || ($options{type} eq 'PTR') ) { if ( ($options{type} eq 'A') || ($options{type} eq 'PTR') ) {
if (!$resolver->query($self, @{$self->{hosts}})) { if (!$resolver->query($self, @{$self->{hosts}})) {
$client->enable_read() if $client; $client->continue_read() if $client;
return; return;
} }
} }
else { else {
if (!$resolver->query_type($self, $options{type}, @{$self->{hosts}})) { if (!$resolver->query_type($self, $options{type}, @{$self->{hosts}})) {
$client->enable_read() if $client; $client->continue_read() if $client;
return; return;
} }
# die "Unsupported DNS query type: $options{type}"; # die "Unsupported DNS query type: $options{type}";
@ -54,7 +54,7 @@ sub new {
} }
else { else {
if (!$resolver->query($self, @{$self->{hosts}})) { if (!$resolver->query($self, @{$self->{hosts}})) {
$client->enable_read() if $client; $client->continue_read() if $client;
return; return;
} }
} }
@ -84,7 +84,7 @@ sub DESTROY {
$self->{callback}->("NXDOMAIN", $host); $self->{callback}->("NXDOMAIN", $host);
} }
} }
$self->{client}->enable_read if $self->{client}; $self->{client}->continue_read() if $self->{client};
if ($self->{finished}) { if ($self->{finished}) {
$self->{finished}->(); $self->{finished}->();
} }

File diff suppressed because it is too large Load Diff

View File

@ -337,7 +337,7 @@ sub run_hooks {
@r = $self->run_hook($hook, $code, @_); @r = $self->run_hook($hook, $code, @_);
next unless @r; next unless @r;
if ($r[0] == CONTINUATION) { if ($r[0] == CONTINUATION) {
$self->disable_read() if $self->isa('Danga::Client'); $self->pause_read() if $self->isa('Danga::Client');
$self->{_continuation} = [$hook, [@_], @local_hooks]; $self->{_continuation} = [$hook, [@_], @local_hooks];
} }
last unless $r[0] == DECLINED; last unless $r[0] == DECLINED;
@ -351,7 +351,7 @@ sub run_hooks {
sub finish_continuation { sub finish_continuation {
my ($self) = @_; my ($self) = @_;
die "No continuation in progress" unless $self->{_continuation}; die "No continuation in progress" unless $self->{_continuation};
$self->enable_read() if $self->isa('Danga::Client'); $self->continue_read() if $self->isa('Danga::Client');
my $todo = $self->{_continuation}; my $todo = $self->{_continuation};
$self->{_continuation} = undef; $self->{_continuation} = undef;
my $hook = shift @$todo || die "No hook in the continuation"; my $hook = shift @$todo || die "No hook in the continuation";
@ -361,7 +361,7 @@ sub finish_continuation {
my $code = shift @$todo; my $code = shift @$todo;
@r = $self->run_hook($hook, $code, @$args); @r = $self->run_hook($hook, $code, @$args);
if ($r[0] == CONTINUATION) { if ($r[0] == CONTINUATION) {
$self->disable_read() if $self->isa('Danga::Client'); $self->pause_read() if $self->isa('Danga::Client');
$self->{_continuation} = [$hook, $args, @$todo]; $self->{_continuation} = [$hook, $args, @$todo];
return @r; return @r;
} }

View File

@ -44,6 +44,15 @@ issued a deny or denysoft (depending on the value of I<action>). The default
is to react at the SMTP greeting stage by issuing the apropriate response code is to react at the SMTP greeting stage by issuing the apropriate response code
and terminating the SMTP connection. and terminating the SMTP connection.
=item check-at [string: connect, data]
Defines when to check for early talkers, either at connect time (pre-greet pause)
or at DATA time (pause before sending "354 go ahead").
The default is I<connect>.
Note that defer-reject has no meaning if check-at is I<data>.
=back =back
=cut =cut
@ -61,23 +70,27 @@ sub register {
'wait' => 1, 'wait' => 1,
'action' => 'denysoft', 'action' => 'denysoft',
'defer-reject' => 0, 'defer-reject' => 0,
'check-at' => 'connect',
@args, @args,
}; };
print STDERR "Check at: ", $self->{_args}{'check-at'}, "\n";
if ($qp->isa('Qpsmtpd::Apache')) { if ($qp->isa('Qpsmtpd::Apache')) {
require APR::Const; require APR::Const;
APR::Const->import(qw(POLLIN SUCCESS)); APR::Const->import(qw(POLLIN SUCCESS));
$self->register_hook('connect', 'hook_connect_apr'); $self->register_hook($self->{_args}->{'check-at'}, 'check_talker_apr');
} }
else { else {
$self->register_hook('connect', 'hook_connect'); $self->register_hook($self->{_args}->{'check-at'}, 'check_talker_poll');
} }
$self->register_hook('connect', 'hook_connect_post'); $self->register_hook($self->{_args}->{'check-at'}, 'check_talker_post');
if ($self->{_args}{'check-at'} eq 'connect') {
$self->register_hook('mail', 'hook_mail') $self->register_hook('mail', 'hook_mail')
if $self->{_args}->{'defer-reject'}; if $self->{_args}->{'defer-reject'};
}
1; 1;
} }
sub hook_connect_apr { sub check_talker_apr {
my ($self, $transaction) = @_; my ($self, $transaction) = @_;
return DECLINED if ($self->qp->connection->notes('whitelistclient')); return DECLINED if ($self->qp->connection->notes('whitelistclient'));
@ -104,29 +117,27 @@ sub hook_connect_apr {
return DECLINED; return DECLINED;
} }
sub hook_connect { sub check_talker_poll {
my ($self, $transaction) = @_; my ($self, $transaction) = @_;
my $qp = $self->qp; my $qp = $self->qp;
my $conn = $qp->connection; my $conn = $qp->connection;
$qp->AddTimer($self->{_args}{'wait'}, sub { read_now($qp, $conn) }); $qp->AddTimer($self->{_args}{'wait'}, sub { read_now($qp, $conn, $self->{_args}{'check-at'}) });
return CONTINUATION; return CONTINUATION;
} }
sub read_now { sub read_now {
my ($qp, $conn) = @_; my ($qp, $conn, $phase) = @_;
if (my $data = $qp->read(1024)) { if ($qp->has_data) {
if (length($$data)) {
$qp->log(LOGNOTICE, 'remote host started talking before we said hello'); $qp->log(LOGNOTICE, 'remote host started talking before we said hello');
$qp->push_back_read($data); $qp->clear_data if $phase eq 'data';
$conn->notes('earlytalker', 1); $conn->notes('earlytalker', 1);
} }
}
$qp->finish_continuation; $qp->finish_continuation;
} }
sub hook_connect_post { sub check_talker_post {
my ($self, $transaction) = @_; my ($self, $transaction) = @_;
my $conn = $self->qp->connection; my $conn = $self->qp->connection;

104
qpsmtpd
View File

@ -35,7 +35,6 @@ my $CONFIG_LOCALADDR = '127.0.0.1';
my $PORT = 2525; my $PORT = 2525;
my $LOCALADDR = '0.0.0.0'; my $LOCALADDR = '0.0.0.0';
my $LineMode = 0;
my $PROCS = 1; my $PROCS = 1;
my $MAXCONN = 15; # max simultaneous connections my $MAXCONN = 15; # max simultaneous connections
my $USER = 'smtpd'; # user to suid to my $USER = 'smtpd'; # user to suid to
@ -54,7 +53,6 @@ Options:
-c, --limit-connections N : limit concurrent connections to N; default 15 -c, --limit-connections N : limit concurrent connections to N; default 15
-u, --user U : run as a particular user; defualt 'smtpd' -u, --user U : run as a particular user; defualt 'smtpd'
-m, --max-from-ip M : limit connections from a single IP; default 5 -m, --max-from-ip M : limit connections from a single IP; default 5
-f, --forkmode : fork a child for each connection
-j, --procs J : spawn J processes; default 1 -j, --procs J : spawn J processes; default 1
-a, --accept K : accept up to K conns per loop; default 20 -a, --accept K : accept up to K conns per loop; default 20
-h, --help : this page -h, --help : this page
@ -73,7 +71,6 @@ GetOptions(
'l|listen-address=s' => \$LOCALADDR, 'l|listen-address=s' => \$LOCALADDR,
'j|procs=i' => \$PROCS, 'j|procs=i' => \$PROCS,
'd|debug+' => \$DEBUG, 'd|debug+' => \$DEBUG,
'f|forkmode' => \$LineMode,
'c|limit-connections=i' => \$MAXCONN, 'c|limit-connections=i' => \$MAXCONN,
'm|max-from-ip=i' => \$MAXCONNIP, 'm|max-from-ip=i' => \$MAXCONNIP,
'u|user=s' => \$USER, 'u|user=s' => \$USER,
@ -90,8 +87,6 @@ if ($MAXCONN =~ /^(\d+)$/) { $MAXCONN = $1 } else { &help }
if ($PROCS =~ /^(\d+)$/) { $PROCS = $1 } else { &help } if ($PROCS =~ /^(\d+)$/) { $PROCS = $1 } else { &help }
if ($NUMACCEPT =~ /^(\d+)$/) { $NUMACCEPT = $1 } else { &help } if ($NUMACCEPT =~ /^(\d+)$/) { $NUMACCEPT = $1 } else { &help }
my $_NUMACCEPT = $NUMACCEPT; my $_NUMACCEPT = $NUMACCEPT;
$::LineMode = $LineMode;
$PROCS = 1 if $LineMode;
# This is a bit of a hack, but we get to approximate MAXCONN stuff when we # This is a bit of a hack, but we get to approximate MAXCONN stuff when we
# have multiple children listening on the same socket. # have multiple children listening on the same socket.
$MAXCONN /= $PROCS; $MAXCONN /= $PROCS;
@ -102,7 +97,7 @@ sub force_poll {
$Danga::Socket::HaveKQueue = 0; $Danga::Socket::HaveKQueue = 0;
} }
Danga::Socket::init_poller(); # Danga::Socket::init_poller();
my $POLL = "with " . ($Danga::Socket::HaveEpoll ? "epoll()" : my $POLL = "with " . ($Danga::Socket::HaveEpoll ? "epoll()" :
$Danga::Socket::HaveKQueue ? "kqueue()" : "poll()"); $Danga::Socket::HaveKQueue ? "kqueue()" : "poll()");
@ -110,12 +105,6 @@ my $POLL = "with " . ($Danga::Socket::HaveEpoll ? "epoll()" :
my $SERVER; my $SERVER;
my $CONFIG_SERVER; my $CONFIG_SERVER;
# Code for inetd/tcpserver mode
if ($ENV{REMOTE_HOST} or $ENV{TCPREMOTEHOST}) {
run_as_inetd();
exit(0);
}
my %childstatus = (); my %childstatus = ();
run_as_server(); run_as_server();
@ -165,8 +154,7 @@ sub sig_chld {
print "child $child died\n"; print "child $child died\n";
delete $childstatus{$child}; delete $childstatus{$child};
} }
return if $LineMode; # restart a new child (assuming this one died)
# restart a new child if in poll server mode
spawn_child(); spawn_child();
$SIG{CHLD} = \&sig_chld; $SIG{CHLD} = \&sig_chld;
} }
@ -177,33 +165,6 @@ sub HUNTSMAN {
exit(0); exit(0);
} }
sub run_as_inetd {
$LineMode = $::LineMode = 1;
my $insock = IO::Handle->new_from_fd(0, "r");
IO::Handle::blocking($insock, 0);
my $outsock = IO::Handle->new_from_fd(1, "w");
IO::Handle::blocking($outsock, 0);
my $client = Danga::Client->new($insock);
my $out = Qpsmtpd::PollServer->new($outsock);
$out->load_plugins;
$out->input_sock($client);
$client->push_back_read("Connect\n");
# Cause poll/kevent/epoll to end quickly in first iteration
Qpsmtpd::PollServer->AddTimer(1, sub { });
while (1) {
$client->enable_read;
my $line = $client->get_line;
last if !defined($line);
my $output = $out->process_line($line);
$out->write($output) if $output;
}
}
sub run_as_server { sub run_as_server {
local $::MAXconn = $MAXCONN; local $::MAXconn = $MAXCONN;
# establish SERVER socket, bind and listen. # establish SERVER socket, bind and listen.
@ -261,11 +222,7 @@ sub run_as_server {
sleep while (1); sleep while (1);
} }
else { else {
if ($LineMode) { $plugin_loader->log(LOGDEBUG, "Listening on $PORT with single process $POLL");
$SIG{INT} = $SIG{TERM} = \&HUNTSMAN;
}
$plugin_loader->log(LOGDEBUG, "Listening on $PORT with single process $POLL" .
($LineMode ? " (forking server)" : ""));
Qpsmtpd::PollServer->OtherFds(fileno($SERVER) => \&accept_handler, Qpsmtpd::PollServer->OtherFds(fileno($SERVER) => \&accept_handler,
fileno($CONFIG_SERVER) => \&config_handler, fileno($CONFIG_SERVER) => \&config_handler,
); );
@ -298,13 +255,8 @@ sub config_handler {
# Accept all new connections # Accept all new connections
sub accept_handler { sub accept_handler {
my $running; my $running;
if( $LineMode ) {
$running = scalar keys %childstatus;
}
else {
my $descriptors = Danga::Client->DescriptorMap; my $descriptors = Danga::Client->DescriptorMap;
$running = scalar keys %$descriptors; $running = scalar keys %$descriptors;
}
for (1 .. $NUMACCEPT) { for (1 .. $NUMACCEPT) {
if ($running >= $MAXCONN) { if ($running >= $MAXCONN) {
@ -349,7 +301,6 @@ sub _accept_handler {
IO::Handle::blocking($csock, 0); IO::Handle::blocking($csock, 0);
setsockopt($csock, IPPROTO_TCP, TCP_NODELAY, pack("l", 1)) or die; setsockopt($csock, IPPROTO_TCP, TCP_NODELAY, pack("l", 1)) or die;
if (!$LineMode) {
# multiplex mode # multiplex mode
my $client = Qpsmtpd::PollServer->new($csock); my $client = Qpsmtpd::PollServer->new($csock);
my $rem_ip = $client->peer_ip_string; my $rem_ip = $client->peer_ip_string;
@ -389,55 +340,6 @@ sub _accept_handler {
return 1; return 1;
} }
# fork-per-connection mode
my $rem_ip = $csock->sockhost();
if ($MAXCONNIP) {
my $num_conn = 1; # seed with current value
my @rip = values %childstatus;
foreach my $rip (@rip) {
++$num_conn if (defined $rip && $rip eq $rem_ip);
}
if ($num_conn > $MAXCONNIP) {
::log(LOGINFO,"Too many connections from $rem_ip: "
."$num_conn > $MAXCONNIP. Denying connection.");
print $csock "451 Sorry, too many connections from $rem_ip, try again later\r\n";
close $csock;
return 1;
}
}
if (my $pid = _fork) {
$childstatus{$pid} = $rem_ip;
return $csock->close();
}
$SERVER->close(); # make sure the child doesn't accept() new connections
$SIG{$_} = 'DEFAULT' for keys %SIG;
my $client = Qpsmtpd::PollServer->new($csock);
$client->push_back_read("Connect\n");
# Cause poll/kevent/epoll to end quickly in first iteration
Qpsmtpd::PollServer->AddTimer(0.1, sub { });
while (1) {
$client->enable_read;
my $line = $client->get_line;
last if !defined($line);
my $resp = $client->process_line($line);
$client->write($resp) if $resp;
}
$client->log(LOGDEBUG, "Finished with child %d.\n", fileno($csock))
if $DEBUG;
$client->close();
exit;
}
######################################################################## ########################################################################
sub log { sub log {