#!/usr/bin/perl -w

=head1 NAME

smtp-forward

=head1 DESCRIPTION

This plugin forwards the mail via SMTP to a specified server, rather than
delivering the email locally.

=head1 CONFIG

It takes one required parameter, the IP address or hostname to forward to.

  async/queue/smtp-forward 10.2.2.2

Optionally you can also add a port:

  async/queue/smtp-forward 10.2.2.2 9025

=cut

use Qpsmtpd::Constants;

# Attach both halves of the forward to the "queue" hook: start_queue
# launches the asynchronous SMTP session and yields, finish_queue reports
# the outcome recorded by the sender.
sub register {
    my ($self, $qp) = @_;

    $self->register_hook(queue => "start_queue");
    $self->register_hook(queue => "finish_queue");
}

# Parse the plugin arguments.
#   $args[0]  required: hostname or IP address of the upstream server
#   $args[1]  optional: port (defaults to 25 when absent or non-numeric)
# Dies when the server argument is missing or contains unexpected
# characters; anything after the second argument is ignored with a warning.
sub init {
    my ($self, $qp, @args) = @_;

    die("No SMTP server specified in smtp-forward config") unless @args > 0;

    if ($args[0] =~ /^([\.\w_-]+)$/) {
        $self->{_smtp_server} = $1;
    }
    else {
        die "Bad data in smtp server: $args[0]";
    }

    $self->{_smtp_port} = 25;
    if (@args > 1 and $args[1] =~ /^(\d+)$/) {
        $self->{_smtp_port} = $1;
    }

    $self->log(LOGWARN, "WARNING: Ignoring additional arguments.")
      if (@args > 2);
}

# First "queue" hook: create the asynchronous sender, remember it in the
# transaction notes and return YIELD. The sender calls run_continuation on
# the connection object when the upstream session is over.
sub start_queue {
    my ($self, $transaction) = @_;

    my $qp     = $self->qp;
    my $SERVER = $self->{_smtp_server};
    my $PORT   = $self->{_smtp_port};

    $self->log(LOGINFO, "forwarding to $SERVER:$PORT");

    $transaction->notes('async_sender',
        AsyncSMTPSender->new($SERVER, $PORT, $qp, $self, $transaction));

    return YIELD;
}

# Second "queue" hook: fetch the sender stored by start_queue, clear the
# note and return the (code, message) pair the sender recorded.
sub finish_queue {
    my ($self, $transaction) = @_;

    my $sender = $transaction->notes('async_sender');
    $transaction->notes('async_sender', undef);

    # No sender was stored (start_queue did not run to completion), so
    # there is no result to report; decline instead of calling a method on
    # undef.
    return (DECLINED) unless $sender;

    my ($rc, $msg) = $sender->results;

    return $rc, $msg;
}

package AsyncSMTPSender;

use IO::Socket;

use base qw(Danga::Socket);
use fields qw(
  qp
  pkg
  tran
  state
  rcode
  rmsg
  buf
  command
  resp
  to
  finished
  );

use constant ST_CONNECTING => 0;
use constant ST_CONNECTED  => 1;
use constant ST_COMMANDS   => 2;
use constant ST_DATA       => 3;

use Qpsmtpd::Constants;

# Open a non-blocking connection to $server:$port and set up the state
# needed to replay $transaction to it.
#   $qp           connection object used for logging, config lookups and
#                 run_continuation
#   $pkg          the plugin instance that created this sender
#   $transaction  the transaction whose sender, recipients, header and
#                 body are forwarded
# Dies if the socket cannot be created.
sub new {
    my ($self, $server, $port, $qp, $pkg, $transaction) = @_;
    $self = fields::new($self) unless ref $self;

    my $sock = IO::Socket::INET->new(
        PeerAddr => $server,
        PeerPort => $port,
        Blocking => 0,
    ) or die "Error connecting to server $server:$port : $!\n";

    IO::Handle::blocking($sock, 0);
    binmode($sock, ':raw');

    $self->{qp}       = $qp;
    $self->{pkg}      = $pkg;
    $self->{tran}     = $transaction;
    $self->{state}    = ST_CONNECTING;
    $self->{rcode}    = DECLINED;          # result until QUIT is acknowledged
    $self->{command}  = 'connect';         # the greeting goes to cmd_connect
    $self->{buf}      = '';
    $self->{resp}     = [];
    $self->{finished} = 0;

    # copy the recipients so we can shift them off one by one
    $self->{to} = [$transaction->recipients];

    $self->SUPER::new($sock);

    # Watch for write first, this is when the TCP session is established.
    $self->watch_write(1);

    return $self;
}

