diff --git a/lib/Qpsmtpd/ConfigServer.pm b/lib/Qpsmtpd/ConfigServer.pm index edee148..ff5e2b8 100644 --- a/lib/Qpsmtpd/ConfigServer.pm +++ b/lib/Qpsmtpd/ConfigServer.pm @@ -3,9 +3,11 @@ package Qpsmtpd::ConfigServer; use base ('Danga::Client'); +use Qpsmtpd::Constants; + +use strict; use fields qw( - commands _auth _commands _config_cache @@ -15,16 +17,19 @@ use fields qw( _extras ); +my $PROMPT = "Enter command: "; + sub new { my Qpsmtpd::ConfigServer $self = shift; $self = fields::new($self) unless ref $self; $self->SUPER::new( @_ ); - $self->{commands} = { help => 1, status => 1, }; - $self->write("Enter command:\n"); + $self->write($PROMPT); return $self; } +sub max_idle_time { 3600 } # one hour + sub process_line { my $self = shift; my $line = shift || return; @@ -55,7 +60,8 @@ sub fault { my $self = shift; my ($msg) = shift || "program fault - command not performed"; print STDERR "$0 [$$]: $msg ($!)\n"; - return $self->respond("Error - " . $msg, "Enter command:"); + $self->respond("Error - " . $msg); + return $PROMPT; } sub _process_line { @@ -64,18 +70,18 @@ sub _process_line { $line =~ s/\r?\n//; my ($cmd, @params) = split(/ +/, $line); - my $meth = lc($cmd); - if (my $lookup = $self->{commands}->{$meth} && $self->can($meth)) { + my $meth = "cmd_" . lc($cmd); + if (my $lookup = $self->can($meth)) { my $resp = eval { $lookup->($self, @params); }; if ($@) { my $error = $@; chomp($error); - $self->log(LOGERROR, "Command Error: $error"); + Qpsmtpd->log(LOGERROR, "Command Error: $error"); return $self->fault("command '$cmd' failed unexpectedly"); } - return $resp . "\nEnter command:\n"; + return "$resp\n$PROMPT"; } else { # No such method - i.e. unrecognized command @@ -84,24 +90,74 @@ sub _process_line { } my %helptext = ( - all => "Available Commands:\n\nSTATUS\nHELP [CMD]", + help => "HELP [CMD] - Get help on all commands or a specific command", status => "STATUS - Returns status information about current connections", + list => "LIST [LIMIT] - List the connections, specify limit or negative limit to shrink list", + kill => "KILL (\$IP | \$REF) - Disconnect all connections from \$IP or connection reference \$REF", + pause => "PAUSE - Stop accepting new connections", + continue => "CONTINUE - Resume accepting connections", + reload => "RELOAD - Reload all plugins and config", + quit => "QUIT - Exit the config server", ); -sub help { +sub cmd_help { my $self = shift; my ($subcmd) = @_; - $subcmd ||= 'all'; + $subcmd ||= 'help'; $subcmd = lc($subcmd); - my $txt = $helptext{$subcmd} || "Unrecognised help option. Try 'help all'"; - warn "help returning: $txt\n"; - return $txt . "\n"; + if ($subcmd eq 'help') { + my $txt = join("\n", map { substr($_, 0, index($_, "-")) } sort values(%helptext)); + return "Available Commands:\n\n$txt\n"; + } + my $txt = $helptext{$subcmd} || "Unrecognised help option. Try 'help' for a full list."; + return "$txt\n"; } -sub status { +sub cmd_quit { my $self = shift; + $self->close; +} + +sub cmd_pause { + my $self = shift; + + my $other_fds = $self->OtherFds; + + $self->{other_fds} = { %$other_fds }; + %$other_fds = (); + return "PAUSED"; +} + +sub cmd_status { + my $self = shift; + +# Status should show: +# - Total time running +# - Total number of mails received +# - Total number of mails rejected (5xx) +# - Total number of mails tempfailed (5xx) +# - Avg number of mails/minute +# - Number of current connections +# - Number of outstanding DNS queries + + my $output = "Current Status as of " . gmtime() . " GMT\n\n"; + + if ($INC{'Qpsmtpd/Stats.pm'}) { + # Stats plugin is loaded + my $uptime = Qpsmtpd::Stats->uptime; + my $recvd = Qpsmtpd::Stats->mails_received; + my $reject = Qpsmtpd::Stats->mails_rejected; + my $soft = Qpsmtpd::Stats->mails_tempfailed; + my $rate = Qpsmtpd::Stats->mails_per_sec; + $output .= sprintf(" Uptime: %0.2f sec\n". + " Mails Received: % 10d\n". + " 5xx: % 10d\n". + " 4xx: % 10d\n". + "Mails per second: %0.2f\n", + $uptime, $recvd, $reject, $soft, $rate); + } my $descriptors = Danga::Socket->DescriptorMap; @@ -117,9 +173,99 @@ sub status { } } - return -" Current Connections: $current_connections - Current DNS Queries: $current_dns"; + $output .= "Curr Connections: $current_connections\n". + "Curr DNS Queries: $current_dns"; + + return $output; +} + +sub cmd_list { + my $self = shift; + my ($count) = @_; + + my $descriptors = Danga::Socket->DescriptorMap; + + my $list = "Current" . ($count ? (($count > 0) ? " Oldest $count" : " Newest ".-$count) : "") . " Connections: \n\n"; + my @all; + foreach my $fd (keys %$descriptors) { + my $pob = $descriptors->{$fd}; + if ($pob->isa("Qpsmtpd::PollServer")) { + next unless $pob->connection->remote_ip; # haven't even started yet + push @all, [$pob+0, $pob->connection->remote_ip, + $pob->connection->remote_host, $pob->uptime]; + } + } + + @all = sort { $a->[3] <=> $b->[3] } @all; + if ($count) { + if ($count > 0) { + @all = @all[$#all-($count-1) .. $#all]; + } + else { + @all = @all[0..(abs($count) - 1)]; + } + } + foreach my $item (@all) { + $list .= sprintf("%x : %s [%s] Connected %0.2fs\n", @$item); + } + + return $list; +} + +sub cmd_kill { + my $self = shift; + my ($match) = @_; + + return "SYNTAX: KILL (\$IP | \$REF)\n" unless $match; + + my $descriptors = Danga::Socket->DescriptorMap; + + my $killed = 0; + my $is_ip = (index($match, '.') >= 0); + foreach my $fd (keys %$descriptors) { + my $pob = $descriptors->{$fd}; + if ($pob->isa("Qpsmtpd::PollServer")) { + if ($is_ip) { + next unless $pob->connection->remote_ip; # haven't even started yet + if ($pob->connection->remote_ip eq $match) { + $pob->write("550 Your connection has been killed by an administrator\r\n"); + $pob->disconnect; + $killed++; + } + } + else { + # match by ID + if ($pob+0 == hex($match)) { + $pob->write("550 Your connection has been killed by an administrator\r\n"); + $pob->disconnect; + $killed++; + } + } + } + } + + return "Killed $killed connection" . ($killed > 1 ? "s" : "") . "\n"; +} + +sub cmd_dump { + my $self = shift; + my ($ref) = @_; + + return "SYNTAX: DUMP \$REF\n" unless $ref; + require Data::Dumper; + $Data::Dumper::Indent=1; + + my $descriptors = Danga::Socket->DescriptorMap; + foreach my $fd (keys %$descriptors) { + my $pob = $descriptors->{$fd}; + if ($pob->isa("Qpsmtpd::PollServer")) { + if ($pob+0 == hex($ref)) { + return Data::Dumper::Dumper($pob); + } + } + } + + return "Unable to find the connection: $ref. Try the LIST command\n"; } 1; diff --git a/lib/Qpsmtpd/PollServer.pm b/lib/Qpsmtpd/PollServer.pm index c205275..991d5f0 100644 --- a/lib/Qpsmtpd/PollServer.pm +++ b/lib/Qpsmtpd/PollServer.pm @@ -13,6 +13,7 @@ use fields qw( data_size max_size hooks + start_time _auth _commands _config_cache @@ -28,6 +29,7 @@ use Danga::DNS; use Mail::Header; use POSIX qw(strftime); use Socket qw(inet_aton AF_INET CRLF); +use Time::HiRes qw(time); use strict; sub input_sock { @@ -41,10 +43,17 @@ sub new { $self = fields::new($self) unless ref $self; $self->SUPER::new( @_ ); + $self->{start_time} = time; $self->load_plugins; return $self; } +sub uptime { + my Qpsmtpd::PollServer $self = shift; + + return (time() - $self->{start_time}); +} + sub reset_for_next_message { my $self = shift; $self->SUPER::reset_for_next_message(@_); diff --git a/lib/Qpsmtpd/Stats.pm b/lib/Qpsmtpd/Stats.pm new file mode 100644 index 0000000..a858b9f --- /dev/null +++ b/lib/Qpsmtpd/Stats.pm @@ -0,0 +1,35 @@ +# $Id$ + +package Qpsmtpd::Stats; + +use strict; +use Qpsmtpd; +use Qpsmtpd::Constants; +use Time::HiRes qw(time); + +my $START_TIME = time; +our $MAILS_RECEIVED = 0; +our $MAILS_REJECTED = 0; +our $MAILS_TEMPFAIL = 0; + +sub uptime { + return (time() - $START_TIME); +} + +sub mails_received { + return $MAILS_RECEIVED; +} + +sub mails_rejected { + return $MAILS_REJECTED; +} + +sub mails_tempfailed { + return $MAILS_TEMPFAIL; +} + +sub mails_per_sec { + return ($MAILS_RECEIVED / uptime()); +} + +1; \ No newline at end of file diff --git a/plugins/stats b/plugins/stats new file mode 100644 index 0000000..d7aa604 --- /dev/null +++ b/plugins/stats @@ -0,0 +1,31 @@ +#!/usr/bin/perl -w + +use Qpsmtpd::Stats; + +sub register { + my ($self) = @_; + + $self->register_hook('deny', 'increment_deny'); + $self->register_hook('queue', 'increment_mails'); +} + +sub increment_deny { + my ($self, $level) = @_; + + if ($level == DENY or $level == DENY_DISCONNECT) { + $Qpsmtpd::Stats::MAILS_REJECTED++; + } + elsif ($level == DENYSOFT or $level == DENYSOFT_DISCONNECT) { + $Qpsmtpd::Stats::MAILS_TEMPFAIL++; + } + + return DECLINED; +} + +sub increment_mails { + my $self = shift; + + $Qpsmtpd::Stats::MAILS_RECEIVED++; + + return DECLINED; +} \ No newline at end of file diff --git a/qpsmtpd b/qpsmtpd index 6f1df6d..928948e 100755 --- a/qpsmtpd +++ b/qpsmtpd @@ -43,6 +43,7 @@ my $PROCS = 1; my $MAXCONN = 15; # max simultaneous connections my $USER = 'smtpd'; # user to suid to my $MAXCONNIP = 5; # max simultaneous connections from one IP +my $PAUSED = 0; sub help { print <OtherFds(fileno($server) => \&accept_handler); + Qpsmtpd::PollServer->OtherFds(fileno($SERVER) => \&accept_handler); Qpsmtpd::PollServer->EventLoop(); exit; } @@ -199,7 +200,7 @@ sub run_as_inetd { sub run_as_server { # establish SERVER socket, bind and listen. - $server = IO::Socket::INET->new(LocalPort => $PORT, + $SERVER = IO::Socket::INET->new(LocalPort => $PORT, LocalAddr => $LOCALADDR, Type => SOCK_STREAM, Proto => IPPROTO_TCP, @@ -208,10 +209,10 @@ sub run_as_server { Listen => 10 ) or die "Error creating server $LOCALADDR:$PORT : $@\n"; - IO::Handle::blocking($server, 0); - binmode($server, ':raw'); + IO::Handle::blocking($SERVER, 0); + binmode($SERVER, ':raw'); - $config_server = IO::Socket::INET->new(LocalPort => $CONFIG_PORT, + $CONFIG_SERVER = IO::Socket::INET->new(LocalPort => $CONFIG_PORT, LocalAddr => $CONFIG_LOCALADDR, Type => SOCK_STREAM, Proto => IPPROTO_TCP, @@ -220,8 +221,8 @@ sub run_as_server { Listen => 1 ) or die "Error creating server $CONFIG_LOCALADDR:$CONFIG_PORT : $@\n"; - IO::Handle::blocking($config_server, 0); - binmode($config_server, ':raw'); + IO::Handle::blocking($CONFIG_SERVER, 0); + binmode($CONFIG_SERVER, ':raw'); # Drop priviledges my (undef, undef, $quid, $qgid) = getpwnam $USER or @@ -258,8 +259,8 @@ sub run_as_server { } ::log(LOGDEBUG, "Listening on $PORT with single process $POLL" . ($LineMode ? " (forking server)" : "")); - Qpsmtpd::PollServer->OtherFds(fileno($server) => \&accept_handler, - fileno($config_server) => \&config_handler, + Qpsmtpd::PollServer->OtherFds(fileno($SERVER) => \&accept_handler, + fileno($CONFIG_SERVER) => \&config_handler, ); while (1) { Qpsmtpd::PollServer->EventLoop(); @@ -270,7 +271,7 @@ sub run_as_server { } sub config_handler { - my $csock = $config_server->accept(); + my $csock = $CONFIG_SERVER->accept(); if (!$csock) { warn("accept failed on config server: $!"); return; @@ -295,7 +296,7 @@ sub accept_handler { return; } - my $csock = $server->accept(); + my $csock = $SERVER->accept(); if (!$csock) { # warn("accept() failed: $!"); return; @@ -313,6 +314,12 @@ sub accept_handler { my $client = Qpsmtpd::PollServer->new($csock); my $rem_ip = $client->peer_ip_string; + if ($PAUSED) { + $client->write("451 Sorry, this server is currently paused\r\n"); + $client->close; + return; + } + if ($MAXCONNIP) { my $num_conn = 1; # seed with current value @@ -370,7 +377,7 @@ sub accept_handler { return $csock->close(); } - $server->close(); # make sure the child doesn't accept() new connections + $SERVER->close(); # make sure the child doesn't accept() new connections $SIG{$_} = 'DEFAULT' for keys %SIG; @@ -406,3 +413,7 @@ sub log { warn("$$ $message\n"); } +sub pause { + my ($pause) = @_; + $PAUSED = $pause; +}