diff --git a/plugins/async/queue/smtp-forward b/plugins/async/queue/smtp-forward new file mode 100644 index 0000000..ab63fe2 --- /dev/null +++ b/plugins/async/queue/smtp-forward @@ -0,0 +1,390 @@ +#!/usr/bin/perl -w + +=head1 NAME + +smtp-forward + +=head1 DESCRIPTION + +This plugin forwards the mail via SMTP to a specified server, rather than +delivering the email locally. + +=head1 CONFIG + +It takes one required parameter, the IP address or hostname to forward to. + + async/queue/smtp-forward 10.2.2.2 + +Optionally you can also add a port: + + async/queue/smtp-forward 10.2.2.2 9025 + +=cut + +use Qpsmtpd::Constants; + +sub register { + my ($self, $qp) = @_; + + $self->register_hook(queue => "start_queue"); + $self->register_hook(queue => "finish_queue"); +} + +sub init { + my ($self, $qp, @args) = @_; + + if (@args > 0) { + if ($args[0] =~ /^([\.\w_-]+)$/) { + $self->{_smtp_server} = $1; + } + else { + die "Bad data in smtp server: $args[0]"; + } + $self->{_smtp_port} = 25; + if (@args > 1 and $args[1] =~ /^(\d+)$/) { + $self->{_smtp_port} = $1; + } + + $self->log(LOGWARN, "WARNING: Ignoring additional arguments.") if (@args > 2); + } + else { + die("No SMTP server specified in smtp-forward config"); + } + +} + +sub start_queue { + my ($self, $transaction) = @_; + + my $qp = $self->qp; + my $SERVER = $self->{_smtp_server}; + my $PORT = $self->{_smtp_port}; + $self->log(LOGINFO, "forwarding to $SERVER:$PORT"); + + $transaction->notes('async_sender', + AsyncSMTPSender->new($SERVER, $PORT, $qp, $self, $transaction) + ); + + return YIELD; +} + +sub finish_queue { + my ($self, $transaction) = @_; + + my $sender = $transaction->notes('async_sender'); + + my ($rc, $msg) = $sender->results; + + return $rc, $msg; +} + +package AsyncSMTPSender; + +use IO::Socket; + +use base qw(Danga::Socket); +use fields qw( + qp + pkg + tran + state + rcode + rmsg + buf + command + resp + to + ); + +use constant ST_CONNECTING => 0; +use constant ST_CONNECTED => 1; +use constant ST_COMMANDS => 2; +use constant ST_DATA => 3; + +use Qpsmtpd::Constants; + +sub new { + my ($self, $server, $port, $qp, $pkg, $transaction) = @_; + $self = fields::new($self) unless ref $self; + + my $sock = IO::Socket::INET->new( + PeerAddr => $server, + PeerPort => $port, + Blocking => 0, + ) or die "Error connecting to server $server:$port : $!\n"; + + IO::Handle::blocking($sock, 0); + binmode($sock, ':raw'); + + $self->{qp} = $qp; + $self->{pkg} = $pkg; + $self->{tran} = $transaction; + $self->{state} = ST_CONNECTING; + $self->{rcode} = DECLINED; + $self->{command} = 'connect'; + $self->{buf} = ''; + $self->{resp} = []; + # copy the recipients so we can pop them off one by one + $self->{to} = [ $transaction->recipients ]; + + $self->SUPER::new($sock); + # Watch for write first, this is when the TCP session is established. + $self->watch_write(1); + + return $self; +} + +sub results { + my AsyncSMTPSender $self = shift; + return ( $self->{rcode}, $self->{rmsg} ); +} + +sub log { + my AsyncSMTPSender $self = shift; + $self->{qp}->log(@_); +} + +sub cont { + my AsyncSMTPSender $self = shift; + $self->{qp}->run_continuation; +} + +sub command { + my AsyncSMTPSender $self = shift; + my ($command, $params) = @_; + $params ||= ''; + + $self->log(LOGDEBUG, ">> $command $params"); + + $self->write(($command =~ m/ / ? "$command:" : $command) + . ($params ? " $params" : "") . "\r\n"); + $self->watch_read(1); + $self->{command} = ($command =~ /(\S+)/)[0]; +} + +sub handle_response { + my AsyncSMTPSender $self = shift; + + my $method = "cmd_" . lc($self->{command}); + + $self->$method(@_); +} + +sub cmd_connect { + my AsyncSMTPSender $self = shift; + my ($code, $response) = @_; + + if ($code != 220) { + $self->{rmsg} = "Error on connect: @$response"; + $self->close; + $self->cont; + } + else { + my $host = $self->{qp}->config('me'); + print "HELOing with $host\n"; + $self->command((join '', @$response) =~ m/ ESMTP/ ? "EHLO" : "HELO", $host); + } +} + +sub cmd_helo { + my AsyncSMTPSender $self = shift; + my ($code, $response) = @_; + + if ($code != 250) { + $self->{rmsg} = "Error on HELO: @$response"; + $self->close; + $self->cont; + } + else { + $self->command("MAIL", "FROM:" . $self->{tran}->sender->format); + } +} + +sub cmd_ehlo { + my AsyncSMTPSender $self = shift; + my ($code, $response) = @_; + + if ($code != 250) { + $self->{rmsg} = "Error on EHLO: @$response"; + $self->close; + $self->cont; + } + else { + $self->command("MAIL", "FROM:" . $self->{tran}->sender->format); + } +} + +sub cmd_mail { + my AsyncSMTPSender $self = shift; + my ($code, $response) = @_; + + if ($code != 250) { + $self->{rmsg} = "Error on MAIL FROM: @$response"; + $self->close; + $self->cont; + } + else { + $self->command("RCPT", "TO:" . shift(@{$self->{to}})->format); + } +} + +sub cmd_rcpt { + my AsyncSMTPSender $self = shift; + my ($code, $response) = @_; + + if ($code != 250) { + $self->{rmsg} = "Error on RCPT TO: @$response"; + $self->close; + $self->cont; + } + else { + if (@{$self->{to}}) { + $self->command("RCPT", "TO:" . shift(@{$self->{to}})->format); + } + else { + $self->command("DATA"); + } + } +} + +sub cmd_data { + my AsyncSMTPSender $self = shift; + my ($code, $response) = @_; + + if ($code != 354) { + $self->{rmsg} = "Error on DATA: @$response"; + $self->close; + $self->cont; + } + else { + # $self->{state} = ST_DATA; + $self->datasend($self->{tran}->header->as_string); + $self->{tran}->body_resetpos; + while (my $line = $self->{tran}->body_getline) { + $self->log(LOGDEBUG, ">> $line"); + $line =~ s/\r?\n/\r\n/; + $self->datasend($line); + } + $self->write(".\r\n"); + $self->{command} = "DATAEND"; + } +} + +sub cmd_dataend { + my AsyncSMTPSender $self = shift; + my ($code, $response) = @_; + + if ($code != 250) { + $self->{rmsg} = "Error after DATA: @$response"; + $self->close; + $self->cont; + } + else { + $self->command("QUIT"); + } +} + +sub cmd_quit { + my AsyncSMTPSender $self = shift; + my ($code, $response) = @_; + + $self->{rcode} = OK; + $self->{rmsg} = "Queued!"; + $self->close; + $self->cont; +} + +sub datasend { + my AsyncSMTPSender $self = shift; + my ($data) = @_; + $data =~ s/^\./../mg; + $self->write(\$data); +} + +sub event_read { + my AsyncSMTPSender $self = shift; + + if ($self->{state} == ST_CONNECTED) { + $self->{state} = ST_COMMANDS; + } + + if ($self->{state} == ST_COMMANDS) { + my $in = $self->read(1024); + if (!$in) { + # XXX: connection closed + $self->close("lost connection"); + return; + } + + my @lines = split /\r?\n/, $self->{buf} . $$in, -1; + $self->{buf} = delete $lines[-1]; + + for(@lines) { + if (my ($code, $cont, $rest) = /^([0-9]{3})([ -])(.*)/) { + $self->log(LOGDEBUG, "<< $code$cont$rest"); + push @{$self->{resp}}, $rest; + + if($cont eq ' ') { + $self->handle_response($code, $self->{resp}); + $self->{resp} = []; + } + } + else { + $self->log(LOGERROR, "Unrecognised SMTP response line: $_"); + $self->{rmsg} = "Error from upstream SMTP server"; + $self->close; + $self->cont; + } + } + } + else { + $self->log(LOGERROR, "SMTP Session occurred out of order"); + $self->close; + $self->cont; + } +} + +sub event_write { + my AsyncSMTPSender $self = shift; + + if ($self->{state} == ST_CONNECTING) { + $self->watch_write(0); + $self->{state} = ST_CONNECTED; + $self->watch_read(1); + } + elsif (0 && $self->{state} == ST_DATA) { + # send more data + if (my $line = $self->{tran}->body_getline) { + $self->log(LOGDEBUG, ">> $line"); + $line =~ s/\r?\n/\r\n/; + $self->datasend($line); + } + else { + # no more data. + $self->log(LOGINFO, "No more data"); + $self->watch_write(0); + $self->{state} = ST_COMMANDS; + } + } + else { + $self->write(undef); + } +} + +sub event_err { + my ($self) = @_; + eval { $self->read(1); }; # gives us the correct error in errno + $self->{rmsg} = "Read error from remote server: $!"; + #print "lost connection: $!\n"; + $self->close; + $self->cont; +} + +sub event_hup { + my ($self) = @_; + eval { $self->read(1); }; # gives us the correct error in errno + $self->{rmsg} = "HUP error from remote server: $!"; + #print "lost connection: $!\n"; + $self->close; + $self->cont; +}