diff --git a/lib/Danga/Client.pm b/lib/Danga/Client.pm new file mode 100644 index 0000000..d407f20 --- /dev/null +++ b/lib/Danga/Client.pm @@ -0,0 +1,173 @@ +# $Id: Client.pm,v 1.8 2005/02/14 22:06:38 msergeant Exp $ + +package Danga::Client; +use base 'Danga::TimeoutSocket'; +use fields qw(line pause_count read_bytes data_bytes callback get_chunks); +use Time::HiRes (); + +use bytes; + +# 30 seconds max timeout! +sub max_idle_time { 30 } +sub max_connect_time { 1200 } + +sub new { + my Danga::Client $self = shift; + $self = fields::new($self) unless ref $self; + $self->SUPER::new( @_ ); + + $self->reset_for_next_message; + return $self; +} + +sub reset_for_next_message { + my Danga::Client $self = shift; + $self->{line} = ''; + $self->{pause_count} = 0; + $self->{read_bytes} = 0; + $self->{callback} = undef; + $self->{data_bytes} = ''; + $self->{get_chunks} = 0; + return $self; +} + +sub get_bytes { + my Danga::Client $self = shift; + my ($bytes, $callback) = @_; + if ($self->{callback}) { + die "get_bytes/get_chunks currently in progress!"; + } + $self->{read_bytes} = $bytes; + $self->{data_bytes} = $self->{line}; + $self->{read_bytes} -= length($self->{data_bytes}); + $self->{line} = ''; + if ($self->{read_bytes} <= 0) { + if ($self->{read_bytes} < 0) { + $self->{line} = substr($self->{data_bytes}, + $self->{read_bytes}, # negative offset + 0 - $self->{read_bytes}, # to end of str + ""); # truncate that substr + } + $callback->($self->{data_bytes}); + return; + } + $self->{callback} = $callback; +} + +sub get_chunks { + my Danga::Client $self = shift; + my ($bytes, $callback) = @_; + if ($self->{callback}) { + die "get_bytes/get_chunks currently in progress!"; + } + $self->{read_bytes} = $bytes; + $callback->($self->{line}) if length($self->{line}); + $self->{line} = ''; + $self->{callback} = $callback; + $self->{get_chunks} = 1; +} + +sub end_get_chunks { + my Danga::Client $self = shift; + my $remaining = shift; + $self->{callback} = undef; + $self->{get_chunks} = 0; + if (defined($remaining)) { + $self->process_read_buf(\$remaining); + } +} + +sub event_read { + my Danga::Client $self = shift; + if ($self->{callback}) { + $self->{alive_time} = time; + if ($self->{get_chunks}) { + my $bref = $self->read($self->{read_bytes}); + return $self->close($!) unless defined $bref; + $self->{callback}->($$bref) if length($$bref); + return; + } + if ($self->{read_bytes} > 0) { + my $bref = $self->read($self->{read_bytes}); + return $self->close($!) unless defined $bref; + $self->{read_bytes} -= length($$bref); + $self->{data_bytes} .= $$bref; + } + if ($self->{read_bytes} <= 0) { + # print "Erk, read too much!\n" if $self->{read_bytes} < 0; + my $cb = $self->{callback}; + $self->{callback} = undef; + $cb->($self->{data_bytes}); + } + } + else { + my $bref = $self->read(8192); + return $self->close($!) unless defined $bref; + $self->process_read_buf($bref); + } +} + +sub process_read_buf { + my Danga::Client $self = shift; + my $bref = shift; + $self->{line} .= $$bref; + return if $self->{pause_count} || $self->{closed}; + + while ($self->{line} =~ s/^(.*?\n)//) { + my $line = $1; + $self->{alive_time} = time; + my $resp = $self->process_line($line); + if ($::DEBUG > 1 and $resp) { print "$$:".($self+0)."S: $_\n" for split(/\n/, $resp) } + $self->write($resp) if $resp; + # $self->watch_read(0) if $self->{pause_count}; + return if $self->{pause_count} || $self->{closed}; + } +} + +sub has_data { + my Danga::Client $self = shift; + return length($self->{line}) ? 1 : 0; +} + +sub clear_data { + my Danga::Client $self = shift; + $self->{line} = ''; +} + +sub paused { + my Danga::Client $self = shift; + return 1 if $self->{pause_count}; + return 1 if $self->{closed}; + return 0; +} + +sub pause_read { + my Danga::Client $self = shift; + $self->{pause_count}++; + # $self->watch_read(0); +} + +sub continue_read { + my Danga::Client $self = shift; + $self->{pause_count}--; + if ($self->{pause_count} <= 0) { + $self->{pause_count} = 0; + # $self->watch_read(1); + } +} + +sub process_line { + my Danga::Client $self = shift; + return ''; +} + +sub close { + my Danga::Client $self = shift; + print "closing @_\n" if $::DEBUG; + $self->SUPER::close(@_); +} + +sub event_err { my Danga::Client $self = shift; $self->close("Error") } +sub event_hup { my Danga::Client $self = shift; $self->close("Disconnect (HUP)") } + +1; diff --git a/lib/Danga/TimeoutSocket.pm b/lib/Danga/TimeoutSocket.pm new file mode 100644 index 0000000..d977570 --- /dev/null +++ b/lib/Danga/TimeoutSocket.pm @@ -0,0 +1,62 @@ +# $Id: TimeoutSocket.pm,v 1.2 2005/02/02 20:44:35 msergeant Exp $ + +package Danga::TimeoutSocket; + +use base 'Danga::Socket'; +use fields qw(alive_time create_time); + +our $last_cleanup = 0; + +Danga::Socket->AddTimer(15, \&_do_cleanup); + +sub new { + my Danga::TimeoutSocket $self = shift; + my $sock = shift; + $self = fields::new($self) unless ref($self); + $self->SUPER::new($sock); + + my $now = time; + $self->{alive_time} = $self->{create_time} = $now; + + return $self; +} + +# overload these in a subclass +sub max_idle_time { 0 } +sub max_connect_time { 0 } + +sub _do_cleanup { + my $now = time; + + Danga::Socket->AddTimer(15, \&_do_cleanup); + + my $sf = __PACKAGE__->get_sock_ref; + + my %max_age; # classname -> max age (0 means forever) + my %max_connect; # classname -> max connect time + my @to_close; + while (my $k = each %$sf) { + my Danga::TimeoutSocket $v = $sf->{$k}; + my $ref = ref $v; + next unless $v->isa('Danga::TimeoutSocket'); + unless (defined $max_age{$ref}) { + $max_age{$ref} = $ref->max_idle_time || 0; + $max_connect{$ref} = $ref->max_connect_time || 0; + } + if (my $t = $max_connect{$ref}) { + if ($v->{create_time} < $now - $t) { + push @to_close, $v; + next; + } + } + if (my $t = $max_age{$ref}) { + if ($v->{alive_time} < $now - $t) { + push @to_close, $v; + } + } + } + + $_->close("Timeout") foreach @to_close; +} + +1; diff --git a/lib/Qpsmtpd/ConfigServer.pm b/lib/Qpsmtpd/ConfigServer.pm new file mode 100644 index 0000000..5d870c5 --- /dev/null +++ b/lib/Qpsmtpd/ConfigServer.pm @@ -0,0 +1,289 @@ +# $Id$ + +package Qpsmtpd::ConfigServer; + +use base ('Danga::Client'); +use Qpsmtpd::Constants; + +use strict; + +use fields qw( + _auth + _commands + _config_cache + _connection + _transaction + _test_mode + _extras + other_fds +); + +my $PROMPT = "Enter command: "; + +sub new { + my Qpsmtpd::ConfigServer $self = shift; + + $self = fields::new($self) unless ref $self; + $self->SUPER::new( @_ ); + $self->write($PROMPT); + return $self; +} + +sub max_idle_time { 3600 } # one hour + +sub process_line { + my $self = shift; + my $line = shift || return; + if ($::DEBUG > 1) { print "$$:".($self+0)."C($self->{mode}): $line"; } + local $SIG{ALRM} = sub { + my ($pkg, $file, $line) = caller(); + die "ALARM: $pkg, $file, $line"; + }; + my $prev = alarm(2); # must process a command in < 2 seconds + my $resp = eval { $self->_process_line($line) }; + alarm($prev); + if ($@) { + print STDERR "Error: $@\n"; + } + return $resp || ''; +} + +sub respond { + my $self = shift; + my (@messages) = @_; + while (my $msg = shift @messages) { + $self->write("$msg\r\n"); + } + return; +} + +sub fault { + my $self = shift; + my ($msg) = shift || "program fault - command not performed"; + print STDERR "$0 [$$]: $msg ($!)\n"; + $self->respond("Error - " . $msg); + return $PROMPT; +} + +sub _process_line { + my $self = shift; + my $line = shift; + + $line =~ s/\r?\n//; + my ($cmd, @params) = split(/ +/, $line); + my $meth = "cmd_" . lc($cmd); + if (my $lookup = $self->can($meth)) { + my $resp = eval { + $lookup->($self, @params); + }; + if ($@) { + my $error = $@; + chomp($error); + Qpsmtpd->log(LOGERROR, "Command Error: $error"); + return $self->fault("command '$cmd' failed unexpectedly"); + } + return "$resp\n$PROMPT"; + } + else { + # No such method - i.e. unrecognized command + return $self->fault("command '$cmd' unrecognised"); + } +} + +my %helptext = ( + help => "HELP [CMD] - Get help on all commands or a specific command", + status => "STATUS - Returns status information about current connections", + list => "LIST [LIMIT] - List the connections, specify limit or negative limit to shrink list", + kill => "KILL (\$IP | \$REF) - Disconnect all connections from \$IP or connection reference \$REF", + pause => "PAUSE - Stop accepting new connections", + continue => "CONTINUE - Resume accepting connections", + reload => "RELOAD - Reload all plugins and config", + quit => "QUIT - Exit the config server", + ); + +sub cmd_help { + my $self = shift; + my ($subcmd) = @_; + + $subcmd ||= 'help'; + $subcmd = lc($subcmd); + + if ($subcmd eq 'help') { + my $txt = join("\n", map { substr($_, 0, index($_, "-")) } sort values(%helptext)); + return "Available Commands:\n\n$txt\n"; + } + my $txt = $helptext{$subcmd} || "Unrecognised help option. Try 'help' for a full list."; + return "$txt\n"; +} + +sub cmd_quit { + my $self = shift; + $self->close; +} + +sub cmd_shutdown { + exit; +} + +sub cmd_pause { + my $self = shift; + + my $other_fds = $self->OtherFds; + + $self->{other_fds} = { %$other_fds }; + %$other_fds = (); + return "PAUSED"; +} + +sub cmd_continue { + my $self = shift; + + my $other_fds = $self->{other_fds}; + + $self->OtherFds( %$other_fds ); + %$other_fds = (); + return "UNPAUSED"; +} + +sub cmd_status { + my $self = shift; + +# Status should show: +# - Total time running +# - Total number of mails received +# - Total number of mails rejected (5xx) +# - Total number of mails tempfailed (5xx) +# - Avg number of mails/minute +# - Number of current connections +# - Number of outstanding DNS queries + + my $output = "Current Status as of " . gmtime() . " GMT\n\n"; + + if (defined &Qpsmtpd::Plugin::stats::get_stats) { + # Stats plugin is loaded + $output .= Qpsmtpd::Plugin::stats->get_stats; + } + + my $descriptors = Danga::Socket->DescriptorMap; + + my $current_connections = 0; + my $current_dns = 0; + foreach my $fd (keys %$descriptors) { + my $pob = $descriptors->{$fd}; + if ($pob->isa("Qpsmtpd::PollServer")) { + $current_connections++; + } + elsif ($pob->isa("ParaDNS::Resolver")) { + $current_dns = $pob->pending; + } + } + + $output .= "Curr Connections: $current_connections / $::MAXconn\n". + "Curr DNS Queries: $current_dns"; + + return $output; +} + +sub cmd_list { + my $self = shift; + my ($count) = @_; + + my $descriptors = Danga::Socket->DescriptorMap; + + my $list = "Current" . ($count ? (($count > 0) ? " Oldest $count" : " Newest ".-$count) : "") . " Connections: \n\n"; + my @all; + foreach my $fd (keys %$descriptors) { + my $pob = $descriptors->{$fd}; + if ($pob->isa("Qpsmtpd::PollServer")) { + next unless $pob->connection->remote_ip; # haven't even started yet + push @all, [$pob+0, $pob->connection->remote_ip, + $pob->connection->remote_host, $pob->uptime]; + } + } + + @all = sort { $a->[3] <=> $b->[3] } @all; + if ($count) { + if ($count > 0) { + @all = @all[$#all-($count-1) .. $#all]; + } + else { + @all = @all[0..(abs($count) - 1)]; + } + } + foreach my $item (@all) { + $list .= sprintf("%x : %s [%s] Connected %0.2fs\n", map { defined()?$_:'' } @$item); + } + + return $list; +} + +sub cmd_kill { + my $self = shift; + my ($match) = @_; + + return "SYNTAX: KILL (\$IP | \$REF)\n" unless $match; + + my $descriptors = Danga::Socket->DescriptorMap; + + my $killed = 0; + my $is_ip = (index($match, '.') >= 0); + foreach my $fd (keys %$descriptors) { + my $pob = $descriptors->{$fd}; + if ($pob->isa("Qpsmtpd::PollServer")) { + if ($is_ip) { + next unless $pob->connection->remote_ip; # haven't even started yet + if ($pob->connection->remote_ip eq $match) { + $pob->write("550 Your connection has been killed by an administrator\r\n"); + $pob->disconnect; + $killed++; + } + } + else { + # match by ID + if ($pob+0 == hex($match)) { + $pob->write("550 Your connection has been killed by an administrator\r\n"); + $pob->disconnect; + $killed++; + } + } + } + } + + return "Killed $killed connection" . ($killed > 1 ? "s" : "") . "\n"; +} + +sub cmd_dump { + my $self = shift; + my ($ref) = @_; + + return "SYNTAX: DUMP \$REF\n" unless $ref; + require Data::Dumper; + $Data::Dumper::Indent=1; + + my $descriptors = Danga::Socket->DescriptorMap; + foreach my $fd (keys %$descriptors) { + my $pob = $descriptors->{$fd}; + if ($pob->isa("Qpsmtpd::PollServer")) { + if ($pob+0 == hex($ref)) { + return Data::Dumper::Dumper($pob); + } + } + } + + return "Unable to find the connection: $ref. Try the LIST command\n"; +} + +1; +__END__ + +=head1 NAME + +Qpsmtpd::ConfigServer - a configuration server for qpsmtpd + +=head1 DESCRIPTION + +When qpsmtpd runs in multiplex mode it also provides a config server that you +can connect to. This allows you to view current connection statistics and other +gumph that you probably don't care about. + +=cut