Merge pull request #114 from msimerson/async

async removed
This commit is contained in:
Jared Johnson 2014-09-17 14:02:51 -05:00
commit f4ab59b613
32 changed files with 45 additions and 2685 deletions

View File

@ -68,13 +68,6 @@ Makefile.PL
MANIFEST This list of files
MANIFEST.SKIP
META.yml Module meta-data (added by MakeMaker)
plugins/async/dns_whitelist_soft
plugins/async/dnsbl
plugins/async/earlytalker
plugins/async/queue/smtp-forward
plugins/async/resolvable_fromhost
plugins/async/rhsbl
plugins/async/uribl
plugins/auth/auth_checkpassword
plugins/auth/auth_cvm_unix_local
plugins/auth/auth_flat_file
@ -156,7 +149,6 @@ plugins/virus/sophie
plugins/virus/uvscan
plugins/whitelist
qpsmtpd
qpsmtpd-async
qpsmtpd-forkserver
qpsmtpd-prefork
README

View File

@ -36,7 +36,7 @@ WriteMakefile(
},
ABSTRACT => 'Flexible smtpd daemon written in Perl',
AUTHOR => 'Ask Bjoern Hansen <ask@develooper.com>',
EXE_FILES => [qw(qpsmtpd qpsmtpd-forkserver qpsmtpd-prefork qpsmtpd-async)],
EXE_FILES => [qw(qpsmtpd qpsmtpd-forkserver qpsmtpd-prefork)],
clean => { FILES => [ '*.bak' ], },
);

3
README
View File

@ -110,8 +110,7 @@ information about what's missing) to the mailinglist or a PR to github.
For better performance we recommend using "qpsmtpd-forkserver" or
running qpsmtpd under Apache 2.x. If you need extremely high
concurrency and all your plugins are compatible, you might want to try
the "qpsmtpd-async" model.
concurrency use http://haraka.github.io/
=head1 Plugins

View File

@ -94,8 +94,7 @@ information about what's missing) to the mailinglist or a PR to github.
For better performance we recommend using "qpsmtpd-forkserver" or
running qpsmtpd under Apache 2.x. If you need extremely high
concurrency and all your plugins are compatible, you might want to try
the "qpsmtpd-async" model.
concurrency use [Haraka](http://haraka.github.io/).
# Plugins

View File

@ -4,7 +4,7 @@
###
# Conventions:
# plugin names: F<myplugin>, F<qpsmtpd-async>
# plugin names: F<myplugin>
# constants: I<LOGDEBUG>
# smtp commands, answers: B<HELO>, B<250 Queued!>
#

View File

@ -4,7 +4,7 @@
###
# Conventions:
# plugin names: F<myplugin>, F<qpsmtpd-async>
# plugin names: F<myplugin>
# constants: I<LOGDEBUG>
# smtp commands, answers: B<HELO>, B<250 Queued!>
#
@ -42,8 +42,7 @@ connection to the worker process).
Useful for load-management and rereading large config files at some
frequency less than once per session.
This hook is available in the F<qpsmtpd-forkserver>, F<qpsmtpd-prefork> and
F<qpsmtpd-async> flavours.
This hook is available in F<qpsmtpd-forkserver> and F<qpsmtpd-prefork> flavors.
=cut
@ -56,8 +55,7 @@ methods which (I<may>) take some time, like DNS lookups. This will slow down
B<all> incoming connections, no other connection will be accepted while this
hook is running!
Arguments this hook receives are (B<NOTE>: currently no C<%args> for
F<qpsmtpd-async>):
Arguments this hook receives are:
my ($self,$transaction,%args) = @_;
# %args is:
@ -368,8 +366,6 @@ B<NOTE:> BE CAREFUL! If you drop the connection legal MTAs will retry again
and again, spammers will probably not. This is not RFC compliant and can lead
to an unpredictable mess. Use with caution.
B<NOTE:> This hook does not currently work in async mode.
Why this hook may be useful for you, see
L<http://www.nntp.perl.org/group/perl.qpsmtpd/2009/02/msg8502.html>, ff.
@ -899,20 +895,6 @@ Arguments are
my ($self,$transaction,@args) = @_;
=head2 hook_post_fork
B<NOTE:> This hook is only available in qpsmtpd-async.
It is called while starting qpsmtpd-async. You can run more than one
instance of qpsmtpd-async (one per CPU probably). This hook is called
after forking one instance.
Arguments:
my $self = shift;
The return values of this hook are discarded.
=head1 Authentication hooks
=cut

View File

