Much improved config server, especially the stats

git-svn-id: https://svn.perl.org/qpsmtpd/branches/high_perf@410 958fd67b-6ff1-0310-b445-bb7760255be9
This commit is contained in:
Matt Sergeant 2005-04-28 21:38:43 +00:00
parent a75f4a53e0
commit 46cda05112
5 changed files with 264 additions and 32 deletions

View File

@ -3,9 +3,11 @@
package Qpsmtpd::ConfigServer; package Qpsmtpd::ConfigServer;
use base ('Danga::Client'); use base ('Danga::Client');
use Qpsmtpd::Constants;
use strict;
use fields qw( use fields qw(
commands
_auth _auth
_commands _commands
_config_cache _config_cache
@ -15,16 +17,19 @@ use fields qw(
_extras _extras
); );
my $PROMPT = "Enter command: ";
sub new { sub new {
my Qpsmtpd::ConfigServer $self = shift; my Qpsmtpd::ConfigServer $self = shift;
$self = fields::new($self) unless ref $self; $self = fields::new($self) unless ref $self;
$self->SUPER::new( @_ ); $self->SUPER::new( @_ );
$self->{commands} = { help => 1, status => 1, }; $self->write($PROMPT);
$self->write("Enter command:\n");
return $self; return $self;
} }
sub max_idle_time { 3600 } # one hour
sub process_line { sub process_line {
my $self = shift; my $self = shift;
my $line = shift || return; my $line = shift || return;
@ -55,7 +60,8 @@ sub fault {
my $self = shift; my $self = shift;
my ($msg) = shift || "program fault - command not performed"; my ($msg) = shift || "program fault - command not performed";
print STDERR "$0 [$$]: $msg ($!)\n"; print STDERR "$0 [$$]: $msg ($!)\n";
return $self->respond("Error - " . $msg, "Enter command:"); $self->respond("Error - " . $msg);
return $PROMPT;
} }
sub _process_line { sub _process_line {
@ -64,18 +70,18 @@ sub _process_line {
$line =~ s/\r?\n//; $line =~ s/\r?\n//;
my ($cmd, @params) = split(/ +/, $line); my ($cmd, @params) = split(/ +/, $line);
my $meth = lc($cmd); my $meth = "cmd_" . lc($cmd);
if (my $lookup = $self->{commands}->{$meth} && $self->can($meth)) { if (my $lookup = $self->can($meth)) {
my $resp = eval { my $resp = eval {
$lookup->($self, @params); $lookup->($self, @params);
}; };
if ($@) { if ($@) {
my $error = $@; my $error = $@;
chomp($error); chomp($error);
$self->log(LOGERROR, "Command Error: $error"); Qpsmtpd->log(LOGERROR, "Command Error: $error");
return $self->fault("command '$cmd' failed unexpectedly"); return $self->fault("command '$cmd' failed unexpectedly");
} }
return $resp . "\nEnter command:\n"; return "$resp\n$PROMPT";
} }
else { else {
# No such method - i.e. unrecognized command # No such method - i.e. unrecognized command
@ -84,24 +90,74 @@ sub _process_line {
} }
my %helptext = ( my %helptext = (
all => "Available Commands:\n\nSTATUS\nHELP [CMD]", help => "HELP [CMD] - Get help on all commands or a specific command",
status => "STATUS - Returns status information about current connections", status => "STATUS - Returns status information about current connections",
list => "LIST [LIMIT] - List the connections, specify limit or negative limit to shrink list",
kill => "KILL (\$IP | \$REF) - Disconnect all connections from \$IP or connection reference \$REF",
pause => "PAUSE - Stop accepting new connections",
continue => "CONTINUE - Resume accepting connections",
reload => "RELOAD - Reload all plugins and config",
quit => "QUIT - Exit the config server",
); );
sub help { sub cmd_help {
my $self = shift; my $self = shift;
my ($subcmd) = @_; my ($subcmd) = @_;
$subcmd ||= 'all'; $subcmd ||= 'help';
$subcmd = lc($subcmd); $subcmd = lc($subcmd);
my $txt = $helptext{$subcmd} || "Unrecognised help option. Try 'help all'"; if ($subcmd eq 'help') {
warn "help returning: $txt\n"; my $txt = join("\n", map { substr($_, 0, index($_, "-")) } sort values(%helptext));
return $txt . "\n"; return "Available Commands:\n\n$txt\n";
}
my $txt = $helptext{$subcmd} || "Unrecognised help option. Try 'help' for a full list.";
return "$txt\n";
} }
sub status { sub cmd_quit {
my $self = shift; my $self = shift;
$self->close;
}
sub cmd_pause {
my $self = shift;
my $other_fds = $self->OtherFds;
$self->{other_fds} = { %$other_fds };
%$other_fds = ();
return "PAUSED";
}
sub cmd_status {
my $self = shift;
# Status should show:
# - Total time running
# - Total number of mails received
# - Total number of mails rejected (5xx)
# - Total number of mails tempfailed (5xx)
# - Avg number of mails/minute
# - Number of current connections
# - Number of outstanding DNS queries
my $output = "Current Status as of " . gmtime() . " GMT\n\n";
if ($INC{'Qpsmtpd/Stats.pm'}) {
# Stats plugin is loaded
my $uptime = Qpsmtpd::Stats->uptime;
my $recvd = Qpsmtpd::Stats->mails_received;
my $reject = Qpsmtpd::Stats->mails_rejected;
my $soft = Qpsmtpd::Stats->mails_tempfailed;
my $rate = Qpsmtpd::Stats->mails_per_sec;
$output .= sprintf(" Uptime: %0.2f sec\n".
" Mails Received: % 10d\n".
" 5xx: % 10d\n".
" 4xx: % 10d\n".
"Mails per second: %0.2f\n",
$uptime, $recvd, $reject, $soft, $rate);
}
my $descriptors = Danga::Socket->DescriptorMap; my $descriptors = Danga::Socket->DescriptorMap;
@ -117,9 +173,99 @@ sub status {
} }
} }
return $output .= "Curr Connections: $current_connections\n".
" Current Connections: $current_connections "Curr DNS Queries: $current_dns";
Current DNS Queries: $current_dns";
return $output;
}
sub cmd_list {
my $self = shift;
my ($count) = @_;
my $descriptors = Danga::Socket->DescriptorMap;
my $list = "Current" . ($count ? (($count > 0) ? " Oldest $count" : " Newest ".-$count) : "") . " Connections: \n\n";
my @all;
foreach my $fd (keys %$descriptors) {
my $pob = $descriptors->{$fd};
if ($pob->isa("Qpsmtpd::PollServer")) {
next unless $pob->connection->remote_ip; # haven't even started yet
push @all, [$pob+0, $pob->connection->remote_ip,
$pob->connection->remote_host, $pob->uptime];
}
}
@all = sort { $a->[3] <=> $b->[3] } @all;
if ($count) {
if ($count > 0) {
@all = @all[$#all-($count-1) .. $#all];
}
else {
@all = @all[0..(abs($count) - 1)];
}
}
foreach my $item (@all) {
$list .= sprintf("%x : %s [%s] Connected %0.2fs\n", @$item);
}
return $list;
}
sub cmd_kill {
my $self = shift;
my ($match) = @_;
return "SYNTAX: KILL (\$IP | \$REF)\n" unless $match;
my $descriptors = Danga::Socket->DescriptorMap;
my $killed = 0;
my $is_ip = (index($match, '.') >= 0);
foreach my $fd (keys %$descriptors) {
my $pob = $descriptors->{$fd};
if ($pob->isa("Qpsmtpd::PollServer")) {
if ($is_ip) {
next unless $pob->connection->remote_ip; # haven't even started yet
if ($pob->connection->remote_ip eq $match) {
$pob->write("550 Your connection has been killed by an administrator\r\n");
$pob->disconnect;
$killed++;
}
}
else {
# match by ID
if ($pob+0 == hex($match)) {
$pob->write("550 Your connection has been killed by an administrator\r\n");
$pob->disconnect;
$killed++;
}
}
}
}
return "Killed $killed connection" . ($killed > 1 ? "s" : "") . "\n";
}
sub cmd_dump {
my $self = shift;
my ($ref) = @_;
return "SYNTAX: DUMP \$REF\n" unless $ref;
require Data::Dumper;
$Data::Dumper::Indent=1;
my $descriptors = Danga::Socket->DescriptorMap;
foreach my $fd (keys %$descriptors) {
my $pob = $descriptors->{$fd};
if ($pob->isa("Qpsmtpd::PollServer")) {
if ($pob+0 == hex($ref)) {
return Data::Dumper::Dumper($pob);
}
}
}
return "Unable to find the connection: $ref. Try the LIST command\n";
} }
1; 1;

View File

@ -13,6 +13,7 @@ use fields qw(
data_size data_size
max_size max_size
hooks hooks
start_time
_auth _auth
_commands _commands
_config_cache _config_cache
@ -28,6 +29,7 @@ use Danga::DNS;
use Mail::Header; use Mail::Header;
use POSIX qw(strftime); use POSIX qw(strftime);
use Socket qw(inet_aton AF_INET CRLF); use Socket qw(inet_aton AF_INET CRLF);
use Time::HiRes qw(time);
use strict; use strict;
sub input_sock { sub input_sock {
@ -41,10 +43,17 @@ sub new {
$self = fields::new($self) unless ref $self; $self = fields::new($self) unless ref $self;
$self->SUPER::new( @_ ); $self->SUPER::new( @_ );
$self->{start_time} = time;
$self->load_plugins; $self->load_plugins;
return $self; return $self;
} }
sub uptime {
my Qpsmtpd::PollServer $self = shift;
return (time() - $self->{start_time});
}
sub reset_for_next_message { sub reset_for_next_message {
my $self = shift; my $self = shift;
$self->SUPER::reset_for_next_message(@_); $self->SUPER::reset_for_next_message(@_);

35
lib/Qpsmtpd/Stats.pm Normal file
View File

@ -0,0 +1,35 @@
# $Id$
package Qpsmtpd::Stats;
use strict;
use Qpsmtpd;
use Qpsmtpd::Constants;
use Time::HiRes qw(time);
my $START_TIME = time;
our $MAILS_RECEIVED = 0;
our $MAILS_REJECTED = 0;
our $MAILS_TEMPFAIL = 0;
sub uptime {
return (time() - $START_TIME);
}
sub mails_received {
return $MAILS_RECEIVED;
}
sub mails_rejected {
return $MAILS_REJECTED;
}
sub mails_tempfailed {
return $MAILS_TEMPFAIL;
}
sub mails_per_sec {
return ($MAILS_RECEIVED / uptime());
}
1;

31
plugins/stats Normal file
View File

@ -0,0 +1,31 @@
#!/usr/bin/perl -w
use Qpsmtpd::Stats;
sub register {
my ($self) = @_;
$self->register_hook('deny', 'increment_deny');
$self->register_hook('queue', 'increment_mails');
}
sub increment_deny {
my ($self, $level) = @_;
if ($level == DENY or $level == DENY_DISCONNECT) {
$Qpsmtpd::Stats::MAILS_REJECTED++;
}
elsif ($level == DENYSOFT or $level == DENYSOFT_DISCONNECT) {
$Qpsmtpd::Stats::MAILS_TEMPFAIL++;
}
return DECLINED;
}
sub increment_mails {
my $self = shift;
$Qpsmtpd::Stats::MAILS_RECEIVED++;
return DECLINED;
}

39
qpsmtpd
View File

@ -43,6 +43,7 @@ my $PROCS = 1;
my $MAXCONN = 15; # max simultaneous connections my $MAXCONN = 15; # max simultaneous connections
my $USER = 'smtpd'; # user to suid to my $USER = 'smtpd'; # user to suid to
my $MAXCONNIP = 5; # max simultaneous connections from one IP my $MAXCONNIP = 5; # max simultaneous connections from one IP
my $PAUSED = 0;
sub help { sub help {
print <<EOT; print <<EOT;
@ -97,8 +98,8 @@ Danga::Socket::init_poller();
my $POLL = "with " . ($Danga::Socket::HaveEpoll ? "epoll()" : my $POLL = "with " . ($Danga::Socket::HaveEpoll ? "epoll()" :
$Danga::Socket::HaveKQueue ? "kqueue()" : "poll()"); $Danga::Socket::HaveKQueue ? "kqueue()" : "poll()");
my $server; my $SERVER;
my $config_server; my $CONFIG_SERVER;
# Code for inetd/tcpserver mode # Code for inetd/tcpserver mode
if ($ENV{REMOTE_HOST} or $ENV{TCPREMOTEHOST}) { if ($ENV{REMOTE_HOST} or $ENV{TCPREMOTEHOST}) {
@ -143,7 +144,7 @@ sub spawn_child {
$SIG{CHLD} = "DEFAULT"; $SIG{CHLD} = "DEFAULT";
Qpsmtpd::PollServer->OtherFds(fileno($server) => \&accept_handler); Qpsmtpd::PollServer->OtherFds(fileno($SERVER) => \&accept_handler);
Qpsmtpd::PollServer->EventLoop(); Qpsmtpd::PollServer->EventLoop();
exit; exit;
} }
@ -199,7 +200,7 @@ sub run_as_inetd {
sub run_as_server { sub run_as_server {
# establish SERVER socket, bind and listen. # establish SERVER socket, bind and listen.
$server = IO::Socket::INET->new(LocalPort => $PORT, $SERVER = IO::Socket::INET->new(LocalPort => $PORT,
LocalAddr => $LOCALADDR, LocalAddr => $LOCALADDR,
Type => SOCK_STREAM, Type => SOCK_STREAM,
Proto => IPPROTO_TCP, Proto => IPPROTO_TCP,
@ -208,10 +209,10 @@ sub run_as_server {
Listen => 10 ) Listen => 10 )
or die "Error creating server $LOCALADDR:$PORT : $@\n"; or die "Error creating server $LOCALADDR:$PORT : $@\n";
IO::Handle::blocking($server, 0); IO::Handle::blocking($SERVER, 0);
binmode($server, ':raw'); binmode($SERVER, ':raw');
$config_server = IO::Socket::INET->new(LocalPort => $CONFIG_PORT, $CONFIG_SERVER = IO::Socket::INET->new(LocalPort => $CONFIG_PORT,
LocalAddr => $CONFIG_LOCALADDR, LocalAddr => $CONFIG_LOCALADDR,
Type => SOCK_STREAM, Type => SOCK_STREAM,
Proto => IPPROTO_TCP, Proto => IPPROTO_TCP,
@ -220,8 +221,8 @@ sub run_as_server {
Listen => 1 ) Listen => 1 )
or die "Error creating server $CONFIG_LOCALADDR:$CONFIG_PORT : $@\n"; or die "Error creating server $CONFIG_LOCALADDR:$CONFIG_PORT : $@\n";
IO::Handle::blocking($config_server, 0); IO::Handle::blocking($CONFIG_SERVER, 0);
binmode($config_server, ':raw'); binmode($CONFIG_SERVER, ':raw');
# Drop priviledges # Drop priviledges
my (undef, undef, $quid, $qgid) = getpwnam $USER or my (undef, undef, $quid, $qgid) = getpwnam $USER or
@ -258,8 +259,8 @@ sub run_as_server {
} }
::log(LOGDEBUG, "Listening on $PORT with single process $POLL" . ::log(LOGDEBUG, "Listening on $PORT with single process $POLL" .
($LineMode ? " (forking server)" : "")); ($LineMode ? " (forking server)" : ""));
Qpsmtpd::PollServer->OtherFds(fileno($server) => \&accept_handler, Qpsmtpd::PollServer->OtherFds(fileno($SERVER) => \&accept_handler,
fileno($config_server) => \&config_handler, fileno($CONFIG_SERVER) => \&config_handler,
); );
while (1) { while (1) {
Qpsmtpd::PollServer->EventLoop(); Qpsmtpd::PollServer->EventLoop();
@ -270,7 +271,7 @@ sub run_as_server {
} }
sub config_handler { sub config_handler {
my $csock = $config_server->accept(); my $csock = $CONFIG_SERVER->accept();
if (!$csock) { if (!$csock) {
warn("accept failed on config server: $!"); warn("accept failed on config server: $!");
return; return;
@ -295,7 +296,7 @@ sub accept_handler {
return; return;
} }
my $csock = $server->accept(); my $csock = $SERVER->accept();
if (!$csock) { if (!$csock) {
# warn("accept() failed: $!"); # warn("accept() failed: $!");
return; return;
@ -313,6 +314,12 @@ sub accept_handler {
my $client = Qpsmtpd::PollServer->new($csock); my $client = Qpsmtpd::PollServer->new($csock);
my $rem_ip = $client->peer_ip_string; my $rem_ip = $client->peer_ip_string;
if ($PAUSED) {
$client->write("451 Sorry, this server is currently paused\r\n");
$client->close;
return;
}
if ($MAXCONNIP) { if ($MAXCONNIP) {
my $num_conn = 1; # seed with current value my $num_conn = 1; # seed with current value
@ -370,7 +377,7 @@ sub accept_handler {
return $csock->close(); return $csock->close();
} }
$server->close(); # make sure the child doesn't accept() new connections $SERVER->close(); # make sure the child doesn't accept() new connections
$SIG{$_} = 'DEFAULT' for keys %SIG; $SIG{$_} = 'DEFAULT' for keys %SIG;
@ -406,3 +413,7 @@ sub log {
warn("$$ $message\n"); warn("$$ $message\n");
} }
sub pause {
my ($pause) = @_;
$PAUSED = $pause;
}