From 536e1723c12ba2533f9040a636486adebb204dff Mon Sep 17 00:00:00 2001 From: Matt Sergeant Date: Tue, 26 Apr 2005 02:46:45 +0000 Subject: [PATCH] Added rudimentary configuration server when running in non-forking poll mode git-svn-id: https://svn.perl.org/qpsmtpd/branches/high_perf@407 958fd67b-6ff1-0310-b445-bb7760255be9 --- lib/Danga/DNS/Resolver.pm | 6 ++ lib/Qpsmtpd/ConfigServer.pm | 138 ++++++++++++++++++++++++++++++++++++ qpsmtpd | 44 +++++++++++- 3 files changed, 185 insertions(+), 3 deletions(-) create mode 100644 lib/Qpsmtpd/ConfigServer.pm diff --git a/lib/Danga/DNS/Resolver.pm b/lib/Danga/DNS/Resolver.pm index ded6e37..9d7a9f5 100644 --- a/lib/Danga/DNS/Resolver.pm +++ b/lib/Danga/DNS/Resolver.pm @@ -50,6 +50,12 @@ sub new { return $self; } +sub pending { + my Danga::DNS::Resolver $self = shift; + + return keys(%{$self->{id_to_asker}}); +} + sub _query { my Danga::DNS::Resolver $self = shift; my ($asker, $host, $type, $now) = @_; diff --git a/lib/Qpsmtpd/ConfigServer.pm b/lib/Qpsmtpd/ConfigServer.pm new file mode 100644 index 0000000..edee148 --- /dev/null +++ b/lib/Qpsmtpd/ConfigServer.pm @@ -0,0 +1,138 @@ +# $Id$ + +package Qpsmtpd::ConfigServer; + +use base ('Danga::Client'); + +use fields qw( + commands + _auth + _commands + _config_cache + _connection + _transaction + _test_mode + _extras +); + +sub new { + my Qpsmtpd::ConfigServer $self = shift; + + $self = fields::new($self) unless ref $self; + $self->SUPER::new( @_ ); + $self->{commands} = { help => 1, status => 1, }; + $self->write("Enter command:\n"); + return $self; +} + +sub process_line { + my $self = shift; + my $line = shift || return; + if ($::DEBUG > 1) { print "$$:".($self+0)."C($self->{mode}): $line"; } + local $SIG{ALRM} = sub { + my ($pkg, $file, $line) = caller(); + die "ALARM: $pkg, $file, $line"; + }; + my $prev = alarm(2); # must process a command in < 2 seconds + my $resp = eval { $self->_process_line($line) }; + alarm($prev); + if ($@) { + print STDERR "Error: $@\n"; + } + return $resp || ''; +} + +sub respond { + my $self = shift; + my (@messages) = @_; + while (my $msg = shift @messages) { + $self->write("$msg\r\n"); + } + return; +} + +sub fault { + my $self = shift; + my ($msg) = shift || "program fault - command not performed"; + print STDERR "$0 [$$]: $msg ($!)\n"; + return $self->respond("Error - " . $msg, "Enter command:"); +} + +sub _process_line { + my $self = shift; + my $line = shift; + + $line =~ s/\r?\n//; + my ($cmd, @params) = split(/ +/, $line); + my $meth = lc($cmd); + if (my $lookup = $self->{commands}->{$meth} && $self->can($meth)) { + my $resp = eval { + $lookup->($self, @params); + }; + if ($@) { + my $error = $@; + chomp($error); + $self->log(LOGERROR, "Command Error: $error"); + return $self->fault("command '$cmd' failed unexpectedly"); + } + return $resp . "\nEnter command:\n"; + } + else { + # No such method - i.e. unrecognized command + return $self->fault("command '$cmd' unrecognised"); + } +} + +my %helptext = ( + all => "Available Commands:\n\nSTATUS\nHELP [CMD]", + status => "STATUS - Returns status information about current connections", + ); + +sub help { + my $self = shift; + my ($subcmd) = @_; + + $subcmd ||= 'all'; + $subcmd = lc($subcmd); + + my $txt = $helptext{$subcmd} || "Unrecognised help option. Try 'help all'"; + warn "help returning: $txt\n"; + return $txt . "\n"; +} + +sub status { + my $self = shift; + + my $descriptors = Danga::Socket->DescriptorMap; + + my $current_connections = 0; + my $current_dns = 0; + foreach my $fd (keys %$descriptors) { + my $pob = $descriptors->{$fd}; + if ($pob->isa("Qpsmtpd::PollServer")) { + $current_connections++; + } + elsif ($pob->isa("Danga::DNS::Resolver")) { + $current_dns = $pob->pending; + } + } + + return +" Current Connections: $current_connections + Current DNS Queries: $current_dns"; +} + +1; +__END__ + +=head1 NAME + +Qpsmtpd::ConfigServer - a configuration server for qpsmtpd + +=head1 DESCRIPTION + +When qpsmtpd runs in multiplex mode it also provides a config server that you +can connect to. This allows you to view current connection statistics and other +gumph that you probably don't care about. + +=cut \ No newline at end of file diff --git a/qpsmtpd b/qpsmtpd index 673eb46..6f1df6d 100755 --- a/qpsmtpd +++ b/qpsmtpd @@ -15,6 +15,7 @@ use lib "$FindBin::Bin/lib"; use Danga::Socket; use Danga::Client; use Qpsmtpd::PollServer; +use Qpsmtpd::ConfigServer; use Qpsmtpd::Constants; use IO::Socket; use Carp; @@ -31,6 +32,10 @@ use Socket qw(IPPROTO_TCP SO_KEEPALIVE TCP_NODELAY SOL_SOCKET); $SIG{'PIPE'} = "IGNORE"; # handled manually $DEBUG = 0; + +my $CONFIG_PORT = 20025; +my $CONFIG_LOCALADDR = '127.0.0.1'; + my $PORT = 2525; my $LOCALADDR = '0.0.0.0'; my $LineMode = 0; @@ -93,6 +98,7 @@ my $POLL = "with " . ($Danga::Socket::HaveEpoll ? "epoll()" : $Danga::Socket::HaveKQueue ? "kqueue()" : "poll()"); my $server; +my $config_server; # Code for inetd/tcpserver mode if ($ENV{REMOTE_HOST} or $ENV{TCPREMOTEHOST}) { @@ -200,10 +206,22 @@ sub run_as_server { Blocking => 0, Reuse => 1, Listen => 10 ) - or die "Error creating server $LOCALADDR:$PORT : $@\n"; + or die "Error creating server $LOCALADDR:$PORT : $@\n"; IO::Handle::blocking($server, 0); binmode($server, ':raw'); + + $config_server = IO::Socket::INET->new(LocalPort => $CONFIG_PORT, + LocalAddr => $CONFIG_LOCALADDR, + Type => SOCK_STREAM, + Proto => IPPROTO_TCP, + Blocking => 0, + Reuse => 1, + Listen => 1 ) + or die "Error creating server $CONFIG_LOCALADDR:$CONFIG_PORT : $@\n"; + + IO::Handle::blocking($config_server, 0); + binmode($config_server, ':raw'); # Drop priviledges my (undef, undef, $quid, $qgid) = getpwnam $USER or @@ -240,7 +258,9 @@ sub run_as_server { } ::log(LOGDEBUG, "Listening on $PORT with single process $POLL" . ($LineMode ? " (forking server)" : "")); - Qpsmtpd::PollServer->OtherFds(fileno($server) => \&accept_handler); + Qpsmtpd::PollServer->OtherFds(fileno($server) => \&accept_handler, + fileno($config_server) => \&config_handler, + ); while (1) { Qpsmtpd::PollServer->EventLoop(); } @@ -249,6 +269,24 @@ sub run_as_server { } +sub config_handler { + my $csock = $config_server->accept(); + if (!$csock) { + warn("accept failed on config server: $!"); + return; + } + binmode($csock, ':raw'); + + printf("Config server connection\n") if $DEBUG; + + IO::Handle::blocking($csock, 0); + setsockopt($csock, IPPROTO_TCP, TCP_NODELAY, pack("l", 1)) or die; + + my $client = Qpsmtpd::ConfigServer->new($csock); + $client->watch_read(1); + return; +} + # Accept a new connection sub accept_handler { my $running = scalar keys %childstatus; @@ -260,8 +298,8 @@ sub accept_handler { my $csock = $server->accept(); if (!$csock) { # warn("accept() failed: $!"); + return; } - return unless $csock; binmode($csock, ':raw'); printf("Listen child making a Qpsmtpd::PollServer for %d.\n", fileno($csock))