@ -4,7 +4,7 @@
###
# Conventions:
# plugin names: F<myplugin>, F<qpsmtpd-async>
# plugin names: F<myplugin>
# constants: I<LOGDEBUG>
# smtp commands, answers: B<HELO>, B<250 Queued!>
#
@ -132,7 +132,7 @@ a good idea. This initialisation happens before any C<fork()> is done.
Therefore the file handle will be shared by all qpsmtpd processes and the
database will probably be confused if several different queries arrive on
the same file handle at the same time (and you may get the wrong answer, if
any). This is also true for F<qpsmtpd-async> and the pperl flavours, but
any). This is also true for the pperl flavor but
not for F<qpsmtpd> started by (x)inetd or tcpserver.
In short: don't do it if you want to write portable plugins.
@ -390,12 +390,6 @@ See note above about SMTP specs.
Finishing processing of the request. Usually used when the plugin sent the
response to the client.
=item YIELD
Only used in F<qpsmtpd-async>, see F<plugins/async/*>
=back
=cut
# vim: ts=2 sw=2 expandtab

View File

@ -4,7 +4,7 @@
###
# Conventions:
# plugin names: F<myplugin>, F<qpsmtpd-async>
# plugin names: F<myplugin>
# constants: I<LOGDEBUG>
# smtp commands, answers: B<HELO>, B<250 Queued!>
#

View File

@ -1,241 +0,0 @@
# $Id: Client.pm,v 1.8 2005/02/14 22:06:38 msergeant Exp $
package Danga::Client;
use base 'Danga::TimeoutSocket';
use fields qw(
line
pause_count
read_bytes
data_bytes
callback
get_chunks
reader_object
);
use Time::HiRes ();
use bytes;
# 30 seconds max timeout!
sub max_idle_time { 30 }
sub max_connect_time { 1200 }
sub new {
my Danga::Client $self = shift;
$self = fields::new($self) unless ref $self;
$self->SUPER::new(@_);
$self->reset_for_next_message;
return $self;
}
sub reset_for_next_message {
my Danga::Client $self = shift;
$self->{line} = '';
$self->{pause_count} = 0;
$self->{read_bytes} = 0;
$self->{callback} = undef;
$self->{reader_object} = undef;
$self->{data_bytes} = '';
$self->{get_chunks} = 0;
return $self;
}
sub get_bytes {
my Danga::Client $self = shift;
my ($bytes, $callback) = @_;
if ($self->{callback}) {
die "get_bytes/get_chunks currently in progress!";
}
$self->{read_bytes} = $bytes;
$self->{data_bytes} = $self->{line};
$self->{read_bytes} -= length($self->{data_bytes});
$self->{line} = '';
if ($self->{read_bytes} <= 0) {
if ($self->{read_bytes} < 0) {
$self->{line} = substr(
$self->{data_bytes},
$self->{read_bytes}, # negative offset
0 - $self->{read_bytes}, # to end of str
""
); # truncate that substr
}
$callback->($self->{data_bytes});
return;
}
$self->{callback} = $callback;
}
sub process_chunk {
my Danga::Client $self = shift;
my $callback = shift;
my $last_crlf = rindex($self->{line}, "\r\n");
if ($last_crlf != -1) {
if ($last_crlf + 2 == length($self->{line})) {
my $data = $self->{line};
$self->{line} = '';
$callback->($data);
}
else {
my $data = substr($self->{line}, 0, $last_crlf + 2);
$self->{line} = substr($self->{line}, $last_crlf + 2);
$callback->($data);
}
}
}
sub get_chunks {
my Danga::Client $self = shift;
my ($bytes, $callback) = @_;
if ($self->{callback}) {
die "get_bytes/get_chunks currently in progress!";
}
$self->{read_bytes} = $bytes;
$self->process_chunk($callback) if length($self->{line});
$self->{callback} = $callback;
$self->{get_chunks} = 1;
}
sub end_get_chunks {
my Danga::Client $self = shift;
my $remaining = shift;
$self->{callback} = undef;
$self->{get_chunks} = 0;
if (defined($remaining)) {
$self->process_read_buf(\$remaining);
}
}
sub set_reader_object {
my Danga::Client $self = shift;
$self->{reader_object} = shift;
}
sub event_read {
my Danga::Client $self = shift;
if (my $obj = $self->{reader_object}) {
$self->{reader_object} = undef;
$obj->event_read($self);
}
elsif ($self->{callback}) {
$self->{alive_time} = time;
if ($self->{get_chunks}) {
my $bref = $self->read($self->{read_bytes});
return $self->close($!) unless defined $bref;
$self->{line} .= $$bref;
$self->process_chunk($self->{callback}) if length($self->{line});
return;
}
if ($self->{read_bytes} > 0) {
my $bref = $self->read($self->{read_bytes});
return $self->close($!) unless defined $bref;
$self->{read_bytes} -= length($$bref);
$self->{data_bytes} .= $$bref;
}
if ($self->{read_bytes} <= 0) {
# print "Erk, read too much!\n" if $self->{read_bytes} < 0;
my $cb = $self->{callback};
$self->{callback} = undef;
$cb->($self->{data_bytes});
}
}
else {
my $bref = $self->read(8192);
return $self->close($!) unless defined $bref;
$self->process_read_buf($bref);
}
}
sub process_read_buf {
my Danga::Client $self = shift;
my $bref = shift;
$self->{line} .= $$bref;
return if $self->{pause_count} || $self->{closed};
if ($self->{line} =~ s/^(.*?\n)//) {
my $line = $1;
$self->{alive_time} = time;
my $resp = $self->process_line($line);
if ($::DEBUG > 1 and $resp) {
print "$$:" . ($self + 0) . "S: $_\n" for split(/\n/, $resp);
}
$self->write($resp) if $resp;
# $self->watch_read(0) if $self->{pause_count};
return if $self->{pause_count} || $self->{closed};
# read more in a timer, to give other clients a look in
$self->AddTimer(
0,
sub {
if (length($self->{line}) && !$self->paused) {
$self->process_read_buf(\"")
; # " for bad syntax highlighters
}
}
);
}
}
sub has_data {
my Danga::Client $self = shift;
return length($self->{line}) ? 1 : 0;
}
sub clear_data {
my Danga::Client $self = shift;
$self->{line} = '';
}
sub paused {
my Danga::Client $self = shift;
return 1 if $self->{pause_count};
return 1 if $self->{closed};
return 0;
}
sub pause_read {
my Danga::Client $self = shift;
$self->{pause_count}++;
# $self->watch_read(0);
}
sub continue_read {
my Danga::Client $self = shift;
$self->{pause_count}--;
if ($self->{pause_count} <= 0) {
$self->{pause_count} = 0;
$self->AddTimer(
0,
sub {
if (length($self->{line}) && !$self->paused) {
$self->process_read_buf(\"")
; # " for bad syntax highlighters
}
}
);
}
}
sub process_line {
my Danga::Client $self = shift;
return '';
}
sub close {
my Danga::Client $self = shift;
print "closing @_\n" if $::DEBUG;
$self->SUPER::close(@_);
}
sub event_err { my Danga::Client $self = shift; $self->close("Error") }
sub event_hup {
my Danga::Client $self = shift;
$self->close("Disconnect (HUP)");
}
1;

View File

@ -1,67 +0,0 @@
# $Id: TimeoutSocket.pm,v 1.2 2005/02/02 20:44:35 msergeant Exp $
package Danga::TimeoutSocket;
use base 'Danga::Socket';
use fields qw(alive_time create_time);
our $last_cleanup = 0;
Danga::Socket->AddTimer(15, \&_do_cleanup);
sub new {
my Danga::TimeoutSocket $self = shift;
my $sock = shift;
$self = fields::new($self) unless ref($self);
$self->SUPER::new($sock);
my $now = time;
$self->{alive_time} = $self->{create_time} = $now;
return $self;
}
# overload these in a subclass
sub max_idle_time { 0 }
sub max_connect_time { 0 }
sub Reset {
Danga::Socket->Reset;
Danga::Socket->AddTimer(15, \&_do_cleanup);
}
sub _do_cleanup {
my $now = time;
Danga::Socket->AddTimer(15, \&_do_cleanup);
my $sf = __PACKAGE__->get_sock_ref;
my %max_age; # classname -> max age (0 means forever)
my %max_connect; # classname -> max connect time
my @to_close;
while (my $k = each %$sf) {
my Danga::TimeoutSocket $v = $sf->{$k};
my $ref = ref $v;
next unless $v->isa('Danga::TimeoutSocket');
unless (defined $max_age{$ref}) {
$max_age{$ref} = $ref->max_idle_time || 0;
$max_connect{$ref} = $ref->max_connect_time || 0;
}
if (my $t = $max_connect{$ref}) {
if ($v->{create_time} < $now - $t) {
push @to_close, $v;
next;
}
}
if (my $t = $max_age{$ref}) {
if ($v->{alive_time} < $now - $t) {
push @to_close, $v;
}
}
}
$_->close("Timeout") foreach @to_close;
}
1;

View File

@ -277,9 +277,6 @@ sub run_hooks_no_respond {
warn("FATAL PLUGIN ERROR [" . $code->{name} . "]: ", $@);
next;
}
if ($r[0] == YIELD) {
die "YIELD not valid from $hook hook";
}
last unless $r[0] == DECLINED;
}
$r[0] = DECLINED if not defined $r[0];
@ -288,14 +285,10 @@ sub run_hooks_no_respond {
return (0, '');
}
sub continue_read { } # subclassed in -async
sub pause_read { die "Continuations only work in qpsmtpd-async" }
sub run_continuation {
my $self = shift;
die "No continuation in progress" unless $self->{_continuation};
$self->continue_read();
my $todo = $self->{_continuation};
$self->{_continuation} = undef;
my $hook = shift @$todo || die "No hook in the continuation";
@ -335,12 +328,7 @@ sub run_continuation {
if (!defined $cnotes || ref $cnotes eq "HASH");
}
if ($r[0] == YIELD) {
$self->pause_read();
$self->{_continuation} = [$hook, $args, @$todo];
return @r;
}
elsif ( $r[0] == DENY
if ( $r[0] == DENY
or $r[0] == DENYSOFT
or $r[0] == DENY_DISCONNECT
or $r[0] == DENYSOFT_DISCONNECT)

View File

@ -25,8 +25,6 @@ my %return_codes = (
DENYSOFT_DISCONNECT => 904, # 450 + disconnect
DECLINED => 909,
DONE => 910,
CONTINUATION => 911, # deprecated - use YIELD
YIELD => 911,
);
use vars qw(@ISA @EXPORT);

View File

@ -1,87 +0,0 @@
package Qpsmtpd::Plugin::Async::DNSBLBase;
# Class methods shared by the async plugins using DNS based blacklists or
# whitelists.
use strict;
use Qpsmtpd::Constants;
use ParaDNS;
sub lookup {
my ($class, $qp, $A_lookups, $TXT_lookups) = @_;
my $total_zones = @$A_lookups + @$TXT_lookups;
my ($A_pdns, $TXT_pdns);
if (@$A_lookups) {
$qp->log(LOGDEBUG, "Checking ",
join(", ", @$A_lookups),
" for A record in the background");
$A_pdns = ParaDNS->new(
callback => sub {
my ($result, $query) = @_;
return if $result !~ /^\d+\.\d+\.\d+\.\d+$/;
$qp->log(LOGDEBUG, "Result for A $query: $result");
$class->process_a_result($qp, $result, $query);
},
finished => sub {
$total_zones -= @$A_lookups;
$class->finished($qp, $total_zones);
},
hosts => [@$A_lookups],
type => 'A',
client => $qp->input_sock,
);
return unless defined $A_pdns;
}
if (@$TXT_lookups) {
$qp->log(LOGDEBUG, "Checking ",
join(", ", @$TXT_lookups),
" for TXT record in the background");
$TXT_pdns = ParaDNS->new(
callback => sub {
my ($result, $query) = @_;
return if $result !~ /[a-z]/;
$qp->log(LOGDEBUG, "Result for TXT $query: $result");
$class->process_txt_result($qp, $result, $query);
},
finished => sub {
$total_zones -= @$TXT_lookups;
$class->finished($qp, $total_zones);
},
hosts => [@$TXT_lookups],
type => 'TXT',
client => $qp->input_sock,
);
unless (defined $TXT_pdns) {
undef $A_pdns;
return;
}
}
return 1;
}
sub finished {
my ($class, $qp, $total_zones) = @_;
$qp->log(LOGDEBUG, "Finished ($total_zones)");
$qp->run_continuation unless $total_zones;
}
# plugins should implement the following two methods to do something
# useful with the results
sub process_a_result {
my ($class, $qp, $result, $query) = @_;
}
sub process_txt_result {
my ($class, $qp, $result, $query) = @_;
}
1;

View File

@ -1,367 +0,0 @@
package Qpsmtpd::PollServer;
use base ('Danga::Client', 'Qpsmtpd::SMTP');
# use fields required to be a subclass of Danga::Client. Have to include
# all fields used by Qpsmtpd.pm here too.
use fields qw(
input_sock
mode
header_lines
in_header
data_size
max_size
hooks
start_time
cmd_timeout
conn
_auth
_auth_mechanism
_auth_state
_auth_ticket
_auth_user
_commands
_config_cache
_connection
_continuation
_extras
_test_mode
_transaction
);
use Qpsmtpd::Constants;
use Qpsmtpd::Address;
use ParaDNS;
use Mail::Header;
use POSIX qw(strftime);
use Socket qw(inet_aton AF_INET CRLF);
use Time::HiRes qw(time);
use strict;
sub max_idle_time { 60 }
sub max_connect_time { 1200 }
sub input_sock {
my $self = shift;
@_ and $self->{input_sock} = shift;
$self->{input_sock} || $self;
}
sub new {
my Qpsmtpd::PollServer $self = shift;
$self = fields::new($self) unless ref $self;
$self->SUPER::new(@_);
$self->{cmd_timeout} = 5;
$self->{start_time} = time;
$self->{mode} = 'connect';
$self->load_plugins;
$self->load_logging;
my ($rc, @msg) = $self->run_hooks_no_respond("pre-connection");
if ($rc == DENYSOFT || $rc == DENYSOFT_DISCONNECT) {
@msg = ("Sorry, try again later")
unless @msg;
$self->respond(451, @msg);
$self->disconnect;
}
elsif ($rc == DENY || $rc == DENY_DISCONNECT) {
@msg = ("Sorry, service not available for you")
unless @msg;
$self->respond(550, @msg);
$self->disconnect;
}
return $self;
}
sub uptime {
my Qpsmtpd::PollServer $self = shift;
return (time() - $self->{start_time});
}
sub reset_for_next_message {
my Qpsmtpd::PollServer $self = shift;
$self->SUPER::reset_for_next_message(@_);
$self->{_commands} = {
ehlo => 1,
helo => 1,
rset => 1,
mail => 1,
rcpt => 1,
data => 1,
help => 1,
vrfy => 1,
noop => 1,
quit => 1,
auth => 0, # disabled by default
};
$self->{mode} = 'cmd';
$self->{_extras} = {};
}
sub respond {
my Qpsmtpd::PollServer $self = shift;
my ($code, @messages) = @_;
while (my $msg = shift @messages) {
my $line = $code . (@messages ? "-" : " ") . $msg;
$self->write("$line\r\n");
}
return 1;
}
sub fault {
my Qpsmtpd::PollServer $self = shift;
$self->SUPER::fault(@_);
return;
}
my %cmd_cache;
sub process_line {
my Qpsmtpd::PollServer $self = shift;
my $line = shift || return;
if ($::DEBUG > 1) { print "$$:" . ($self + 0) . "C($self->{mode}): $line"; }
if ($self->{mode} eq 'cmd') {
$line =~ s/\r?\n$//s;
$self->connection->notes('original_string', $line);
my ($cmd, @params) = split(/ +/, $line, 2);
my $meth = lc($cmd);
if (my $lookup =
$cmd_cache{$meth}
|| $self->{_commands}->{$meth} && $self->can($meth))
{
$cmd_cache{$meth} = $lookup;
eval { $lookup->($self, @params); };
if ($@) {
my $error = $@;
chomp($error);
$self->log(LOGERROR, "Command Error: $error");
$self->fault("command '$cmd' failed unexpectedly");
}
}
else {
# No such method - i.e. unrecognized command
my ($rc, $msg) =
$self->run_hooks("unrecognized_command", $meth, @params);
}
}
elsif ($self->{mode} eq 'connect') {
$self->{mode} = 'cmd';
# I've removed an eval{} from around this. It shouldn't ever die()
# but if it does we're a bit screwed... Ah well :-)
$self->start_conversation;
}
else {
die "Unknown mode";
}
return;
}
sub disconnect {
my Qpsmtpd::PollServer $self = shift;
$self->SUPER::disconnect(@_);
$self->close;
}
sub close {
my Qpsmtpd::PollServer $self = shift;
$self->run_hooks_no_respond("post-connection");
$self->connection->reset;
$self->SUPER::close;
}
sub start_conversation {
my Qpsmtpd::PollServer $self = shift;
my $conn = $self->connection;
# set remote_host, remote_ip and remote_port
my ($ip, $port) = split(/:/, $self->peer_addr_string);
return $self->close() unless $ip;
$conn->remote_ip($ip);
$conn->remote_port($port);
$conn->remote_info("[$ip]");
my ($lip, $lport) = split(/:/, $self->local_addr_string);
$conn->local_ip($lip);
$conn->local_port($lport);
ParaDNS->new(
finished => sub { $self->continue_read(); $self->run_hooks("connect") },
# NB: Setting remote_info to the same as remote_host
callback => sub { $conn->remote_info($conn->remote_host($_[0])) },
host => $ip,
);
return;
}
sub data {
my Qpsmtpd::PollServer $self = shift;
my ($rc, $msg) = $self->run_hooks("data");
return 1;
}
sub data_respond {
my Qpsmtpd::PollServer $self = shift;
my ($rc, $msg) = @_;
if ($rc == DONE) {
return;
}
elsif ($rc == DENY) {
$msg->[0] ||= "Message denied";
$self->respond(554, @$msg);
$self->reset_transaction();
return;
}
elsif ($rc == DENYSOFT) {
$msg->[0] ||= "Message denied temporarily";
$self->respond(451, @$msg);
$self->reset_transaction();
return;
}
elsif ($rc == DENY_DISCONNECT) {
$msg->[0] ||= "Message denied";
$self->respond(554, @$msg);
$self->disconnect;
return;
}
elsif ($rc == DENYSOFT_DISCONNECT) {
$msg->[0] ||= "Message denied temporarily";
$self->respond(451, @$msg);
$self->disconnect;
return;
}
return $self->respond(503, "MAIL first") unless $self->transaction->sender;
return $self->respond(503, "RCPT first")
unless $self->transaction->recipients;
$self->{header_lines} = '';
$self->{data_size} = 0;
$self->{in_header} = 1;
$self->{max_size} = ($self->config('databytes'))[0] || 0;
$self->log(LOGDEBUG,
"max_size: $self->{max_size} / size: $self->{data_size}");
$self->respond(354, "go ahead");
my $max_get = $self->{max_size} || 1048576;
$self->get_chunks($max_get, sub { $self->got_data($_[0]) });
return 1;
}
sub got_data {
my Qpsmtpd::PollServer $self = shift;
my $data = shift;
my $done = 0;
my $remainder;
if ($data =~ s/^\.\r\n(.*)\z//ms) {
$remainder = $1;
$done = 1;
}
# add a transaction->blocked check back here when we have line by line plugin access...
unless (($self->{max_size} and $self->{data_size} > $self->{max_size})) {
$data =~ s/\r\n/\n/mg;
$data =~ s/^\.\./\./mg;
if ($self->{in_header}) {
$self->{header_lines} .= $data;
if ($self->{header_lines} =~ s/\n(\n.*)\z/\n/ms) {
$data = $1;
# end of headers
$self->{in_header} = 0;
# ... need to check that we don't reformat any of the received lines.
#
# 3.8.2 Received Lines in Gatewaying
# When forwarding a message into or out of the Internet environment, a
# gateway MUST prepend a Received: line, but it MUST NOT alter in any
# way a Received: line that is already in the header.
my @header_lines = split(/^/m, $self->{header_lines});
my $header =
Mail::Header->new(
\@header_lines,
Modify => 0,
MailFrom => "COERCE"
);
$self->transaction->header($header);
$self->transaction->body_write($self->{header_lines});
$self->{header_lines} = '';
#$header->add("X-SMTPD", "qpsmtpd/".$self->version.", http://smtpd.develooper.com/");
# FIXME - call plugins to work on just the header here; can
# save us buffering the mail content.
# Save the start of just the body itself
$self->transaction->set_body_start();
}
}
$self->transaction->body_write(\$data);
$self->{data_size} += length $data;
}
if ($done) {
$self->end_of_data;
$self->end_get_chunks($remainder);
}
}
sub end_of_data {
my Qpsmtpd::PollServer $self = shift;
#$self->log(LOGDEBUG, "size is at $size\n") unless ($i % 300);
$self->log(LOGDEBUG,
"max_size: $self->{max_size} / size: $self->{data_size}");
my $header = $self->transaction->header;
if (!$header) {
$header = Mail::Header->new(Modify => 0, MailFrom => "COERCE");
$self->transaction->header($header);
}
my $smtp = $self->connection->hello eq "ehlo" ? "ESMTP" : "SMTP";
my $esmtp = substr($smtp, 0, 1) eq "E";
my $authheader;
my $sslheader;
if (defined $self->connection->notes('tls_enabled')
and $self->connection->notes('tls_enabled'))
{
$smtp .= "S" if $esmtp; # RFC3848
$sslheader = "("
. $self->connection->notes('tls_socket')->get_cipher()
. " encrypted) ";
}
if (defined $self->{_auth} and $self->{_auth} == OK) {
$smtp .= "A" if $esmtp; # RFC3848
$authheader =
"(smtp-auth username $self->{_auth_user}, mechanism $self->{_auth_mechanism})\n";
}
$header->add("Received",
$self->received_line($smtp, $authheader, $sslheader), 0);
return $self->respond(552, "Message too big!")
if $self->{max_size} and $self->{data_size} > $self->{max_size};
my ($rc, $msg) = $self->run_hooks("data_post");
return 1;
}
1;

View File

@ -731,8 +731,6 @@ sub data_respond {
$self->transaction->header($header);
# NOTE: This will not work properly under async. A
# data_headers_end_respond needs to be created.
my ($rc, $msg) = $self->run_hooks('data_headers_end');
if ($rc == DENY_DISCONNECT) {
$self->respond(554, $msg || "Message denied");
@ -871,10 +869,7 @@ sub received_line {
my $header_str;
my ($rc, @received) =
$self->run_hooks("received_line", $smtp, $authheader, $sslheader);
if ($rc == YIELD) {
die "YIELD not supported for received_line hook";
}
elsif ($rc == OK) {
if ($rc == OK) {
return join("\n", @received);
}
else { # assume $rc == DECLINED

View File

@ -6,7 +6,7 @@ Name: %{_package}
Version: %{_version}
Release: %{_release}
Summary: qpsmtpd + qpsmtpd-apache + qpsmtpd-async
Summary: qpsmtpd + qpsmtpd-apache
License: MIT
Group: System Environment/Daemons
URL: http://smtpd.develooper.com/
@ -38,10 +38,6 @@ Requires: perl(mod_perl2)
Summary: mod_perl-2 connection handler for qpsmtpd
Group: System Environment/Daemons
%package async
Summary: qpsmtpd using async I/O in a single process
Group: System Environment/Daemons
%package xinetd
Summary: xinetd support for qpsmtpd
Group: System Environment/Daemons
@ -52,11 +48,6 @@ Requires: xinetd
This module implements a mod_perl/apache 2.0 connection handler
that turns Apache into an SMTP server using Qpsmtpd.
%description async
This package contains the Qpsmtpd::PollServer module, which allows
qpsmtd to handle many connections in a single process and the
qpsmpd-async which uses it.
%description xinetd
This package contains the xinetd startup files for qpsmptd.
@ -126,16 +117,6 @@ if [ "$(cat %{name}-%{version}-%{release}-filelist)X" = "X" ] ; then
exit -1
fi
find ${RPM_BUILD_ROOT}%{_prefix} -type f -print | \
sed "s@^$RPM_BUILD_ROOT@@g" | \
grep -v packaging | \
grep -v README.selinux | \
grep -v /Apache | cat - %{name}-%{version}-%{release}-filelist | sort | uniq -u > %{name}-%{version}-%{release}-async-filelist
if [ "$(cat %{name}-%{version}-%{release}-async-filelist)X" = "X" ] ; then
echo "ERROR: EMPTY FILE LIST"
exit -1
fi
find ${RPM_BUILD_ROOT}%{_prefix} -type f -print | \
sed "s@^$RPM_BUILD_ROOT@@g" | \
grep -v [Aa]sync | \
@ -162,10 +143,6 @@ fi
%config(noreplace) %{_sysconfdir}/httpd/conf.d/*
%doc %{_docdir}/%{name}-apache-%{version}/README.selinux
%files async -f %{name}-%{version}-%{release}-async-filelist
%defattr(-,root,root)
%{_datadir}/%{name}/plugins/async/*
%files xinetd
%defattr(-,root,root)
%config(noreplace) %{_sysconfdir}/xinetd.d/smtp

View File

@ -1,88 +0,0 @@
#!perl -w
use Qpsmtpd::Plugin::Async::DNSBLBase;
sub init {
my $self = shift;
my $class = ref $self;
no strict 'refs';
push @{"${class}::ISA"}, 'Qpsmtpd::Plugin::Async::DNSBLBase';
}
sub hook_connect {
my ($self, $transaction) = @_;
my $class = ref $self;
my %whitelist_zones =
map { (split /\s+/, $_, 2)[0, 1] } $self->qp->config('whitelist_zones');
return DECLINED unless %whitelist_zones;
my $remote_ip = $self->connection->remote_ip;
my $reversed_ip = join(".", reverse(split(/\./, $remote_ip)));
# type TXT lookup only
return DECLINED
unless $class->lookup($self->qp, [],
[map { "$reversed_ip.$_" } keys %whitelist_zones],
);
return YIELD;
}
sub process_txt_result {
my ($class, $qp, $result, $query) = @_;
my $connection = $qp->connection;
$connection->notes('whitelisthost', $result)
unless $connection->notes('whitelisthost');
}
sub hook_rcpt {
my ($self, $transaction, $rcpt) = @_;
my $connection = $self->qp->connection;
if (my $note = $connection->notes('whitelisthost')) {
my $ip = $connection->remote_ip;
$self->log(LOGNOTICE, "Host $ip is whitelisted: $note");
}
return DECLINED;
}
=head1 NAME
dns_whitelist_soft - dns-based whitelist override for other qpsmtpd plugins
=head1 DESCRIPTION
The dns_whitelist_soft plugin allows selected host to be whitelisted as
exceptions to later plugin processing. It is most suitable for multisite
installations, so that the whitelist is stored in one location and available
from all.
=head1 CONFIGURATION
To enable the plugin, add it to the ~qpsmtpd/config/plugins file as usual.
It should precede any plugins whose rejections you wish to override. You may
have to alter those plugins to check the appropriate notes field.
Several configuration files are supported, corresponding to different
parts of the SMTP conversation:
=over 4
=item whitelist_zones
Any IP address listed in the whitelist_zones file is queried using
the connecting MTA's IP address. Any A or TXT answer means that the
remote HOST address can be selectively exempted at other stages by plugins
testing for a 'whitelisthost' connection note.
=back
NOTE: in contrast to the non-async version, the other 'connect' hooks
fired after the 'connect' hook of this plugin will see the 'whitelisthost'
connection note, if set by this plugin.
=cut

View File

@ -1,202 +0,0 @@
#!perl -w
use Qpsmtpd::Plugin::Async::DNSBLBase;
sub init {
my ($self, $qp, $denial) = @_;
my $class = ref $self;
{
no strict 'refs';
push @{"${class}::ISA"}, 'Qpsmtpd::Plugin::Async::DNSBLBase';
}
if (defined $denial and $denial =~ /^disconnect$/i) {
$self->{_dnsbl}->{DENY} = DENY_DISCONNECT;
}
else {
$self->{_dnsbl}->{DENY} = DENY;
}
}
sub hook_connect {
my ($self, $transaction) = @_;
my $class = ref $self;
my $remote_ip = $self->connection->remote_ip;
my $allow =
grep { s/\.?$/./; $_ eq substr($remote_ip . '.', 0, length $_) }
$self->qp->config('dnsbl_allow');
return DECLINED if $allow;
my %dnsbl_zones =
map { (split /:/, $_, 2)[0, 1] } $self->qp->config('dnsbl_zones');
return DECLINED unless %dnsbl_zones;
my $reversed_ip = join(".", reverse(split(/\./, $remote_ip)));
my @A_zones = grep { defined($dnsbl_zones{$_}) } keys %dnsbl_zones;
my @TXT_zones = grep { !defined($dnsbl_zones{$_}) } keys %dnsbl_zones;
if (@A_zones) {
# message templates for responding to the client
$self->connection->notes(
dnsbl_templates => {
map {
+"$reversed_ip.$_" => $dnsbl_zones{$_}
} @A_zones
}
);
}
return DECLINED
unless $class->lookup($self->qp,
[map { "$reversed_ip.$_" } @A_zones],
[map { "$reversed_ip.$_" } @TXT_zones],
);
return YIELD;
}
sub process_a_result {
my ($class, $qp, $result, $query) = @_;
my $conn = $qp->connection;
return if $class->connection->notes('dnsbl');
my $templates = $class->connection->notes('dnsbl_templates');
my $ip = $conn->remote_ip;
my $template = $templates->{$query};
$template =~ s/%IP%/$ip/g;
$class->connection->notes('dnsbl', $template);
}
sub process_txt_result {
my ($class, $qp, $result, $query) = @_;
my $conn = $class->connection;
$conn->notes('dnsbl', $result) unless $conn->notes('dnsbl');
}
sub hook_rcpt {
my ($self, $transaction, $rcpt) = @_;
my $connection = $self->qp->connection;
# RBLSMTPD being non-empty means it contains the failure message to return
if (defined($ENV{'RBLSMTPD'}) && $ENV{'RBLSMTPD'} ne '') {
my $result = $ENV{'RBLSMTPD'};
my $remote_ip = $self->connection->remote_ip;
$result =~ s/%IP%/$remote_ip/g;
return (DENY, join(" ", $self->qp->config('dnsbl_rejectmsg'), $result));
}
my $note = $self->connection->notes('dnsbl');
return (DENY, $note) if $note;
return DECLINED;
}
=head1 NAME
dnsbl - handle DNS BlackList lookups
=head1 DESCRIPTION
Plugin that checks the IP address of the incoming connection against
a configurable set of RBL services.
=head1 Configuration files
This plugin uses the following configuration files. All of these are optional.
However, not specifying dnsbl_zones is like not using the plugin at all.
=over 4
=item dnsbl_zones
Normal ip based dns blocking lists ("RBLs") which contain TXT records are
specified simply as:
relays.ordb.org
spamsources.fabel.dk
To configure RBL services which do not contain TXT records in the DNS,
but only A records (e.g. the RBL+ at http://www.mail-abuse.org), specify your
own error message to return in the SMTP conversation after a colon e.g.
rbl-plus.mail-abuse.org:You are listed at - http://http://www.mail-abuse.org/cgi-bin/lookup?%IP%
The string %IP% will be replaced with the IP address of incoming connection.
Thus a fully specified file could be:
sbl-xbl.spamhaus.org
list.dsbl.org
rbl-plus.mail-abuse.ja.net:Listed by rbl-plus.mail-abuse.ja.net - see <URL:http://www.mail-abuse.org/cgi-bin/lookup?%IP%>
relays.ordb.org
=item dnsbl_allow
List of allowed ip addresses that bypass RBL checking. Format is one entry per line,
with either a full IP address or a truncated IP address with a period at the end.
For example:
192.168.1.1
172.16.33.
NB the environment variable RBLSMTPD is considered before this file is
referenced. See below.
=item dnsbl_rejectmsg
A textual message that is sent to the sender on an RBL failure. The TXT record
from the RBL list is also sent, but this file can be used to indicate what
action the sender should take.
For example:
If you think you have been blocked in error, then please forward
this entire error message to your ISP so that they can fix their problems.
The next line often contains a URL that can be visited for more information.
=back
=head1 Environment Variables
=head2 RBLSMTPD
The environment variable RBLSMTPD is supported and mimics the behaviour of
Dan Bernstein's rblsmtpd. The exception to this is the '-' char at the
start of RBLSMTPD which is used to force a hard error in Dan's rblsmtpd.
NB I don't really see the benefit
of using a soft error for a site in an RBL list. This just complicates
things as it takes 7 days (or whatever default period) before a user
gets an error email back. In the meantime they are complaining that their
emails are being "lost" :(
=over 4
=item RBLSMTPD is set and non-empty
The contents are used as the SMTP conversation error.
Use this for forcibly blocking sites you don't like
=item RBLSMTPD is set, but empty
In this case no RBL checks are made.
This can be used for local addresses.
=item RBLSMTPD is not set
All RBL checks will be made.
This is the setting for remote sites that you want to check against RBL.
=back
=head1 Revisions
See: http://cvs.perl.org/viewcvs/qpsmtpd/plugins/dnsbl
=cut

View File

@ -1,141 +0,0 @@
#!perl -w
=head1 NAME
earlytalker - Check that the client doesn't talk before we send the SMTP banner
=head1 DESCRIPTION
Checks to see if the remote host starts talking before we've issued a 2xx
greeting. If so, we're likely looking at a direct-to-MX spam agent which
pipelines its entire SMTP conversation, and will happily dump an entire spam
into our mail log even if later tests deny acceptance.
Depending on configuration, clients which behave in this way are either
immediately disconnected with a deny or denysoft code, or else are issued this
on all mail/rcpt commands in the transaction.
=head1 CONFIGURATION
=over 4
=item wait [integer]
The number of seconds to delay the initial greeting to see if the connecting
host speaks first. The default is 1. Do not select a value that is too high,
or you may be unable to receive mail from MTAs with short SMTP connect or
greeting timeouts -- these are known to range as low as 30 seconds, and may
in some cases be configured lower by mailserver admins. Network transit time
must also be allowed for.
=item action [string: deny, denysoft, log]
What to do when matching an early-talker -- the options are I<deny>,
I<denysoft> or I<log>.
If I<log> is specified, the connection will be allowed to proceed as normal,
and only a warning will be logged.
The default is I<denysoft>.
=item defer-reject [boolean]
When an early-talker is detected, if this option is set to a true value, the
SMTP greeting will be issued as usual, but all RCPT/MAIL commands will be
issued a deny or denysoft (depending on the value of I<action>). The default
is to react at the SMTP greeting stage by issuing the apropriate response code
and terminating the SMTP connection.
=item check-at [string: connect, data]
Defines when to check for early talkers, either at connect time (pre-greet pause)
or at DATA time (pause before sending "354 go ahead").
The default is I<connect>.
Note that defer-reject has no meaning if check-at is I<data>.
=back
=cut
my $MSG = 'Connecting host started transmitting before SMTP greeting';
sub register {
my ($self, $qp, @args) = @_;
if (@args % 2) {
$self->log(LOGERROR, "Unrecognized/mismatched arguments");
return undef;
}
$self->{_args} = {
'wait' => 1,
'action' => 'denysoft',
'defer-reject' => 0,
'check-at' => 'connect',
@args,
};
print STDERR "Check at: ", $self->{_args}{'check-at'}, "\n";
$self->register_hook($self->{_args}->{'check-at'}, 'check_talker_poll');
$self->register_hook($self->{_args}->{'check-at'}, 'check_talker_post');
if ($self->{_args}{'check-at'} eq 'connect') {
$self->register_hook('mail', 'hook_mail')
if $self->{_args}->{'defer-reject'};
}
1;
}
sub check_talker_poll {
my ($self, $transaction) = @_;
my $qp = $self->qp;
my $conn = $qp->connection;
my $check_until = time + $self->{_args}{'wait'};
$qp->AddTimer(
1,
sub {
read_now($qp, $conn, $check_until, $self->{_args}{'check-at'});
}
);
return YIELD;
}
sub read_now {
my ($qp, $conn, $until, $phase) = @_;
if ($qp->has_data) {
$qp->log(LOGNOTICE,
'remote host started talking after $phase before we responded');
$qp->clear_data if $phase eq 'data';
$conn->notes('earlytalker', 1);
$qp->run_continuation;
}
elsif (time >= $until) {
# no early talking
$qp->run_continuation;
}
else {
$qp->AddTimer(1, sub { read_now($qp, $conn, $until, $phase) });
}
}
sub check_talker_post {
my ($self, $transaction) = @_;
return DECLINED unless $self->connection->notes('earlytalker');
return DECLINED if $self->{'defer-reject'};
return (DENY, $MSG) if $self->{_args}->{'action'} eq 'deny';
return (DENYSOFT, $MSG) if $self->{_args}->{'action'} eq 'denysoft';
return DECLINED; # assume action eq 'log'
}
sub hook_mail {
my ($self, $transaction) = @_;
return DECLINED unless $self->connection->notes('earlytalker');
return (DENY, $MSG) if $self->{_args}->{'action'} eq 'deny';
return (DENYSOFT, $MSG) if $self->{_args}->{'action'} eq 'denysoft';
return DECLINED;
}

View File

@ -1,413 +0,0 @@
#!perl -w
=head1 NAME
smtp-forward
=head1 DESCRIPTION
This plugin forwards the mail via SMTP to a specified server, rather than
delivering the email locally.
=head1 CONFIG
It takes one required parameter, the IP address or hostname to forward to.
async/queue/smtp-forward 10.2.2.2
Optionally you can also add a port:
async/queue/smtp-forward 10.2.2.2 9025
=cut
use Qpsmtpd::Constants;
sub register {
my ($self, $qp) = @_;
$self->register_hook(queue => "start_queue");
$self->register_hook(queue => "finish_queue");
}
sub init {
my ($self, $qp, @args) = @_;
if (@args > 0) {
if ($args[0] =~ /^([\.\w_-]+)$/) {
$self->{_smtp_server} = $1;
}
else {
die "Bad data in smtp server: $args[0]";
}
$self->{_smtp_port} = 25;
if (@args > 1 and $args[1] =~ /^(\d+)$/) {
$self->{_smtp_port} = $1;
}
$self->log(LOGWARN, "WARNING: Ignoring additional arguments.")
if (@args > 2);
}
else {
die("No SMTP server specified in smtp-forward config");
}
}
sub start_queue {
my ($self, $transaction) = @_;
my $qp = $self->qp;
my $SERVER = $self->{_smtp_server};
my $PORT = $self->{_smtp_port};
$self->log(LOGINFO, "forwarding to $SERVER:$PORT");
$transaction->notes(
'async_sender',
AsyncSMTPSender->new(
$SERVER, $PORT, $qp, $self, $transaction
)
);
return YIELD;
}
sub finish_queue {
my ($self, $transaction) = @_;
my $sender = $transaction->notes('async_sender');
$transaction->notes('async_sender', undef);
my ($rc, $msg) = $sender->results;
return $rc, $msg;
}
package AsyncSMTPSender;
use IO::Socket;
use base qw(Danga::Socket);
use fields qw(
qp
pkg
tran
state
rcode
rmsg
buf
command
resp
to
);
use constant ST_CONNECTING => 0;
use constant ST_CONNECTED => 1;
use constant ST_COMMANDS => 2;
use constant ST_DATA => 3;
use Qpsmtpd::Constants;
sub new {
my ($self, $server, $port, $qp, $pkg, $transaction) = @_;
$self = fields::new($self) unless ref $self;
my $sock = IO::Socket::INET->new(
PeerAddr => $server,
PeerPort => $port,
Blocking => 0,
)
or die "Error connecting to server $server:$port : $!\n";
IO::Handle::blocking($sock, 0);
binmode($sock, ':raw');
$self->{qp} = $qp;
$self->{pkg} = $pkg;
$self->{tran} = $transaction;
$self->{state} = ST_CONNECTING;
$self->{rcode} = DECLINED;
$self->{command} = 'connect';
$self->{buf} = '';
$self->{resp} = [];
# copy the recipients so we can pop them off one by one
$self->{to} = [$transaction->recipients];
$self->SUPER::new($sock);
# Watch for write first, this is when the TCP session is established.
$self->watch_write(1);
return $self;
}
sub results {
my AsyncSMTPSender $self = shift;
return ($self->{rcode}, $self->{rmsg});
}
sub log {
my AsyncSMTPSender $self = shift;
$self->{qp}->log(@_);
}
sub cont {
my AsyncSMTPSender $self = shift;
$self->{qp}->run_continuation;
}
sub command {
my AsyncSMTPSender $self = shift;
my ($command, $params) = @_;
$params ||= '';
$self->log(LOGDEBUG, ">> $command $params");
$self->write( ($command =~ m/ / ? "$command:" : $command)
. ($params ? " $params" : "")
. "\r\n");
$self->watch_read(1);
$self->{command} = ($command =~ /(\S+)/)[0];
}
sub handle_response {
my AsyncSMTPSender $self = shift;
my $method = "cmd_" . lc($self->{command});
$self->$method(@_);
}
sub cmd_connect {
my AsyncSMTPSender $self = shift;
my ($code, $response) = @_;
if ($code != 220) {
$self->{rmsg} = "Error on connect: @$response";
$self->close;
$self->cont;
}
else {
my $host = $self->{qp}->config('me');
print "HELOing with $host\n";
$self->command((join '', @$response) =~ m/ ESMTP/ ? "EHLO" : "HELO",
$host);
}
}
sub cmd_helo {
my AsyncSMTPSender $self = shift;
my ($code, $response) = @_;
if ($code != 250) {
$self->{rmsg} = "Error on HELO: @$response";
$self->close;
$self->cont;
}
else {
$self->command("MAIL", "FROM:" . $self->{tran}->sender->format);
}
}
sub cmd_ehlo {
my AsyncSMTPSender $self = shift;
my ($code, $response) = @_;
if ($code != 250) {
$self->{rmsg} = "Error on EHLO: @$response";
$self->close;
$self->cont;
}
else {
$self->command("MAIL", "FROM:" . $self->{tran}->sender->format);
}
}
sub cmd_mail {
my AsyncSMTPSender $self = shift;
my ($code, $response) = @_;
if ($code != 250) {
$self->{rmsg} = "Error on MAIL FROM: @$response";
$self->close;
$self->cont;
}
else {
$self->command("RCPT", "TO:" . shift(@{$self->{to}})->format);
}
}
sub cmd_rcpt {
my AsyncSMTPSender $self = shift;
my ($code, $response) = @_;
if ($code != 250) {
$self->{rmsg} = "Error on RCPT TO: @$response";
$self->close;
$self->cont;
}
else {
if (@{$self->{to}}) {
$self->command("RCPT", "TO:" . shift(@{$self->{to}})->format);
}
else {
$self->command("DATA");
}
}
}
sub cmd_data {
my AsyncSMTPSender $self = shift;
my ($code, $response) = @_;
if ($code != 354) {
$self->{rmsg} = "Error on DATA: @$response";
$self->close;
$self->cont;
}
else {
# $self->{state} = ST_DATA;
$self->datasend($self->{tran}->header->as_string);
$self->{tran}->body_resetpos;
my $write_buf = '';
while (my $line = $self->{tran}->body_getline) {
$line =~ s/\r?\n/\r\n/;
$write_buf .= $line;
if (length($write_buf) >= 131072) { # 128KB, arbitrary value
$self->log(LOGDEBUG, ">> $write_buf");
$self->datasend($write_buf);
$write_buf = '';
}
}
if (length($write_buf)) {
$self->log(LOGDEBUG, ">> $write_buf");
$self->datasend($write_buf);
}
$self->write(".\r\n");
$self->{command} = "DATAEND";
}
}
sub cmd_dataend {
my AsyncSMTPSender $self = shift;
my ($code, $response) = @_;
if ($code != 250) {
$self->{rmsg} = "Error after DATA: @$response";
$self->close;
$self->cont;
}
else {
$self->command("QUIT");
}
}
sub cmd_quit {
my AsyncSMTPSender $self = shift;
my ($code, $response) = @_;
$self->{rcode} = OK;
$self->{rmsg} = "Queued!";
$self->close;
$self->cont;
}
sub datasend {
my AsyncSMTPSender $self = shift;
my ($data) = @_;
$data =~ s/^\./../mg;
$self->write(\$data);
}
sub event_read {
my AsyncSMTPSender $self = shift;
if ($self->{state} == ST_CONNECTED) {
$self->{state} = ST_COMMANDS;
}
if ($self->{state} == ST_COMMANDS) {
my $in = $self->read(1024);
if (!$in) {
# XXX: connection closed
$self->close("lost connection");
return;
}
my @lines = split /\r?\n/, $self->{buf} . $$in, -1;
$self->{buf} = delete $lines[-1];
for (@lines) {
if (my ($code, $cont, $rest) = /^([0-9]{3})([ -])(.*)/) {
$self->log(LOGDEBUG, "<< $code$cont$rest");
push @{$self->{resp}}, $rest;
if ($cont eq ' ') {
$self->handle_response($code, $self->{resp});
$self->{resp} = [];
}
}
else {
$self->log(LOGERROR, "Unrecognised SMTP response line: $_");
$self->{rmsg} = "Error from upstream SMTP server";
$self->close;
$self->cont;
}
}
}
else {
$self->log(LOGERROR, "SMTP Session occurred out of order");
$self->close;
$self->cont;
}
}
sub event_write {
my AsyncSMTPSender $self = shift;
if ($self->{state} == ST_CONNECTING) {
$self->watch_write(0);
$self->{state} = ST_CONNECTED;
$self->watch_read(1);
}
elsif (0 && $self->{state} == ST_DATA) {
# send more data
if (my $line = $self->{tran}->body_getline) {
$self->log(LOGDEBUG, ">> $line");
$line =~ s/\r?\n/\r\n/;
$self->datasend($line);
}
else {
# no more data.
$self->log(LOGINFO, "No more data");
$self->watch_write(0);
$self->{state} = ST_COMMANDS;
}
}
else {
$self->write(undef);
}
}
sub event_err {
my ($self) = @_;
eval { $self->read(1); }; # gives us the correct error in errno
$self->{rmsg} = "Read error from remote server: $!";
#print "lost connection: $!\n";
$self->close;
$self->cont;
}
sub event_hup {
my ($self) = @_;
eval { $self->read(1); }; # gives us the correct error in errno
$self->{rmsg} = "HUP error from remote server: $!";
#print "lost connection: $!\n";
$self->close;
$self->cont;
}

View File

@ -1,206 +0,0 @@
#!perl -w
use strict;
use warnings;
use Qpsmtpd::Constants;
use Qpsmtpd::DSN;
use Qpsmtpd::TcpServer;
#use ParaDNS; # moved into register
use Socket;
my %invalid = ();
my $has_ipv6 = Qpsmtpd::TcpServer::has_ipv6();
sub register {
my ($self, $qp) = @_;
foreach my $i ($self->qp->config("invalid_resolvable_fromhost")) {
$i =~ s/^\s*//;
$i =~ s/\s*$//;
if ($i =~ m#^((\d{1,3}\.){3}\d{1,3})/(\d\d?)#) {
$invalid{$1} = $3;
}
}
eval 'use ParaDNS';
if ($@) {
warn "could not load ParaDNS, plugin disabled";
return DECLINED;
}
$self->register_hook(mail => 'hook_mail_start');
$self->register_hook(mail => 'hook_mail_done');
}
sub hook_mail_start {
my ($self, $transaction, $sender) = @_;
return DECLINED
if ($self->connection->notes('whitelisthost'));
if ($sender ne '<>') {
unless ($sender->host) {
# default of addr_bad_from_system is DENY, we use DENYSOFT here to
# get the same behaviour as without Qpsmtpd::DSN...
return
Qpsmtpd::DSN->addr_bad_from_system(DENYSOFT,
"FQDN required in the envelope sender");
}
return DECLINED if $sender->host =~ m/^\[(\d{1,3}\.){3}\d{1,3}\]$/;
unless ($self->check_dns($sender->host)) {
return Qpsmtpd::DSN->temp_resolver_failed(
"Could not resolve " . $sender->host);
}
return YIELD;
}
return DECLINED;
}
sub hook_mail_done {
my ($self, $transaction, $sender) = @_;
return DECLINED
if ($self->connection->notes('whitelisthost'));
if ($sender ne "<>" && !$transaction->notes('resolvable_fromhost')) {
# default of temp_resolver_failed is DENYSOFT
return Qpsmtpd::DSN->temp_resolver_failed(
"Could not resolve " . $sender->host);
}
return DECLINED;
}
sub check_dns {
my ($self, $host) = @_;
my @host_answers;
my $qp = $self->qp;
$qp->input_sock->pause_read;
my $a_records = [];
my $num_queries = 1; # queries in progress
my $mx_found = 0;
ParaDNS->new(
callback => sub {
my $mx = shift;
return if $mx =~ /^[A-Z]+$/; # error
my $addr = $mx->[0];
$mx_found = 1;
$num_queries++;
ParaDNS->new(
callback => sub {
push @$a_records, $_[0] if $_[0] !~ /^[A-Z]+$/;
},
finished => sub {
$num_queries--;
$self->finish_up($qp, $a_records, $num_queries);
},
host => $addr,
type => 'A',
);
if ($has_ipv6) {
$num_queries++;
ParaDNS->new(
callback => sub {
push @$a_records, $_[0] if $_[0] !~ /^[A-Z]+$/;
},
finished => sub {
$num_queries--;
$self->finish_up($qp, $a_records, $num_queries);
},
host => $addr,
type => 'AAAA',
);
}
},
finished => sub {
unless ($mx_found) {
$num_queries++;
ParaDNS->new(
callback => sub {
push @$a_records, $_[0] if $_[0] !~ /^[A-Z]+$/;
},
finished => sub {
$num_queries--;
$self->finish_up($qp, $a_records, $num_queries);
},
host => $host,
type => 'A',
);
if ($has_ipv6) {
$num_queries++;
ParaDNS->new(
callback => sub {
push @$a_records, $_[0] if $_[0] !~ /^[A-Z]+$/;
},
finished => sub {
$num_queries--;
$self->finish_up($qp, $a_records, $num_queries);
},
host => $host,
type => 'AAAA',
);
}
}
$num_queries--;
$self->finish_up($qp, $a_records, $num_queries);
},
host => $host,
type => 'MX',
)
or $qp->input_sock->continue_read, return;
return 1;
}
sub finish_up {
my ($self, $qp, $a_records, $num_queries) = @_;
return if defined $qp->transaction->notes('resolvable_fromhost');
foreach my $addr (@$a_records) {
if (is_valid($addr)) {
$qp->transaction->notes('resolvable_fromhost', 1);
$qp->input_sock->continue_read;
$qp->run_continuation;
return;
}
}
unless ($num_queries) {
# all queries returned no valid response
$qp->transaction->notes('resolvable_fromhost', 0);
$qp->input_sock->continue_read;
$qp->run_continuation;
}
}
sub is_valid {
my $ip = shift;
my ($net, $mask);
foreach $net (keys %invalid) {
$mask = $invalid{$net};
$mask = pack "B32", "1" x ($mask) . "0" x (32 - $mask);
return 0
if join(".", unpack("C4", inet_aton($ip) & $mask)) eq $net;
}
return 1;
}

