qpsmtpd/plugins/async/queue/smtp-forward

414 lines
9.4 KiB
Perl

#!perl -w
=head1 NAME

smtp-forward

=head1 DESCRIPTION

This plugin forwards the mail via SMTP to a specified server, rather than
delivering the email locally.

=head1 CONFIG

It takes one required parameter, the IP address or hostname to forward to.

  async/queue/smtp-forward 10.2.2.2

Optionally you can also add a port:

  async/queue/smtp-forward 10.2.2.2 9025

=cut
use Qpsmtpd::Constants;
# Attach both halves of the delivery to the "queue" hook: start_queue is
# registered first, finish_queue second.
#
#   $self - the plugin object
#   $qp   - the qpsmtpd object (not used here)
sub register {
    my ($self, $qp) = @_;

    $self->register_hook('queue', 'start_queue');
    return $self->register_hook('queue', 'finish_queue');
}
# Parse the plugin arguments from the config line.
#
#   $args[0] - required: hostname or IP address of the server to forward to.
#              Only word characters, dots and dashes are accepted.
#   $args[1] - optional: numeric port. Defaults to 25.
#
# Dies when no server is given or the server name contains unexpected
# characters. A port that is not purely numeric is NOT fatal (port 25 is
# used), but it is now reported in the log instead of being dropped silently.
sub init {
    my ($self, $qp, @args) = @_;

    die("No SMTP server specified in smtp-forward config") unless @args > 0;

    if ($args[0] =~ /^([\.\w_-]+)$/) {
        $self->{_smtp_server} = $1;
    }
    else {
        die "Bad data in smtp server: $args[0]";
    }

    my $default_port = 25;
    $self->{_smtp_port} = $default_port;
    if (@args > 1) {
        if ($args[1] =~ /^(\d+)$/) {
            $self->{_smtp_port} = $1;
        }
        else {
            # Keep the historical fallback, but tell the admin about it.
            $self->log(LOGWARN,
                "WARNING: Ignoring invalid port '$args[1]', using port $default_port.");
        }
    }

    $self->log(LOGWARN, "WARNING: Ignoring additional arguments.")
      if (@args > 2);
}
# First half of the queue hook: create an AsyncSMTPSender for the configured
# server and port, park it in the transaction note 'async_sender', and
# return YIELD.
sub start_queue {
    my ($self, $transaction) = @_;

    my $qp = $self->qp;
    my $host = $self->{_smtp_server};
    my $port = $self->{_smtp_port};
    $self->log(LOGINFO, "forwarding to $host:$port");

    my $sender = AsyncSMTPSender->new($host, $port, $qp, $self, $transaction);
    $transaction->notes('async_sender', $sender);

    return YIELD;
}
# Second half of the queue hook: take the sender out of the transaction
# notes, clear the note, and hand back the (code, message) pair the sender
# recorded.
sub finish_queue {
    my ($self, $transaction) = @_;

    my $note_key = 'async_sender';
    my $sender   = $transaction->notes($note_key);
    $transaction->notes($note_key, undef);

    my ($status, $text) = $sender->results;
    return ($status, $text);
}
# SMTP client built on Danga::Socket. The plugin's start_queue creates one
# instance per transaction; the instance talks to the upstream server from
# its event_* callbacks and runs the qpsmtpd continuation when it is done.
package AsyncSMTPSender;
use IO::Socket;
use base qw(Danga::Socket);
# Per-instance fields:
#   qp      - qpsmtpd object; used for logging, config('me') and
#             run_continuation
#   pkg     - the plugin object that created this sender
#   tran    - the transaction being forwarded
#   state   - one of the ST_* constants below
#   rcode   - result code returned by results() (starts as DECLINED)
#   rmsg    - result message returned by results()
#   buf     - incomplete trailing line left over from the previous read
#   command - lower-cased to pick the cmd_* handler for the next reply
#   resp    - text of the reply lines collected so far
#   to      - recipients not yet sent in a RCPT command
use fields qw(
qp
pkg
tran
state
rcode
rmsg
buf
command
resp
to
);
# Connection states. ST_CONNECTING is the initial state, ST_CONNECTED is set
# on the first write event and ST_COMMANDS on the first read event after
# that. ST_DATA is not assigned by any active code path in this file.
use constant ST_CONNECTING => 0;
use constant ST_CONNECTED => 1;
use constant ST_COMMANDS => 2;
use constant ST_DATA => 3;
use Qpsmtpd::Constants;
# Constructor: open a non-blocking TCP connection to $server:$port and set up
# the per-transaction state.
#
#   $server, $port - upstream SMTP server to connect to
#   $qp            - qpsmtpd object (logging, config, continuation)
#   $pkg           - the plugin object creating this sender
#   $transaction   - the transaction to forward
#
# Dies if the socket cannot be created. The message now prefers the
# description IO::Socket::INET leaves in $@ (for example for a bad hostname)
# and falls back to $! only when $@ is empty.
sub new {
    my ($self, $server, $port, $qp, $pkg, $transaction) = @_;
    $self = fields::new($self) unless ref $self;

    my $sock = IO::Socket::INET->new(
        PeerAddr => $server,
        PeerPort => $port,
        Blocking => 0,
    );
    if (!$sock) {
        my $reason = $@ || $!;
        die "Error connecting to server $server:$port : $reason\n";
    }
    IO::Handle::blocking($sock, 0);
    binmode($sock, ':raw');

    $self->{qp}      = $qp;
    $self->{pkg}     = $pkg;
    $self->{tran}    = $transaction;
    $self->{state}   = ST_CONNECTING;
    $self->{rcode}   = DECLINED;
    $self->{command} = 'connect';
    $self->{buf}     = '';
    $self->{resp}    = [];

    # copy the recipients so we can pop them off one by one
    $self->{to} = [$transaction->recipients];

    $self->SUPER::new($sock);

    # Watch for write first, this is when the TCP session is established.
    $self->watch_write(1);
    return $self;
}
# Return the (code, message) pair recorded for this delivery attempt.
sub results {
    my AsyncSMTPSender $self = shift;
    my ($code, $message) = ($self->{rcode}, $self->{rmsg});
    return ($code, $message);
}
# Forward a log call, arguments untouched, to the qpsmtpd object.
sub log {
    my AsyncSMTPSender $self = shift;
    my $qp = $self->{qp};
    return $qp->log(@_);
}
# Run the continuation on the qpsmtpd object; called once this sender has
# finished (successfully or not).
sub cont {
    my AsyncSMTPSender $self = shift;
    my $qp = $self->{qp};
    return $qp->run_continuation;
}
# Send one SMTP command line and remember which command is outstanding.
#
#   $command - the verb, e.g. "MAIL"; a ':' is appended if it contains a space
#   $params  - optional argument text, appended after a single space
#
# The first whitespace-free token of $command is stored in {command};
# handle_response uses it to choose the cmd_* handler for the reply.
sub command {
    my AsyncSMTPSender $self = shift;
    my ($command, $params) = @_;
    $params ||= '';
    $self->log(LOGDEBUG, ">> $command $params");

    my $wire = $command;
    $wire .= ':'        if $command =~ m/ /;
    $wire .= " $params" if $params;
    $self->write($wire . "\r\n");

    $self->watch_read(1);
    my ($verb) = $command =~ /(\S+)/;
    $self->{command} = $verb;
}
# Dispatch a complete reply to the handler named after the outstanding
# command: cmd_<lower-cased command>. Remaining arguments are passed through.
sub handle_response {
    my AsyncSMTPSender $self = shift;
    my $handler = 'cmd_' . lc $self->{command};
    return $self->$handler(@_);
}
# Handle the server greeting.
#
#   $code     - three-digit reply code
#   $response - array ref of reply text lines
#
# Anything but 220 aborts the delivery. Otherwise greet the server with EHLO
# when the banner contains " ESMTP", else HELO, using the 'me' config value
# as our host name. The greeting is reported through the logger at debug
# level rather than printed to STDOUT.
sub cmd_connect {
    my AsyncSMTPSender $self = shift;
    my ($code, $response) = @_;

    if ($code != 220) {
        $self->{rmsg} = "Error on connect: @$response";
        $self->close;
        return $self->cont;
    }

    my $host = $self->{qp}->config('me');
    $self->log(LOGDEBUG, "HELOing with $host");

    my $banner = join '', @$response;
    my $verb   = $banner =~ m/ ESMTP/ ? "EHLO" : "HELO";
    return $self->command($verb, $host);
}
# Handle the reply to HELO: on 250 send MAIL FROM with the transaction's
# sender, otherwise record the error, close, and run the continuation.
sub cmd_helo {
    my AsyncSMTPSender $self = shift;
    my ($code, $response) = @_;

    if ($code == 250) {
        my $from = $self->{tran}->sender->format;
        return $self->command("MAIL", "FROM:" . $from);
    }

    $self->{rmsg} = "Error on HELO: @$response";
    $self->close;
    return $self->cont;
}
# Handle the reply to EHLO: on 250 send MAIL FROM with the transaction's
# sender, otherwise record the error, close, and run the continuation.
sub cmd_ehlo {
    my AsyncSMTPSender $self = shift;
    my ($code, $response) = @_;

    unless ($code == 250) {
        $self->{rmsg} = "Error on EHLO: @$response";
        $self->close;
        return $self->cont;
    }

    my $from = $self->{tran}->sender->format;
    return $self->command("MAIL", "FROM:" . $from);
}
# Handle the reply to MAIL FROM: on 250 take the first pending recipient off
# the list and send RCPT TO for it; otherwise record the error, close, and
# run the continuation.
sub cmd_mail {
    my AsyncSMTPSender $self = shift;
    my ($code, $response) = @_;

    if ($code == 250) {
        my $rcpt = shift @{ $self->{to} };
        return $self->command("RCPT", "TO:" . $rcpt->format);
    }

    $self->{rmsg} = "Error on MAIL FROM: @$response";
    $self->close;
    return $self->cont;
}
# Handle the reply to RCPT TO.
#
#   $code     - three-digit reply code
#   $response - array ref of reply text lines
#
# Both 250 and 251 ("user not local; will forward") are positive replies to
# RCPT, so either one lets the dialogue continue: the next pending recipient
# is sent, or DATA once none remain. Any other code aborts the delivery.
sub cmd_rcpt {
    my AsyncSMTPSender $self = shift;
    my ($code, $response) = @_;

    if ($code != 250 && $code != 251) {
        $self->{rmsg} = "Error on RCPT TO: @$response";
        $self->close;
        return $self->cont;
    }

    if (@{ $self->{to} }) {
        my $next = shift @{ $self->{to} };
        return $self->command("RCPT", "TO:" . $next->format);
    }

    return $self->command("DATA");
}
# Handle the reply to DATA and, on 354, queue the whole message for writing.
#
#   $code     - three-digit reply code
#   $response - array ref of reply text lines
#
# The header text gets the same LF -> CRLF normalisation as the body lines
# (the substitution leaves existing CRLF untouched), so no bare LF line
# endings are sent. Body lines are batched into chunks of about 128KB before
# being handed to datasend, which dot-stuffs them. If the last thing sent
# does not end in CRLF one is added, so the terminating "." is always on a
# line of its own. Afterwards the outstanding command is "DATAEND".
sub cmd_data {
    my AsyncSMTPSender $self = shift;
    my ($code, $response) = @_;

    if ($code != 354) {
        $self->{rmsg} = "Error on DATA: @$response";
        $self->close;
        return $self->cont;
    }

    my $header = $self->{tran}->header->as_string;
    $header =~ s/\r?\n/\r\n/g;
    $self->datasend($header);
    my $last_sent = $header;

    $self->{tran}->body_resetpos;
    my $write_buf = '';
    while (my $line = $self->{tran}->body_getline) {
        $line =~ s/\r?\n/\r\n/;
        $last_sent = $line;
        $write_buf .= $line;
        if (length($write_buf) >= 131072) {    # 128KB, arbitrary value
            $self->log(LOGDEBUG, ">> $write_buf");
            $self->datasend($write_buf);
            $write_buf = '';
        }
    }
    if (length($write_buf)) {
        $self->log(LOGDEBUG, ">> $write_buf");
        $self->datasend($write_buf);
    }

    # The end-of-data marker is only recognised at the start of a line.
    $self->write("\r\n") if length($last_sent) && $last_sent !~ /\r\n\z/;
    $self->write(".\r\n");
    $self->{command} = "DATAEND";
}
# Handle the reply that follows the end-of-data marker: on 250 send QUIT,
# otherwise record the error, close, and run the continuation.
sub cmd_dataend {
    my AsyncSMTPSender $self = shift;
    my ($code, $response) = @_;

    return $self->command("QUIT") if $code == 250;

    $self->{rmsg} = "Error after DATA: @$response";
    $self->close;
    return $self->cont;
}
# Handle the reply to QUIT. The reply code is not inspected: the result is
# set to OK / "Queued!", the socket is closed and the continuation is run.
sub cmd_quit {
    my AsyncSMTPSender $self = shift;
    my ($code, $response) = @_;

    $self->{rmsg}  = "Queued!";
    $self->{rcode} = OK;

    $self->close;
    return $self->cont;
}
# Write a chunk of message data, doubling any '.' that starts a line.
# The caller's string is not modified; a private copy is written.
sub datasend {
    my AsyncSMTPSender $self = shift;
    my $payload = shift;
    $payload =~ s/^\./../mg;
    return $self->write(\$payload);
}
# Read-ready callback: read up to 1024 bytes, split them into reply lines and
# pass each complete reply (a line whose code is followed by a space) to
# handle_response. An incomplete trailing line is kept in {buf} for the next
# call.
#
# Every exit that closes the socket also records a message and runs the
# continuation, including the lost-connection case, so the transaction does
# not stay suspended. Once the socket has been closed, whether here or by a
# cmd_* handler, any further lines from the same read are ignored so the
# continuation is not run a second time.
sub event_read {
    my AsyncSMTPSender $self = shift;

    if ($self->{state} == ST_CONNECTED) {
        $self->{state} = ST_COMMANDS;
    }

    if ($self->{state} != ST_COMMANDS) {
        $self->log(LOGERROR, "SMTP Session occurred out of order");
        $self->{rmsg} = "SMTP session with upstream server occurred out of order";
        $self->close;
        $self->cont;
        return;
    }

    my $in = $self->read(1024);
    if (!$in) {
        # connection closed or read error
        $self->{rmsg} = "Lost connection to upstream SMTP server";
        $self->close("lost connection");
        $self->cont;
        return;
    }

    my $chunk = defined $$in ? $$in : '';
    my @lines = split /\r?\n/, $self->{buf} . $chunk, -1;

    # With limit -1 the last element is whatever follows the final newline:
    # '' when the data ended on a line boundary, else a partial line.
    my $partial = pop @lines;
    $self->{buf} = defined $partial ? $partial : '';

    for my $line (@lines) {
        if (my ($code, $cont, $rest) = $line =~ /^([0-9]{3})([ -])(.*)/) {
            $self->log(LOGDEBUG, "<< $code$cont$rest");
            push @{ $self->{resp} }, $rest;
            if ($cont eq ' ') {
                $self->handle_response($code, $self->{resp});
                $self->{resp} = [];
            }
        }
        else {
            $self->log(LOGERROR, "Unrecognised SMTP response line: $line");
            $self->{rmsg} = "Error from upstream SMTP server";
            $self->close;
            $self->cont;
        }

        # 'closed' is the flag Danga::Socket sets in close().
        last if $self->{closed};
    }
}
# Write-ready callback.
#
# While connecting, the first write event marks the connection as
# established: stop watching for writability, move to ST_CONNECTED and start
# watching for the server's greeting. In any other state call write(undef),
# which presumably lets Danga::Socket flush its pending write buffer
# (confirm against the Danga::Socket documentation).
#
# The former line-by-line streaming branch was disabled with "0 &&" and could
# never run, so it has been removed; cmd_data queues the whole message.
sub event_write {
    my AsyncSMTPSender $self = shift;

    if ($self->{state} == ST_CONNECTING) {
        $self->watch_write(0);
        $self->{state} = ST_CONNECTED;
        $self->watch_read(1);
    }
    else {
        $self->write(undef);
    }
}
# Error callback: record the failure, close the socket and run the
# continuation.
sub event_err {
    my $self = shift;

    # gives us the correct error in errno; a die inside read is swallowed
    eval { $self->read(1) };
    my $errno_text = "$!";

    $self->{rmsg} = "Read error from remote server: " . $errno_text;
    $self->close;
    return $self->cont;
}
# Hang-up callback: record the failure, close the socket and run the
# continuation.
sub event_hup {
    my $self = shift;

    # gives us the correct error in errno; a die inside read is swallowed
    eval { $self->read(1) };
    my $errno_text = "$!";

    $self->{rmsg} = "HUP error from remote server: " . $errno_text;
    $self->close;
    return $self->cont;
}