Added rudimentary configuration server when running in non-forking poll mode
git-svn-id: https://svn.perl.org/qpsmtpd/branches/high_perf@407 958fd67b-6ff1-0310-b445-bb7760255be9
This commit is contained in:
parent
ed4e06bcd2
commit
536e1723c1
@ -50,6 +50,12 @@ sub new {
|
|||||||
return $self;
|
return $self;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sub pending {
|
||||||
|
my Danga::DNS::Resolver $self = shift;
|
||||||
|
|
||||||
|
return keys(%{$self->{id_to_asker}});
|
||||||
|
}
|
||||||
|
|
||||||
sub _query {
|
sub _query {
|
||||||
my Danga::DNS::Resolver $self = shift;
|
my Danga::DNS::Resolver $self = shift;
|
||||||
my ($asker, $host, $type, $now) = @_;
|
my ($asker, $host, $type, $now) = @_;
|
||||||
|
138
lib/Qpsmtpd/ConfigServer.pm
Normal file
138
lib/Qpsmtpd/ConfigServer.pm
Normal file
@ -0,0 +1,138 @@
|
|||||||
|
# $Id$
|
||||||
|
|
||||||
|
package Qpsmtpd::ConfigServer;
|
||||||
|
|
||||||
|
use base ('Danga::Client');
|
||||||
|
|
||||||
|
use fields qw(
|
||||||
|
commands
|
||||||
|
_auth
|
||||||
|
_commands
|
||||||
|
_config_cache
|
||||||
|
_connection
|
||||||
|
_transaction
|
||||||
|
_test_mode
|
||||||
|
_extras
|
||||||
|
);
|
||||||
|
|
||||||
|
sub new {
|
||||||
|
my Qpsmtpd::ConfigServer $self = shift;
|
||||||
|
|
||||||
|
$self = fields::new($self) unless ref $self;
|
||||||
|
$self->SUPER::new( @_ );
|
||||||
|
$self->{commands} = { help => 1, status => 1, };
|
||||||
|
$self->write("Enter command:\n");
|
||||||
|
return $self;
|
||||||
|
}
|
||||||
|
|
||||||
|
sub process_line {
|
||||||
|
my $self = shift;
|
||||||
|
my $line = shift || return;
|
||||||
|
if ($::DEBUG > 1) { print "$$:".($self+0)."C($self->{mode}): $line"; }
|
||||||
|
local $SIG{ALRM} = sub {
|
||||||
|
my ($pkg, $file, $line) = caller();
|
||||||
|
die "ALARM: $pkg, $file, $line";
|
||||||
|
};
|
||||||
|
my $prev = alarm(2); # must process a command in < 2 seconds
|
||||||
|
my $resp = eval { $self->_process_line($line) };
|
||||||
|
alarm($prev);
|
||||||
|
if ($@) {
|
||||||
|
print STDERR "Error: $@\n";
|
||||||
|
}
|
||||||
|
return $resp || '';
|
||||||
|
}
|
||||||
|
|
||||||
|
sub respond {
|
||||||
|
my $self = shift;
|
||||||
|
my (@messages) = @_;
|
||||||
|
while (my $msg = shift @messages) {
|
||||||
|
$self->write("$msg\r\n");
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
sub fault {
|
||||||
|
my $self = shift;
|
||||||
|
my ($msg) = shift || "program fault - command not performed";
|
||||||
|
print STDERR "$0 [$$]: $msg ($!)\n";
|
||||||
|
return $self->respond("Error - " . $msg, "Enter command:");
|
||||||
|
}
|
||||||
|
|
||||||
|
sub _process_line {
|
||||||
|
my $self = shift;
|
||||||
|
my $line = shift;
|
||||||
|
|
||||||
|
$line =~ s/\r?\n//;
|
||||||
|
my ($cmd, @params) = split(/ +/, $line);
|
||||||
|
my $meth = lc($cmd);
|
||||||
|
if (my $lookup = $self->{commands}->{$meth} && $self->can($meth)) {
|
||||||
|
my $resp = eval {
|
||||||
|
$lookup->($self, @params);
|
||||||
|
};
|
||||||
|
if ($@) {
|
||||||
|
my $error = $@;
|
||||||
|
chomp($error);
|
||||||
|
$self->log(LOGERROR, "Command Error: $error");
|
||||||
|
return $self->fault("command '$cmd' failed unexpectedly");
|
||||||
|
}
|
||||||
|
return $resp . "\nEnter command:\n";
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
# No such method - i.e. unrecognized command
|
||||||
|
return $self->fault("command '$cmd' unrecognised");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
my %helptext = (
|
||||||
|
all => "Available Commands:\n\nSTATUS\nHELP [CMD]",
|
||||||
|
status => "STATUS - Returns status information about current connections",
|
||||||
|
);
|
||||||
|
|
||||||
|
sub help {
|
||||||
|
my $self = shift;
|
||||||
|
my ($subcmd) = @_;
|
||||||
|
|
||||||
|
$subcmd ||= 'all';
|
||||||
|
$subcmd = lc($subcmd);
|
||||||
|
|
||||||
|
my $txt = $helptext{$subcmd} || "Unrecognised help option. Try 'help all'";
|
||||||
|
warn "help returning: $txt\n";
|
||||||
|
return $txt . "\n";
|
||||||
|
}
|
||||||
|
|
||||||
|
sub status {
|
||||||
|
my $self = shift;
|
||||||
|
|
||||||
|
my $descriptors = Danga::Socket->DescriptorMap;
|
||||||
|
|
||||||
|
my $current_connections = 0;
|
||||||
|
my $current_dns = 0;
|
||||||
|
foreach my $fd (keys %$descriptors) {
|
||||||
|
my $pob = $descriptors->{$fd};
|
||||||
|
if ($pob->isa("Qpsmtpd::PollServer")) {
|
||||||
|
$current_connections++;
|
||||||
|
}
|
||||||
|
elsif ($pob->isa("Danga::DNS::Resolver")) {
|
||||||
|
$current_dns = $pob->pending;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
" Current Connections: $current_connections
|
||||||
|
Current DNS Queries: $current_dns";
|
||||||
|
}
|
||||||
|
|
||||||
|
1;
|
||||||
|
__END__
|
||||||
|
|
||||||
|
=head1 NAME
|
||||||
|
|
||||||
|
Qpsmtpd::ConfigServer - a configuration server for qpsmtpd
|
||||||
|
|
||||||
|
=head1 DESCRIPTION
|
||||||
|
|
||||||
|
When qpsmtpd runs in multiplex mode it also provides a config server that you
|
||||||
|
can connect to. This allows you to view current connection statistics and other
|
||||||
|
gumph that you probably don't care about.
|
||||||
|
|
||||||
|
=cut
|
44
qpsmtpd
44
qpsmtpd
@ -15,6 +15,7 @@ use lib "$FindBin::Bin/lib";
|
|||||||
use Danga::Socket;
|
use Danga::Socket;
|
||||||
use Danga::Client;
|
use Danga::Client;
|
||||||
use Qpsmtpd::PollServer;
|
use Qpsmtpd::PollServer;
|
||||||
|
use Qpsmtpd::ConfigServer;
|
||||||
use Qpsmtpd::Constants;
|
use Qpsmtpd::Constants;
|
||||||
use IO::Socket;
|
use IO::Socket;
|
||||||
use Carp;
|
use Carp;
|
||||||
@ -31,6 +32,10 @@ use Socket qw(IPPROTO_TCP SO_KEEPALIVE TCP_NODELAY SOL_SOCKET);
|
|||||||
$SIG{'PIPE'} = "IGNORE"; # handled manually
|
$SIG{'PIPE'} = "IGNORE"; # handled manually
|
||||||
|
|
||||||
$DEBUG = 0;
|
$DEBUG = 0;
|
||||||
|
|
||||||
|
my $CONFIG_PORT = 20025;
|
||||||
|
my $CONFIG_LOCALADDR = '127.0.0.1';
|
||||||
|
|
||||||
my $PORT = 2525;
|
my $PORT = 2525;
|
||||||
my $LOCALADDR = '0.0.0.0';
|
my $LOCALADDR = '0.0.0.0';
|
||||||
my $LineMode = 0;
|
my $LineMode = 0;
|
||||||
@ -93,6 +98,7 @@ my $POLL = "with " . ($Danga::Socket::HaveEpoll ? "epoll()" :
|
|||||||
$Danga::Socket::HaveKQueue ? "kqueue()" : "poll()");
|
$Danga::Socket::HaveKQueue ? "kqueue()" : "poll()");
|
||||||
|
|
||||||
my $server;
|
my $server;
|
||||||
|
my $config_server;
|
||||||
|
|
||||||
# Code for inetd/tcpserver mode
|
# Code for inetd/tcpserver mode
|
||||||
if ($ENV{REMOTE_HOST} or $ENV{TCPREMOTEHOST}) {
|
if ($ENV{REMOTE_HOST} or $ENV{TCPREMOTEHOST}) {
|
||||||
@ -200,10 +206,22 @@ sub run_as_server {
|
|||||||
Blocking => 0,
|
Blocking => 0,
|
||||||
Reuse => 1,
|
Reuse => 1,
|
||||||
Listen => 10 )
|
Listen => 10 )
|
||||||
or die "Error creating server $LOCALADDR:$PORT : $@\n";
|
or die "Error creating server $LOCALADDR:$PORT : $@\n";
|
||||||
|
|
||||||
IO::Handle::blocking($server, 0);
|
IO::Handle::blocking($server, 0);
|
||||||
binmode($server, ':raw');
|
binmode($server, ':raw');
|
||||||
|
|
||||||
|
$config_server = IO::Socket::INET->new(LocalPort => $CONFIG_PORT,
|
||||||
|
LocalAddr => $CONFIG_LOCALADDR,
|
||||||
|
Type => SOCK_STREAM,
|
||||||
|
Proto => IPPROTO_TCP,
|
||||||
|
Blocking => 0,
|
||||||
|
Reuse => 1,
|
||||||
|
Listen => 1 )
|
||||||
|
or die "Error creating server $CONFIG_LOCALADDR:$CONFIG_PORT : $@\n";
|
||||||
|
|
||||||
|
IO::Handle::blocking($config_server, 0);
|
||||||
|
binmode($config_server, ':raw');
|
||||||
|
|
||||||
# Drop priviledges
|
# Drop priviledges
|
||||||
my (undef, undef, $quid, $qgid) = getpwnam $USER or
|
my (undef, undef, $quid, $qgid) = getpwnam $USER or
|
||||||
@ -240,7 +258,9 @@ sub run_as_server {
|
|||||||
}
|
}
|
||||||
::log(LOGDEBUG, "Listening on $PORT with single process $POLL" .
|
::log(LOGDEBUG, "Listening on $PORT with single process $POLL" .
|
||||||
($LineMode ? " (forking server)" : ""));
|
($LineMode ? " (forking server)" : ""));
|
||||||
Qpsmtpd::PollServer->OtherFds(fileno($server) => \&accept_handler);
|
Qpsmtpd::PollServer->OtherFds(fileno($server) => \&accept_handler,
|
||||||
|
fileno($config_server) => \&config_handler,
|
||||||
|
);
|
||||||
while (1) {
|
while (1) {
|
||||||
Qpsmtpd::PollServer->EventLoop();
|
Qpsmtpd::PollServer->EventLoop();
|
||||||
}
|
}
|
||||||
@ -249,6 +269,24 @@ sub run_as_server {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sub config_handler {
|
||||||
|
my $csock = $config_server->accept();
|
||||||
|
if (!$csock) {
|
||||||
|
warn("accept failed on config server: $!");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
binmode($csock, ':raw');
|
||||||
|
|
||||||
|
printf("Config server connection\n") if $DEBUG;
|
||||||
|
|
||||||
|
IO::Handle::blocking($csock, 0);
|
||||||
|
setsockopt($csock, IPPROTO_TCP, TCP_NODELAY, pack("l", 1)) or die;
|
||||||
|
|
||||||
|
my $client = Qpsmtpd::ConfigServer->new($csock);
|
||||||
|
$client->watch_read(1);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
# Accept a new connection
|
# Accept a new connection
|
||||||
sub accept_handler {
|
sub accept_handler {
|
||||||
my $running = scalar keys %childstatus;
|
my $running = scalar keys %childstatus;
|
||||||
@ -260,8 +298,8 @@ sub accept_handler {
|
|||||||
my $csock = $server->accept();
|
my $csock = $server->accept();
|
||||||
if (!$csock) {
|
if (!$csock) {
|
||||||
# warn("accept() failed: $!");
|
# warn("accept() failed: $!");
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
return unless $csock;
|
|
||||||
binmode($csock, ':raw');
|
binmode($csock, ':raw');
|
||||||
|
|
||||||
printf("Listen child making a Qpsmtpd::PollServer for %d.\n", fileno($csock))
|
printf("Listen child making a Qpsmtpd::PollServer for %d.\n", fileno($csock))
|
||||||
|
Loading…
Reference in New Issue
Block a user