More qpsmtpd-async merges
git-svn-id: https://svn.perl.org/qpsmtpd/branches/0.3x@707 958fd67b-6ff1-0310-b445-bb7760255be9
This commit is contained in:
commit
30a1c71efc
173
lib/Danga/Client.pm
Normal file
173
lib/Danga/Client.pm
Normal file
@ -0,0 +1,173 @@
|
|||||||
|
# $Id: Client.pm,v 1.8 2005/02/14 22:06:38 msergeant Exp $
|
||||||
|
|
||||||
|
package Danga::Client;
|
||||||
|
use base 'Danga::TimeoutSocket';
|
||||||
|
use fields qw(line pause_count read_bytes data_bytes callback get_chunks);
|
||||||
|
use Time::HiRes ();
|
||||||
|
|
||||||
|
use bytes;
|
||||||
|
|
||||||
|
# 30 seconds max timeout!
|
||||||
|
sub max_idle_time { 30 }
|
||||||
|
sub max_connect_time { 1200 }
|
||||||
|
|
||||||
|
sub new {
|
||||||
|
my Danga::Client $self = shift;
|
||||||
|
$self = fields::new($self) unless ref $self;
|
||||||
|
$self->SUPER::new( @_ );
|
||||||
|
|
||||||
|
$self->reset_for_next_message;
|
||||||
|
return $self;
|
||||||
|
}
|
||||||
|
|
||||||
|
sub reset_for_next_message {
|
||||||
|
my Danga::Client $self = shift;
|
||||||
|
$self->{line} = '';
|
||||||
|
$self->{pause_count} = 0;
|
||||||
|
$self->{read_bytes} = 0;
|
||||||
|
$self->{callback} = undef;
|
||||||
|
$self->{data_bytes} = '';
|
||||||
|
$self->{get_chunks} = 0;
|
||||||
|
return $self;
|
||||||
|
}
|
||||||
|
|
||||||
|
sub get_bytes {
|
||||||
|
my Danga::Client $self = shift;
|
||||||
|
my ($bytes, $callback) = @_;
|
||||||
|
if ($self->{callback}) {
|
||||||
|
die "get_bytes/get_chunks currently in progress!";
|
||||||
|
}
|
||||||
|
$self->{read_bytes} = $bytes;
|
||||||
|
$self->{data_bytes} = $self->{line};
|
||||||
|
$self->{read_bytes} -= length($self->{data_bytes});
|
||||||
|
$self->{line} = '';
|
||||||
|
if ($self->{read_bytes} <= 0) {
|
||||||
|
if ($self->{read_bytes} < 0) {
|
||||||
|
$self->{line} = substr($self->{data_bytes},
|
||||||
|
$self->{read_bytes}, # negative offset
|
||||||
|
0 - $self->{read_bytes}, # to end of str
|
||||||
|
""); # truncate that substr
|
||||||
|
}
|
||||||
|
$callback->($self->{data_bytes});
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
$self->{callback} = $callback;
|
||||||
|
}
|
||||||
|
|
||||||
|
sub get_chunks {
|
||||||
|
my Danga::Client $self = shift;
|
||||||
|
my ($bytes, $callback) = @_;
|
||||||
|
if ($self->{callback}) {
|
||||||
|
die "get_bytes/get_chunks currently in progress!";
|
||||||
|
}
|
||||||
|
$self->{read_bytes} = $bytes;
|
||||||
|
$callback->($self->{line}) if length($self->{line});
|
||||||
|
$self->{line} = '';
|
||||||
|
$self->{callback} = $callback;
|
||||||
|
$self->{get_chunks} = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
sub end_get_chunks {
|
||||||
|
my Danga::Client $self = shift;
|
||||||
|
my $remaining = shift;
|
||||||
|
$self->{callback} = undef;
|
||||||
|
$self->{get_chunks} = 0;
|
||||||
|
if (defined($remaining)) {
|
||||||
|
$self->process_read_buf(\$remaining);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
sub event_read {
|
||||||
|
my Danga::Client $self = shift;
|
||||||
|
if ($self->{callback}) {
|
||||||
|
$self->{alive_time} = time;
|
||||||
|
if ($self->{get_chunks}) {
|
||||||
|
my $bref = $self->read($self->{read_bytes});
|
||||||
|
return $self->close($!) unless defined $bref;
|
||||||
|
$self->{callback}->($$bref) if length($$bref);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if ($self->{read_bytes} > 0) {
|
||||||
|
my $bref = $self->read($self->{read_bytes});
|
||||||
|
return $self->close($!) unless defined $bref;
|
||||||
|
$self->{read_bytes} -= length($$bref);
|
||||||
|
$self->{data_bytes} .= $$bref;
|
||||||
|
}
|
||||||
|
if ($self->{read_bytes} <= 0) {
|
||||||
|
# print "Erk, read too much!\n" if $self->{read_bytes} < 0;
|
||||||
|
my $cb = $self->{callback};
|
||||||
|
$self->{callback} = undef;
|
||||||
|
$cb->($self->{data_bytes});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
my $bref = $self->read(8192);
|
||||||
|
return $self->close($!) unless defined $bref;
|
||||||
|
$self->process_read_buf($bref);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
sub process_read_buf {
|
||||||
|
my Danga::Client $self = shift;
|
||||||
|
my $bref = shift;
|
||||||
|
$self->{line} .= $$bref;
|
||||||
|
return if $self->{pause_count} || $self->{closed};
|
||||||
|
|
||||||
|
while ($self->{line} =~ s/^(.*?\n)//) {
|
||||||
|
my $line = $1;
|
||||||
|
$self->{alive_time} = time;
|
||||||
|
my $resp = $self->process_line($line);
|
||||||
|
if ($::DEBUG > 1 and $resp) { print "$$:".($self+0)."S: $_\n" for split(/\n/, $resp) }
|
||||||
|
$self->write($resp) if $resp;
|
||||||
|
# $self->watch_read(0) if $self->{pause_count};
|
||||||
|
return if $self->{pause_count} || $self->{closed};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
sub has_data {
|
||||||
|
my Danga::Client $self = shift;
|
||||||
|
return length($self->{line}) ? 1 : 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
sub clear_data {
|
||||||
|
my Danga::Client $self = shift;
|
||||||
|
$self->{line} = '';
|
||||||
|
}
|
||||||
|
|
||||||
|
sub paused {
|
||||||
|
my Danga::Client $self = shift;
|
||||||
|
return 1 if $self->{pause_count};
|
||||||
|
return 1 if $self->{closed};
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
sub pause_read {
|
||||||
|
my Danga::Client $self = shift;
|
||||||
|
$self->{pause_count}++;
|
||||||
|
# $self->watch_read(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
sub continue_read {
|
||||||
|
my Danga::Client $self = shift;
|
||||||
|
$self->{pause_count}--;
|
||||||
|
if ($self->{pause_count} <= 0) {
|
||||||
|
$self->{pause_count} = 0;
|
||||||
|
# $self->watch_read(1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
sub process_line {
|
||||||
|
my Danga::Client $self = shift;
|
||||||
|
return '';
|
||||||
|
}
|
||||||
|
|
||||||
|
sub close {
|
||||||
|
my Danga::Client $self = shift;
|
||||||
|
print "closing @_\n" if $::DEBUG;
|
||||||
|
$self->SUPER::close(@_);
|
||||||
|
}
|
||||||
|
|
||||||
|
sub event_err { my Danga::Client $self = shift; $self->close("Error") }
|
||||||
|
sub event_hup { my Danga::Client $self = shift; $self->close("Disconnect (HUP)") }
|
||||||
|
|
||||||
|
1;
|
62
lib/Danga/TimeoutSocket.pm
Normal file
62
lib/Danga/TimeoutSocket.pm
Normal file
@ -0,0 +1,62 @@
|
|||||||
|
# $Id: TimeoutSocket.pm,v 1.2 2005/02/02 20:44:35 msergeant Exp $
|
||||||
|
|
||||||
|
package Danga::TimeoutSocket;
|
||||||
|
|
||||||
|
use base 'Danga::Socket';
|
||||||
|
use fields qw(alive_time create_time);
|
||||||
|
|
||||||
|
our $last_cleanup = 0;
|
||||||
|
|
||||||
|
Danga::Socket->AddTimer(15, \&_do_cleanup);
|
||||||
|
|
||||||
|
sub new {
|
||||||
|
my Danga::TimeoutSocket $self = shift;
|
||||||
|
my $sock = shift;
|
||||||
|
$self = fields::new($self) unless ref($self);
|
||||||
|
$self->SUPER::new($sock);
|
||||||
|
|
||||||
|
my $now = time;
|
||||||
|
$self->{alive_time} = $self->{create_time} = $now;
|
||||||
|
|
||||||
|
return $self;
|
||||||
|
}
|
||||||
|
|
||||||
|
# overload these in a subclass
|
||||||
|
sub max_idle_time { 0 }
|
||||||
|
sub max_connect_time { 0 }
|
||||||
|
|
||||||
|
sub _do_cleanup {
|
||||||
|
my $now = time;
|
||||||
|
|
||||||
|
Danga::Socket->AddTimer(15, \&_do_cleanup);
|
||||||
|
|
||||||
|
my $sf = __PACKAGE__->get_sock_ref;
|
||||||
|
|
||||||
|
my %max_age; # classname -> max age (0 means forever)
|
||||||
|
my %max_connect; # classname -> max connect time
|
||||||
|
my @to_close;
|
||||||
|
while (my $k = each %$sf) {
|
||||||
|
my Danga::TimeoutSocket $v = $sf->{$k};
|
||||||
|
my $ref = ref $v;
|
||||||
|
next unless $v->isa('Danga::TimeoutSocket');
|
||||||
|
unless (defined $max_age{$ref}) {
|
||||||
|
$max_age{$ref} = $ref->max_idle_time || 0;
|
||||||
|
$max_connect{$ref} = $ref->max_connect_time || 0;
|
||||||
|
}
|
||||||
|
if (my $t = $max_connect{$ref}) {
|
||||||
|
if ($v->{create_time} < $now - $t) {
|
||||||
|
push @to_close, $v;
|
||||||
|
next;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (my $t = $max_age{$ref}) {
|
||||||
|
if ($v->{alive_time} < $now - $t) {
|
||||||
|
push @to_close, $v;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
$_->close("Timeout") foreach @to_close;
|
||||||
|
}
|
||||||
|
|
||||||
|
1;
|
289
lib/Qpsmtpd/ConfigServer.pm
Normal file
289
lib/Qpsmtpd/ConfigServer.pm
Normal file
@ -0,0 +1,289 @@
|
|||||||
|
# $Id$
|
||||||
|
|
||||||
|
package Qpsmtpd::ConfigServer;
|
||||||
|
|
||||||
|
use base ('Danga::Client');
|
||||||
|
use Qpsmtpd::Constants;
|
||||||
|
|
||||||
|
use strict;
|
||||||
|
|
||||||
|
use fields qw(
|
||||||
|
_auth
|
||||||
|
_commands
|
||||||
|
_config_cache
|
||||||
|
_connection
|
||||||
|
_transaction
|
||||||
|
_test_mode
|
||||||
|
_extras
|
||||||
|
other_fds
|
||||||
|
);
|
||||||
|
|
||||||
|
my $PROMPT = "Enter command: ";
|
||||||
|
|
||||||
|
sub new {
|
||||||
|
my Qpsmtpd::ConfigServer $self = shift;
|
||||||
|
|
||||||
|
$self = fields::new($self) unless ref $self;
|
||||||
|
$self->SUPER::new( @_ );
|
||||||
|
$self->write($PROMPT);
|
||||||
|
return $self;
|
||||||
|
}
|
||||||
|
|
||||||
|
sub max_idle_time { 3600 } # one hour
|
||||||
|
|
||||||
|
sub process_line {
|
||||||
|
my $self = shift;
|
||||||
|
my $line = shift || return;
|
||||||
|
if ($::DEBUG > 1) { print "$$:".($self+0)."C($self->{mode}): $line"; }
|
||||||
|
local $SIG{ALRM} = sub {
|
||||||
|
my ($pkg, $file, $line) = caller();
|
||||||
|
die "ALARM: $pkg, $file, $line";
|
||||||
|
};
|
||||||
|
my $prev = alarm(2); # must process a command in < 2 seconds
|
||||||
|
my $resp = eval { $self->_process_line($line) };
|
||||||
|
alarm($prev);
|
||||||
|
if ($@) {
|
||||||
|
print STDERR "Error: $@\n";
|
||||||
|
}
|
||||||
|
return $resp || '';
|
||||||
|
}
|
||||||
|
|
||||||
|
sub respond {
|
||||||
|
my $self = shift;
|
||||||
|
my (@messages) = @_;
|
||||||
|
while (my $msg = shift @messages) {
|
||||||
|
$self->write("$msg\r\n");
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
sub fault {
|
||||||
|
my $self = shift;
|
||||||
|
my ($msg) = shift || "program fault - command not performed";
|
||||||
|
print STDERR "$0 [$$]: $msg ($!)\n";
|
||||||
|
$self->respond("Error - " . $msg);
|
||||||
|
return $PROMPT;
|
||||||
|
}
|
||||||
|
|
||||||
|
sub _process_line {
|
||||||
|
my $self = shift;
|
||||||
|
my $line = shift;
|
||||||
|
|
||||||
|
$line =~ s/\r?\n//;
|
||||||
|
my ($cmd, @params) = split(/ +/, $line);
|
||||||
|
my $meth = "cmd_" . lc($cmd);
|
||||||
|
if (my $lookup = $self->can($meth)) {
|
||||||
|
my $resp = eval {
|
||||||
|
$lookup->($self, @params);
|
||||||
|
};
|
||||||
|
if ($@) {
|
||||||
|
my $error = $@;
|
||||||
|
chomp($error);
|
||||||
|
Qpsmtpd->log(LOGERROR, "Command Error: $error");
|
||||||
|
return $self->fault("command '$cmd' failed unexpectedly");
|
||||||
|
}
|
||||||
|
return "$resp\n$PROMPT";
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
# No such method - i.e. unrecognized command
|
||||||
|
return $self->fault("command '$cmd' unrecognised");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
my %helptext = (
|
||||||
|
help => "HELP [CMD] - Get help on all commands or a specific command",
|
||||||
|
status => "STATUS - Returns status information about current connections",
|
||||||
|
list => "LIST [LIMIT] - List the connections, specify limit or negative limit to shrink list",
|
||||||
|
kill => "KILL (\$IP | \$REF) - Disconnect all connections from \$IP or connection reference \$REF",
|
||||||
|
pause => "PAUSE - Stop accepting new connections",
|
||||||
|
continue => "CONTINUE - Resume accepting connections",
|
||||||
|
reload => "RELOAD - Reload all plugins and config",
|
||||||
|
quit => "QUIT - Exit the config server",
|
||||||
|
);
|
||||||
|
|
||||||
|
sub cmd_help {
|
||||||
|
my $self = shift;
|
||||||
|
my ($subcmd) = @_;
|
||||||
|
|
||||||
|
$subcmd ||= 'help';
|
||||||
|
$subcmd = lc($subcmd);
|
||||||
|
|
||||||
|
if ($subcmd eq 'help') {
|
||||||
|
my $txt = join("\n", map { substr($_, 0, index($_, "-")) } sort values(%helptext));
|
||||||
|
return "Available Commands:\n\n$txt\n";
|
||||||
|
}
|
||||||
|
my $txt = $helptext{$subcmd} || "Unrecognised help option. Try 'help' for a full list.";
|
||||||
|
return "$txt\n";
|
||||||
|
}
|
||||||
|
|
||||||
|
sub cmd_quit {
|
||||||
|
my $self = shift;
|
||||||
|
$self->close;
|
||||||
|
}
|
||||||
|
|
||||||
|
sub cmd_shutdown {
|
||||||
|
exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
sub cmd_pause {
|
||||||
|
my $self = shift;
|
||||||
|
|
||||||
|
my $other_fds = $self->OtherFds;
|
||||||
|
|
||||||
|
$self->{other_fds} = { %$other_fds };
|
||||||
|
%$other_fds = ();
|
||||||
|
return "PAUSED";
|
||||||
|
}
|
||||||
|
|
||||||
|
sub cmd_continue {
|
||||||
|
my $self = shift;
|
||||||
|
|
||||||
|
my $other_fds = $self->{other_fds};
|
||||||
|
|
||||||
|
$self->OtherFds( %$other_fds );
|
||||||
|
%$other_fds = ();
|
||||||
|
return "UNPAUSED";
|
||||||
|
}
|
||||||
|
|
||||||
|
sub cmd_status {
|
||||||
|
my $self = shift;
|
||||||
|
|
||||||
|
# Status should show:
|
||||||
|
# - Total time running
|
||||||
|
# - Total number of mails received
|
||||||
|
# - Total number of mails rejected (5xx)
|
||||||
|
# - Total number of mails tempfailed (5xx)
|
||||||
|
# - Avg number of mails/minute
|
||||||
|
# - Number of current connections
|
||||||
|
# - Number of outstanding DNS queries
|
||||||
|
|
||||||
|
my $output = "Current Status as of " . gmtime() . " GMT\n\n";
|
||||||
|
|
||||||
|
if (defined &Qpsmtpd::Plugin::stats::get_stats) {
|
||||||
|
# Stats plugin is loaded
|
||||||
|
$output .= Qpsmtpd::Plugin::stats->get_stats;
|
||||||
|
}
|
||||||
|
|
||||||
|
my $descriptors = Danga::Socket->DescriptorMap;
|
||||||
|
|
||||||
|
my $current_connections = 0;
|
||||||
|
my $current_dns = 0;
|
||||||
|
foreach my $fd (keys %$descriptors) {
|
||||||
|
my $pob = $descriptors->{$fd};
|
||||||
|
if ($pob->isa("Qpsmtpd::PollServer")) {
|
||||||
|
$current_connections++;
|
||||||
|
}
|
||||||
|
elsif ($pob->isa("ParaDNS::Resolver")) {
|
||||||
|
$current_dns = $pob->pending;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
$output .= "Curr Connections: $current_connections / $::MAXconn\n".
|
||||||
|
"Curr DNS Queries: $current_dns";
|
||||||
|
|
||||||
|
return $output;
|
||||||
|
}
|
||||||
|
|
||||||
|
sub cmd_list {
|
||||||
|
my $self = shift;
|
||||||
|
my ($count) = @_;
|
||||||
|
|
||||||
|
my $descriptors = Danga::Socket->DescriptorMap;
|
||||||
|
|
||||||
|
my $list = "Current" . ($count ? (($count > 0) ? " Oldest $count" : " Newest ".-$count) : "") . " Connections: \n\n";
|
||||||
|
my @all;
|
||||||
|
foreach my $fd (keys %$descriptors) {
|
||||||
|
my $pob = $descriptors->{$fd};
|
||||||
|
if ($pob->isa("Qpsmtpd::PollServer")) {
|
||||||
|
next unless $pob->connection->remote_ip; # haven't even started yet
|
||||||
|
push @all, [$pob+0, $pob->connection->remote_ip,
|
||||||
|
$pob->connection->remote_host, $pob->uptime];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@all = sort { $a->[3] <=> $b->[3] } @all;
|
||||||
|
if ($count) {
|
||||||
|
if ($count > 0) {
|
||||||
|
@all = @all[$#all-($count-1) .. $#all];
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
@all = @all[0..(abs($count) - 1)];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
foreach my $item (@all) {
|
||||||
|
$list .= sprintf("%x : %s [%s] Connected %0.2fs\n", map { defined()?$_:'' } @$item);
|
||||||
|
}
|
||||||
|
|
||||||
|
return $list;
|
||||||
|
}
|
||||||
|
|
||||||
|
sub cmd_kill {
|
||||||
|
my $self = shift;
|
||||||
|
my ($match) = @_;
|
||||||
|
|
||||||
|
return "SYNTAX: KILL (\$IP | \$REF)\n" unless $match;
|
||||||
|
|
||||||
|
my $descriptors = Danga::Socket->DescriptorMap;
|
||||||
|
|
||||||
|
my $killed = 0;
|
||||||
|
my $is_ip = (index($match, '.') >= 0);
|
||||||
|
foreach my $fd (keys %$descriptors) {
|
||||||
|
my $pob = $descriptors->{$fd};
|
||||||
|
if ($pob->isa("Qpsmtpd::PollServer")) {
|
||||||
|
if ($is_ip) {
|
||||||
|
next unless $pob->connection->remote_ip; # haven't even started yet
|
||||||
|
if ($pob->connection->remote_ip eq $match) {
|
||||||
|
$pob->write("550 Your connection has been killed by an administrator\r\n");
|
||||||
|
$pob->disconnect;
|
||||||
|
$killed++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
# match by ID
|
||||||
|
if ($pob+0 == hex($match)) {
|
||||||
|
$pob->write("550 Your connection has been killed by an administrator\r\n");
|
||||||
|
$pob->disconnect;
|
||||||
|
$killed++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return "Killed $killed connection" . ($killed > 1 ? "s" : "") . "\n";
|
||||||
|
}
|
||||||
|
|
||||||
|
sub cmd_dump {
|
||||||
|
my $self = shift;
|
||||||
|
my ($ref) = @_;
|
||||||
|
|
||||||
|
return "SYNTAX: DUMP \$REF\n" unless $ref;
|
||||||
|
require Data::Dumper;
|
||||||
|
$Data::Dumper::Indent=1;
|
||||||
|
|
||||||
|
my $descriptors = Danga::Socket->DescriptorMap;
|
||||||
|
foreach my $fd (keys %$descriptors) {
|
||||||
|
my $pob = $descriptors->{$fd};
|
||||||
|
if ($pob->isa("Qpsmtpd::PollServer")) {
|
||||||
|
if ($pob+0 == hex($ref)) {
|
||||||
|
return Data::Dumper::Dumper($pob);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return "Unable to find the connection: $ref. Try the LIST command\n";
|
||||||
|
}
|
||||||
|
|
||||||
|
1;
|
||||||
|
__END__
|
||||||
|
|
||||||
|
=head1 NAME
|
||||||
|
|
||||||
|
Qpsmtpd::ConfigServer - a configuration server for qpsmtpd
|
||||||
|
|
||||||
|
=head1 DESCRIPTION
|
||||||
|
|
||||||
|
When qpsmtpd runs in multiplex mode it also provides a config server that you
|
||||||
|
can connect to. This allows you to view current connection statistics and other
|
||||||
|
gumph that you probably don't care about.
|
||||||
|
|
||||||
|
=cut
|
Loading…
Reference in New Issue
Block a user