# Return the (code, message) pair describing how the forward ended.
sub results {
    my AsyncSMTPSender $self = shift;
    return ($self->{rcode}, $self->{rmsg});
}

# Log through the connection object.
sub log {
    my AsyncSMTPSender $self = shift;
    $self->{qp}->log(@_);
}

# Resume the yielded hook by running the continuation on the connection
# object. Guarded by the "finished" flag so the continuation runs at most
# once per sender, however many termination paths are reached.
sub cont {
    my AsyncSMTPSender $self = shift;

    return if $self->{finished};
    $self->{finished} = 1;

    $self->{qp}->run_continuation;
}

# Private helper: record $msg as the result message, close the upstream
# socket and resume the yielded hook.
sub _abort {
    my AsyncSMTPSender $self = shift;
    my ($msg) = @_;

    $self->{rmsg} = $msg;
    $self->close;
    $self->cont;
}

# Send "$command $params" followed by CRLF, start watching for the reply
# and remember which command the next response belongs to.
sub command {
    my AsyncSMTPSender $self = shift;
    my ($command, $params) = @_;
    $params ||= '';

    $self->log(LOGDEBUG, ">> $command $params");

    # A command that itself contains a space gets a trailing colon.
    my $verb = ($command =~ m/ /) ? "$command:" : $command;
    my $args = $params ? " $params" : "";
    $self->write($verb . $args . "\r\n");
    $self->watch_read(1);

    # Only the first word is kept; handle_response derives the handler
    # name from it.
    $self->{command} = ($command =~ /(\S+)/)[0];
}

# Dispatch a complete response to cmd_<lowercased current command>.
sub handle_response {
    my AsyncSMTPSender $self = shift;

    my $method = "cmd_" . lc($self->{command});

    $self->$method(@_);
}

# Greeting: expect 220, then say EHLO if the banner mentions ESMTP and
# HELO otherwise, using the "me" config value as our host name.
sub cmd_connect {
    my AsyncSMTPSender $self = shift;
    my ($code, $response) = @_;

    return $self->_abort("Error on connect: @$response") if $code != 220;

    my $host = $self->{qp}->config('me');
    $self->log(LOGDEBUG, "HELOing with $host");

    my $greeting = (join '', @$response) =~ m/ ESMTP/ ? "EHLO" : "HELO";
    $self->command($greeting, $host);
}

# Reply to HELO: expect 250, then send MAIL FROM.
sub cmd_helo {
    my AsyncSMTPSender $self = shift;
    my ($code, $response) = @_;

    return $self->_abort("Error on HELO: @$response") if $code != 250;

    $self->command("MAIL", "FROM:" . $self->{tran}->sender->format);
}

# Reply to EHLO: expect 250, then send MAIL FROM.
sub cmd_ehlo {
    my AsyncSMTPSender $self = shift;
    my ($code, $response) = @_;

    return $self->_abort("Error on EHLO: @$response") if $code != 250;

    $self->command("MAIL", "FROM:" . $self->{tran}->sender->format);
}

# Reply to MAIL FROM: expect 250, then send the first recipient.
sub cmd_mail {
    my AsyncSMTPSender $self = shift;
    my ($code, $response) = @_;

    return $self->_abort("Error on MAIL FROM: @$response") if $code != 250;

    $self->command("RCPT", "TO:" . shift(@{$self->{to}})->format);
}

# Reply to RCPT TO: expect 250, then send the next recipient, or DATA once
# the recipient list is exhausted.
sub cmd_rcpt {
    my AsyncSMTPSender $self = shift;
    my ($code, $response) = @_;

    return $self->_abort("Error on RCPT TO: @$response") if $code != 250;

    if (@{$self->{to}}) {
        $self->command("RCPT", "TO:" . shift(@{$self->{to}})->format);
    }
    else {
        $self->command("DATA");
    }
}