View File

@ -1,92 +0,0 @@
#!perl -w
use Qpsmtpd::Plugin::Async::DNSBLBase;
sub init {
my $self = shift;
my $class = ref $self;
no strict 'refs';
push @{"${class}::ISA"}, 'Qpsmtpd::Plugin::Async::DNSBLBase';
}
sub hook_mail {
my ($self, $transaction, $sender) = @_;
my $class = ref $self;
return DECLINED if $sender->format eq '<>';
my %rhsbl_zones =
map { (split /\s+/, $_, 2)[0, 1] } $self->qp->config('rhsbl_zones');
return DECLINED unless %rhsbl_zones;
my $sender_host = $sender->host;
my @A_zones = grep { defined($rhsbl_zones{$_}) } keys %rhsbl_zones;
my @TXT_zones = grep { !defined($rhsbl_zones{$_}) } keys %rhsbl_zones;
if (@A_zones) {
# message templates for responding to the client
$transaction->notes(rhsbl_templates =>
{map { +"$sender_host.$_" => $rhsbl_zones{$_} } @A_zones});
}
return DECLINED
unless $class->lookup($self->qp,
[map { "$sender_host.$_" } @A_zones],
[map { "$sender_host.$_" } @TXT_zones],
);
return YIELD;
}
sub process_a_result {
my ($class, $qp, $result, $query) = @_;
my $transaction = $qp->transaction;
$transaction->notes('rhsbl',
$transaction->notes('rhsbl_templates')->{$query})
unless $transaction->notes('rhsbl');
}
sub process_txt_result {
my ($class, $qp, $result, $query) = @_;
my $transaction = $qp->transaction;
$transaction->notes('rhsbl', $result) unless $transaction->notes('rhsbl');
}
sub hook_rcpt {
my ($self, $transaction, $rcpt) = @_;
my $host = $transaction->sender->host;
my $note = $transaction->notes('rhsbl');
return (DENY, "Mail from $host rejected because it $note") if $note;
return DECLINED;
}
=head1 NAME
rhsbl - handle RHSBL lookups
=head1 DESCRIPTION
Pluging that checks the host part of the sender's address against a
configurable set of RBL services.
=head1 CONFIGURATION
This plugin reads the lists to use from the rhsbl_zones configuration
file. Normal domain based dns blocking lists ("RBLs") which contain TXT
records are specified simply as:
dsn.rfc-ignorant.org
To configure RBL services which do not contain TXT records in the DNS,
but only A records, specify, after a whitespace, your own error message
to return in the SMTP conversation e.g.
abuse.rfc-ignorant.org does not support abuse@domain
=cut