# Reply to DATA: expect 354, then write the header and body followed by
# the terminating "." line. The next response is routed to cmd_dataend.
sub cmd_data {
    my AsyncSMTPSender $self = shift;
    my ($code, $response) = @_;

    return $self->_abort("Error on DATA: @$response") if $code != 354;

    # $self->{state} = ST_DATA;
    $self->datasend($self->{tran}->header->as_string);
    $self->{tran}->body_resetpos;

    # Collect body lines and hand them over in large chunks.
    my $write_buf = '';
    while (my $line = $self->{tran}->body_getline) {
        $line =~ s/\r?\n/\r\n/;
        $write_buf .= $line;
        if (length($write_buf) >= 131072) {    # 128KB, arbitrary value
            $self->log(LOGDEBUG, ">> $write_buf");
            $self->datasend($write_buf);
            $write_buf = '';
        }
    }

    if (length($write_buf)) {
        $self->log(LOGDEBUG, ">> $write_buf");
        $self->datasend($write_buf);
    }

    $self->write(".\r\n");
    $self->{command} = "DATAEND";
}

# Reply to the end-of-data marker: expect 250, then send QUIT.
sub cmd_dataend {
    my AsyncSMTPSender $self = shift;
    my ($code, $response) = @_;

    return $self->_abort("Error after DATA: @$response") if $code != 250;

    $self->command("QUIT");
}

# Reply to QUIT: the reply code is not inspected; record success, close
# the socket and resume the yielded hook.
sub cmd_quit {
    my AsyncSMTPSender $self = shift;
    my ($code, $response) = @_;

    $self->{rcode} = OK;
    $self->{rmsg}  = "Queued!";

    $self->close;
    $self->cont;
}

# Write a chunk of message data, doubling any dot at the start of a line.
sub datasend {
    my AsyncSMTPSender $self = shift;
    my ($data) = @_;

    $data =~ s/^\./../mg;
    $self->write(\$data);
}

# Socket is readable: read up to 1024 bytes, split them into lines and
# pass every complete response (a line whose code is followed by a space,
# together with any preceding "-" continuation lines) to handle_response.
sub event_read {
    my AsyncSMTPSender $self = shift;

    if ($self->{state} == ST_CONNECTED) {
        $self->{state} = ST_COMMANDS;
    }

    if ($self->{state} != ST_COMMANDS) {
        $self->log(LOGERROR, "SMTP Session occurred out of order");
        $self->close;
        $self->cont;
        return;
    }

    my $in = $self->read(1024);
    if (!$in) {

        # Nothing could be read: the upstream connection is gone. Record
        # why and resume the yielded hook, otherwise it is never resumed.
        $self->{rmsg} = "Lost connection to remote server";
        $self->close("lost connection");
        $self->cont;
        return;
    }

    # The last element is the (possibly empty) incomplete line; keep it
    # for the next read.
    my @lines = split /\r?\n/, $self->{buf} . $$in, -1;
    my $partial = pop @lines;
    $self->{buf} = defined $partial ? $partial : '';

    for my $line (@lines) {
        if (my ($code, $cont, $rest) = $line =~ /^([0-9]{3})([ -])(.*)/) {
            $self->log(LOGDEBUG, "<< $code$cont$rest");
            push @{$self->{resp}}, $rest;

            if ($cont eq ' ') {
                $self->handle_response($code, $self->{resp});
                $self->{resp} = [];
            }
        }
        else {
            $self->log(LOGERROR, "Unrecognised SMTP response line: $line");
            $self->_abort("Error from upstream SMTP server");
        }

        # Once the session has ended, ignore whatever else was buffered.
        last if $self->{finished};
    }
}

# Socket is writable: the first time this signals that the TCP connection
# is established, so switch to waiting for the greeting. Later calls pass
# undef to write.
sub event_write {
    my AsyncSMTPSender $self = shift;

    if ($self->{state} == ST_CONNECTING) {
        $self->watch_write(0);
        $self->{state} = ST_CONNECTED;
        $self->watch_read(1);
    }
    else {
        # NOTE(review): presumably write(undef) asks Danga::Socket to keep
        # flushing already-queued data - confirm against its documentation.
        $self->write(undef);
    }
}

# Socket error: record the error and finish.
sub event_err {
    my ($self) = @_;
    eval { $self->read(1); };    # gives us the correct error in errno
    $self->_abort("Read error from remote server: $!");
}

# Remote hang-up: record the error and finish.
sub event_hup {
    my ($self) = @_;
    eval { $self->read(1); };    # gives us the correct error in errno
    $self->_abort("HUP error from remote server: $!");
}