View File

@ -1,151 +0,0 @@
#!perl -w
use Qpsmtpd::Plugin::Async::DNSBLBase;
use strict;
use warnings;
sub init {
my ($self, $qp, %args) = @_;
my $class = ref $self;
$self->isa_plugin("uribl");
{
no strict 'refs';
push @{"${class}::ISA"}, 'Qpsmtpd::Plugin::Async::DNSBLBase';
}
$self->SUPER::init($qp, %args);
}
sub register {
my $self = shift;
$self->register_hook('data_post', 'start_data_post');
$self->register_hook('data_post', 'finish_data_post');
}
sub start_data_post {
my ($self, $transaction) = @_;
my $class = ref $self;
my @names;
my $queries = $self->lookup_start(
$transaction,
sub {
my ($self, $name) = @_;
push @names, $name;
}
);
my @hosts;
foreach my $z (keys %{$self->{uribl_zones}}) {
push @hosts, map { "$_.$z" } @names;
}
$transaction->notes(uribl_results => {});
$transaction->notes(uribl_zones => $self->{uribl_zones});
return DECLINED
unless @hosts && $class->lookup($self->qp, [@hosts], [@hosts]);
return YIELD;
}
sub finish_data_post {
my ($self, $transaction) = @_;
my $matches = $self->collect_results($transaction);
for (@$matches) {
$self->log(LOGWARN, $_->{desc});
if ($_->{action} eq 'add-header') {
$transaction->header->add('X-URIBL-Match', $_->{desc});
}
elsif ($_->{action} eq 'deny') {
return (DENY, $_->{desc});
}
elsif ($_->{action} eq 'denysoft') {
return (DENYSOFT, $_->{desc});
}
}
return DECLINED;
}
sub init_resolver { }
sub process_a_result {
my ($class, $qp, $result, $query) = @_;
my $transaction = $qp->transaction;
my $results = $transaction->notes('uribl_results');
my $zones = $transaction->notes('uribl_zones');
foreach my $z (keys %$zones) {
if ($query =~ /^(.*)\.$z$/) {
my $name = $1;
$results->{$z}->{$name}->{a} = $result;
}
}
}
sub process_txt_result {
my ($class, $qp, $result, $query) = @_;
my $transaction = $qp->transaction;
my $results = $transaction->notes('uribl_results');
my $zones = $transaction->notes('uribl_zones');
foreach my $z (keys %$zones) {
if ($query =~ /^(.*)\.$z$/) {
my $name = $1;
$results->{$z}->{$name}->{txt} = $result;
}
}
}
sub collect_results {
my ($self, $transaction) = @_;
my $results = $transaction->notes('uribl_results');
my @matches;
foreach my $z (keys %$results) {
foreach my $n (keys %{$results->{$z}}) {
if (exists $results->{$z}->{$n}->{a}) {
if ($self->evaluate($z, $results->{$z}->{$n}->{a})) {
$self->log(LOGDEBUG, "match $n in $z");
push @matches,
{
action => $self->{uribl_zones}->{$z}->{action},
desc => "$n in $z: "
. (
$results->{$z}->{$n}->{txt}
|| $results->{$z}->{$n}->{a}
),
};
}
}
}
}
return \@matches;
}
=head1 NAME
uribl - URIBL blocking plugin for qpsmtpd
=head1 DESCRIPTION
This plugin implements DNSBL lookups for URIs found in spam, such as that
implemented by SURBL (see E<lt>http://surbl.org/E<gt>). Incoming messages are
scanned for URIs, which are then checked against one or more URIBLs in a
fashion similar to DNSBL systems.
=head1 CONFIGURATION
See the documentation of the non-async version. The timeout config option is
ignored, the ParaDNS timeout is used instead.
=cut

View File

@ -36,10 +36,6 @@ NOTE: other 'connect' hooks will continue to fire (e.g. dnsbl), since the DNS
queries happen in the background. This plugin's 'rcpt_handler' retrieves
the results of the query and sets the connection note if found.
If you switch to qpsmtpd-async and to the async version of this plugin, then
the 'whitelisthost' connection note will be available to the other 'connect'
hooks, see the documentation of the async plugin.
=head1 AUTHOR
John Peacock <jpeacock@rowman.com>

View File

@ -114,7 +114,7 @@ p0f v3 requires only the remote IP.
p0f v2 requires four pieces of information to look up the p0f fingerprint:
local_ip, local_port, remote_ip, and remote_port. TcpServer.pm has been
has been updated to provide that information when running under djb's
tcpserver. The async, forkserver, and prefork models will likely require
tcpserver. The forkserver and prefork models will likely require
some additional changes to make sure these fields are populated.
=head1 ACKNOWLEDGEMENTS

View File

@ -36,8 +36,6 @@ Since many connections are from blacklisted IPs, naughty significantly reduces t
Rather than having plugins split processing across hooks, plugins can run to completion when they have the information they need, issue a I<reject naughty> if warranted, and be done.
This may help reduce the code divergence between the sync and async deployment models.
=head2 authentication
When a user authenticates, the naughty flag on their connection is cleared. This allows users to send email from IPs that fail connection tests such as B<dnsbl>. Note that if I<reject connect> is set, connections will not get the chance to authenticate. To allow clients a chance to authenticate, I<reject mail> works well.

View File

@ -68,19 +68,20 @@ Names of the substitution parameters and the replaced charachters are the same
L<spamd(8)> supports, for more info see the C<--virtual-config-dir>
option of L<spamd(8)>.
When called with more than one parameter, this plugin is probably not usable
with qpsmtpd-async.
With the the second parameter being C<%d> it will still deliver one message
for each recipient: With the two recpients C<user@example.org> and
C<user2@example.org> you get two messages in the C<example.org/> directory.
=cut
use strict;
use File::Path qw(mkpath);
use Sys::Hostname qw(hostname);
use Time::HiRes qw(gettimeofday);
use Qpsmtpd::Constants;
sub register {
my ($self, $qp, @args) = @_;
@ -108,8 +109,7 @@ sub register {
$self->{_perms} = oct($self->{_perms});
}
$self->{_perms} = 0700
unless $self->{_perms};
$self->{_perms} ||= 0700;
unless ($self->{_maildir}) {
$self->log(LOGWARN, "WARNING: maildir directory not specified");

View File

@ -64,18 +64,6 @@ sub hook_unrecognized_command {
$self->log(LOGINFO, "stunnel : $remote_ip:$remote_port");
# DNS reverse
if ( $self->isa('Qpsmtpd::PollServer') ) {
eval {
use ParaDNS;
ParaDNS->new(
finished => sub { $self->continue_read() },
callback => sub { $self->connection->remote_host($_[0]) },
host => $remote_ip,
);
};
}
else {
my $res = Net::DNS::Resolver->new( dnsrch => 0 );
$res->tcp_timeout(3);
$res->udp_timeout(3);
@ -87,7 +75,6 @@ sub hook_unrecognized_command {
}
}
}
}
else {
return DENY_DISCONNECT;
}

View File

@ -59,8 +59,12 @@ and put a suitable string in config/tls_ciphers (e.g. "DEFAULT" or
=cut
use strict;
use IO::Socket::SSL 0.98;
use Qpsmtpd::Constants;
sub init {
my ($self, $qp, $cert, $key, $ca) = @_;
my $dir = -d 'ssl' ? 'ssl' : 'config/ssl';
@ -179,10 +183,6 @@ sub hook_post_connection {
sub _convert_to_ssl {
my ($self) = @_;
if ($self->qp->isa('Qpsmtpd::PollServer')) {
return _convert_to_ssl_async($self);
}
eval {
my $tlssocket =
IO::Socket::SSL->new_from_fd(
@ -210,14 +210,6 @@ sub _convert_to_ssl {
return 1;
}
sub _convert_to_ssl_async {
my ($self) = @_;
my $upgrader =
$self->connection->notes('tls_upgrader', UpgradeClientSSL->new($self));
$upgrader->upgrade_socket();
return 1;
}
sub can_do_tls {
my ($self) = @_;
$self->tls_cert && -r $self->tls_cert;
@ -265,10 +257,9 @@ sub bad_ssl_hook {
package UpgradeClientSSL;
# borrowed heavily from Perlbal::SocketSSL
use strict;
use warnings;
no warnings qw(deprecated);
no warnings 'deprecated';
use IO::Socket::SSL 0.98;
use Errno qw( EAGAIN );

View File

@ -135,7 +135,6 @@ my %strict_twolevel_cctlds = (
'za' => 1,
);
# async version: OK
sub init {
my ($self, $qp, %args) = @_;
@ -161,14 +160,14 @@ sub init {
for (split /,/, $z[1]) {
unless (/^(-?\d+)$/) {
$self->log(LOGERROR, "Malformed mask $_ for $z[0]");
return undef;
return;
}
$mask |= $1 >= 0 ? $1 : 0x00ffffff;
}
my $action = $z[2] || $self->{action};
unless ($action =~ /^(add-header|deny|denysoft|log)$/) {
$self->log(LOGERROR, "Unknown action $action for $z[0]");
return undef;
return;
}
$self->{uribl_zones}->{$z[0]} = {
@ -184,20 +183,12 @@ sub init {
$self->init_resolver;
}
# async version: not used
sub register {
my $self = shift;
$self->register_hook('data_post', 'data_handler');
}
# async version: not used
sub send_query {
my $self = shift;
my $name = shift || return undef;
my $name = shift || return;
my $count = 0;
$self->{socket_select} ||= new IO::Select or return undef;
$self->{socket_select} ||= new IO::Select or return;
for my $z (keys %{$self->{uribl_zones}}) {
my ($s, $s1);
my $index = {
@ -240,7 +231,6 @@ sub send_query {
$count;
}
# async version: not used
sub lookup_finish {
my $self = shift;
$self->{socket_idx} = {};
@ -248,21 +238,19 @@ sub lookup_finish {
undef $self->{socket_select};
}
# async version: OK
sub evaluate {
my $self = shift;
my $zone = shift || return undef;
my $a = shift || return undef;
my $zone = shift || return;
my $a = shift || return;
my $mask = $self->{uribl_zones}->{$zone}->{mask} || $self->{mask};
$a =~ /^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})$/ or return undef;
$a =~ /^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})$/ or return;
my $v =
(($1 & 0xff) << 24) | (($2 & 0xff) << 16) | (($3 & 0xff) << 8) |
($4 & 0xff);
return ($v & $mask);
}
# async version: OK
sub lookup_start {
my ($self, $transaction, $start_query) = @_;
@ -442,7 +430,6 @@ sub lookup_start {
return $queries;
}
# async version: not used
sub collect_results {
my ($self, $transaction) = @_;
@ -512,8 +499,7 @@ sub collect_results {
return \@matches;
}
# async version: not used
sub data_handler {
sub hook_data {
my ($self, $transaction) = @_;
return (DECLINED) if $self->is_immune();
@ -526,7 +512,7 @@ sub data_handler {
}
);
unless ($queries) {
if (!$queries) {
$self->log(LOGINFO, "pass, No URIs found in mail");
return DECLINED;
}
@ -547,11 +533,8 @@ sub data_handler {
return DECLINED;
}
# async version: not used
sub init_resolver {
my $self = shift;
$self->{resolver} = new Net::DNS::Resolver or return undef;
$self->{resolver} = new Net::DNS::Resolver or return;
$self->{resolver}->udp_timeout($self->{timeout});
}

View File

@ -1,464 +0,0 @@
#!/usr/bin/perl
use lib "./lib";
BEGIN {
delete $ENV{ENV};
delete $ENV{BASH_ENV};
$ENV{PATH} = '/bin:/usr/bin:/var/qmail/bin:/usr/local/bin';
}
# Profiling - requires Devel::Profiler 0.05
#BEGIN { $Devel::Profiler::NO_INIT = 1; }
#use Devel::Profiler;
use strict;
use vars qw($DEBUG);
use FindBin qw();
# TODO: need to make this taint friendly
use lib "$FindBin::Bin/lib";
use Danga::Socket;
use Danga::Client;
use Qpsmtpd::PollServer;
use Qpsmtpd::ConfigServer;
use Qpsmtpd::Constants;
use IO::Socket;
use Carp;
use POSIX qw(WNOHANG);
use Getopt::Long;
use List::Util qw(shuffle);
$|++;
use Socket
qw(SOMAXCONN IPPROTO_TCP SO_KEEPALIVE TCP_NODELAY SOL_SOCKET AF_UNIX SOCK_STREAM PF_UNSPEC);
$SIG{'PIPE'} = "IGNORE"; # handled manually
$DEBUG = 0;
my $CONFIG_PORT = 20025;
my $CONFIG_LOCALADDR = '127.0.0.1';
my $PORT = 2525;
my $LOCALADDR = '0.0.0.0';
my $PROCS = 1;
my $USER = (getpwuid $>)[0]; # user to suid to
$USER = "smtpd" if $USER eq "root";
my $PAUSED = 0;
my $NUMACCEPT = 20;
my $PID_FILE = '';
my $ACCEPT_RSET;
my $DETACH; # daemonize on startup
# make sure we don't spend forever doing accept()
use constant ACCEPT_MAX => 1000;
sub reset_num_accept {
$NUMACCEPT = 20;
}
sub help {
print <<EOT;
Usage:
qpsmtpd [OPTIONS]
Options:
-l, --listen-address addr : listen on a specific address; default 0.0.0.0
-p, --port P : listen on a specific port; default 2525
--config-port : config server port; default 20025
-u, --user U : run as a particular user; defualt 'smtpd'
-j, --procs J : spawn J processes; default 1
-d, --detach : detach from controlling terminal (daemonize)
--pid-file P : print main servers PID to file P
-h, --help : this page
--use-poll : force use of poll() instead of epoll()/kqueue()
EOT
exit(0);
}
GetOptions(
'p|port=i' => \$PORT,
'l|listen-address=s' => \$LOCALADDR,
'j|procs=i' => \$PROCS,
'v|verbose+' => \$DEBUG,
'u|user=s' => \$USER,
'pid-file=s' => \$PID_FILE,
'd|detach' => \$DETACH,
'h|help' => \&help,
'config-port=i' => \$CONFIG_PORT,
)
|| help();
# detaint the commandline
if ($PORT =~ /^(\d+)$/) { $PORT = $1 }
else { &help }
if ($LOCALADDR =~ /^([\d\w\-.]+)$/) { $LOCALADDR = $1 }
else { &help }
if ($USER =~ /^([\w\-]+)$/) { $USER = $1 }
else { &help }
if ($PROCS =~ /^(\d+)$/) { $PROCS = $1 }
else { &help }
sub force_poll {
$Danga::Socket::HaveEpoll = 0;
$Danga::Socket::HaveKQueue = 0;
}
my $POLL = "with "
. (
$Danga::Socket::HaveEpoll ? "epoll()"
: $Danga::Socket::HaveKQueue ? "kqueue()"
: "poll()"
);
my $SERVER;
my $CONFIG_SERVER;
use constant READY => 1;
use constant ACCEPTING => 2;
use constant RESTARTING => 999;
my %childstatus = ();
if ($PID_FILE && -r $PID_FILE) {
open PID, "<$PID_FILE"
or die "open_pidfile $PID_FILE: $!\n";
my $running_pid = <PID> || '';
chomp $running_pid;
if ($running_pid =~ /^(\d+)/) {
if (kill 0, $running_pid) {
die "Found an already running qpsmtpd with pid $running_pid.\n";
}
}
close(PID);
}
run_as_server();
exit(0);
sub _fork {
my $pid = fork;
if (!defined($pid)) { die "Cannot fork: $!" }
return $pid if $pid;
# Fixup Net::DNS randomness after fork
srand($$ ^ time);
local $^W;
delete $INC{'Net/DNS/Header.pm'};
require Net::DNS::Header;
# cope with different versions of Net::DNS
eval {
$Net::DNS::Resolver::global{id} = 1;
$Net::DNS::Resolver::global{id} =
int(rand(Net::DNS::Resolver::MAX_ID()));
# print "Next DNS ID: $Net::DNS::Resolver::global{id}\n";
};
if ($@) {
# print "Next DNS ID: " . Net::DNS::Header::nextid() . "\n";
}
# Fixup lost kqueue after fork
$Danga::Socket::HaveKQueue = undef;
}
sub spawn_child {
my $plugin_loader = shift || Qpsmtpd::SMTP->new;
socketpair(my $reader, my $writer, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
|| die "Unable to create a pipe";
$writer->autoflush(1);
$reader->autoflush(1);
if (my $pid = _fork) {
$childstatus{$pid} = $writer;
return $pid;
}
$SIG{CHLD} = $SIG{INT} = $SIG{TERM} = 'DEFAULT';
$SIG{PIPE} = 'IGNORE';
$SIG{HUP} = 'IGNORE';
close $CONFIG_SERVER;
Qpsmtpd::PollServer->Reset;
Qpsmtpd::PollServer->OtherFds(
fileno($reader) => sub { command_handler($reader) },
fileno($SERVER) => \&accept_handler,);
$ACCEPT_RSET = Danga::Socket->AddTimer(30, \&reset_num_accept);
$plugin_loader->run_hooks('post-fork');
Qpsmtpd::PollServer->EventLoop();
exit;
}
# Note this is broken on KQueue because it requires that it handle signals itself or it breaks the event loop.
sub sig_hup {
for my $writer (values %childstatus) {
print $writer "hup\n";
}
}
sub sig_chld {
my $spawn_count = 0;
while ((my $child = waitpid(-1, WNOHANG)) > 0) {
if (!defined $childstatus{$child}) {
next;
}
last unless $child > 0;
print "SIGCHLD: child $child died\n";
delete $childstatus{$child};
$spawn_count++;
}
if ($spawn_count) {
for (1 .. $spawn_count) {
# restart a new child if in poll server mode
my $pid = spawn_child();
}
}
$SIG{CHLD} = \&sig_chld;
}
sub HUNTSMAN {
$SIG{CHLD} = 'DEFAULT';
kill 'INT' => keys %childstatus;
if ($PID_FILE && -e $PID_FILE) {
unlink $PID_FILE or ::log(LOGERROR, "unlink: $PID_FILE: $!");
}
exit(0);
}
sub run_as_server {
# establish SERVER socket, bind and listen.
$SERVER = IO::Socket::INET->new(
LocalPort => $PORT,
LocalAddr => $LOCALADDR,
Type => SOCK_STREAM,
Proto => IPPROTO_TCP,
Blocking => 0,
Reuse => 1,
Listen => SOMAXCONN
)
or die "Error creating server $LOCALADDR:$PORT : $@\n";
IO::Handle::blocking($SERVER, 0);
binmode($SERVER, ':raw');
$CONFIG_SERVER =
IO::Socket::INET->new(
LocalPort => $CONFIG_PORT,
LocalAddr => $CONFIG_LOCALADDR,
Type => SOCK_STREAM,
Proto => IPPROTO_TCP,
Blocking => 0,
Reuse => 1,
Listen => 1
)
or die "Error creating server $CONFIG_LOCALADDR:$CONFIG_PORT : $@\n";
IO::Handle::blocking($CONFIG_SERVER, 0);
binmode($CONFIG_SERVER, ':raw');
# Drop priviledges
my (undef, undef, $quid, $qgid) = getpwnam $USER
or die "unable to determine uid/gid for $USER\n";
my $groups = "$qgid $qgid";
while (my (undef, undef, $gid, $members) = getgrent) {
my @m = split(/ /, $members);
if (grep { $_ eq $USER } @m) {
$groups .= " $gid";
}
}
endgrent;
$) = $groups;
POSIX::setgid($qgid)
or die "unable to change gid: $!\n";
POSIX::setuid($quid)
or die "unable to change uid: $!\n";
$> = $quid;
# Load plugins here
my $plugin_loader = Qpsmtpd::SMTP->new();
$plugin_loader->load_plugins;
if ($DETACH) {
open STDIN, '/dev/null' or die "/dev/null: $!";
open STDOUT, '>/dev/null' or die "/dev/null: $!";
open STDERR, '>&STDOUT' or die "open(stderr): $!";
defined(my $pid = fork) or die "fork: $!";
exit 0 if $pid;
POSIX::setsid or die "setsid: $!";
}
if ($PID_FILE) {
open PID, ">$PID_FILE" || die "$PID_FILE: $!";
print PID $$, "\n";
close PID;
}
$plugin_loader->log(LOGINFO,
'Running as user '
. (getpwuid($>) || $>)
. ', group '
. (getgrgid($)) || $))
);
$SIG{INT} = $SIG{TERM} = \&HUNTSMAN;
######################
# more Profiling code
=pod
$plugin_loader->run_hooks('post-fork');
Devel::Profiler->set_options(
bad_subs => [qw(Danga::Socket::EventLoop)],
sub_filter => sub {
my ($pkg, $sub) = @_;
return 0 if $sub eq 'AUTOLOAD';
return 0 if $pkg =~ /ParaDNS::XS/;
return 1;
},
);
Devel::Profiler->init();
Qpsmtpd::PollServer->OtherFds(
fileno($SERVER) => \&accept_handler,
fileno($CONFIG_SERVER) => \&config_handler, );
Qpsmtpd::PollServer->EventLoop;
exit;
=cut
#####################
for (1 .. $PROCS) {
my $pid = spawn_child($plugin_loader);
}
$plugin_loader->log(LOGDEBUG,
"Listening on $PORT with $PROCS children $POLL");
$SIG{CHLD} = \&sig_chld;
$SIG{HUP} = \&sig_hup;
Qpsmtpd::PollServer->OtherFds(fileno($CONFIG_SERVER) => \&config_handler,);
Qpsmtpd::PollServer->EventLoop;
exit;
}
sub config_handler {
my $csock = $CONFIG_SERVER->accept();
if (!$csock) {
# warn("accept failed on config server: $!");
return;
}
binmode($csock, ':raw');
printf("Config server connection\n") if $DEBUG;
IO::Handle::blocking($csock, 0);
setsockopt($csock, IPPROTO_TCP, TCP_NODELAY, pack("l", 1)) or die;
my $client = Qpsmtpd::ConfigServer->new($csock);
$client->watch_read(1);
return;
}
sub command_handler {
my $reader = shift;
chomp(my $command = <$reader>);
#print "Got command: $command\n";
my $real_command = "cmd_$command";
no strict 'refs';
$real_command->();
}
sub cmd_hup {
# clear cache
print "Clearing cache\n";
Qpsmtpd::Config::clear_cache();
# should also reload modules... but can't do that yet.
}
# Accept all new connections
sub accept_handler {
for (1 .. $NUMACCEPT) {
return unless _accept_handler();
}
# got here because we have accept's left.
# So double the number we accept next time.
$NUMACCEPT *= 2;
$NUMACCEPT = ACCEPT_MAX if $NUMACCEPT > ACCEPT_MAX;
$ACCEPT_RSET->cancel if defined $ACCEPT_RSET;
$ACCEPT_RSET = Danga::Socket->AddTimer(30, \&reset_num_accept);
}
use Errno qw(EAGAIN EWOULDBLOCK);
sub _accept_handler {
my $csock = $SERVER->accept();
if (!$csock) {
# warn("accept() failed: $!");
return;
}
binmode($csock, ':raw');
printf("Listen child making a Qpsmtpd::PollServer for %d.\n",
fileno($csock))
if $DEBUG;
IO::Handle::blocking($csock, 0);
#setsockopt($csock, IPPROTO_TCP, TCP_NODELAY, pack("l", 1)) or die;
#print "Got connection\n";
my $client = Qpsmtpd::PollServer->new($csock);
if ($PAUSED) {
$client->write("451 Sorry, this server is currently paused\r\n");
$client->close;
return 1;
}
$client->process_line("Connect\n");
$client->watch_read(1);
$client->pause_read();
return 1;
}
########################################################################
sub log {
my ($level, $message) = @_;
# $level not used yet. this is reimplemented from elsewhere anyway
warn("$$ fd:? $message\n");
}
sub pause {
my ($pause) = @_;
$PAUSED = $pause;
}

View File

@ -85,7 +85,7 @@ sub config_dir {
}
sub plugin_dirs {
('./plugins', './plugins/ident', './plugins/async');
('./plugins', './plugins/ident');
}
sub log {