Async smtp-forward plugin
git-svn-id: https://svn.perl.org/qpsmtpd/trunk@864 958fd67b-6ff1-0310-b445-bb7760255be9
This commit is contained in:
parent
9415a518df
commit
d078cd1fcc
390
plugins/async/queue/smtp-forward
Normal file
390
plugins/async/queue/smtp-forward
Normal file
@ -0,0 +1,390 @@
|
||||
#!/usr/bin/perl -w
|
||||
|
||||
=head1 NAME
|
||||
|
||||
smtp-forward
|
||||
|
||||
=head1 DESCRIPTION
|
||||
|
||||
This plugin forwards the mail via SMTP to a specified server, rather than
|
||||
delivering the email locally.
|
||||
|
||||
=head1 CONFIG
|
||||
|
||||
It takes one required parameter, the IP address or hostname to forward to.
|
||||
|
||||
async/queue/smtp-forward 10.2.2.2
|
||||
|
||||
Optionally you can also add a port:
|
||||
|
||||
async/queue/smtp-forward 10.2.2.2 9025
|
||||
|
||||
=cut
|
||||
|
||||
use Qpsmtpd::Constants;
|
||||
|
||||
sub register {
|
||||
my ($self, $qp) = @_;
|
||||
|
||||
$self->register_hook(queue => "start_queue");
|
||||
$self->register_hook(queue => "finish_queue");
|
||||
}
|
||||
|
||||
sub init {
|
||||
my ($self, $qp, @args) = @_;
|
||||
|
||||
if (@args > 0) {
|
||||
if ($args[0] =~ /^([\.\w_-]+)$/) {
|
||||
$self->{_smtp_server} = $1;
|
||||
}
|
||||
else {
|
||||
die "Bad data in smtp server: $args[0]";
|
||||
}
|
||||
$self->{_smtp_port} = 25;
|
||||
if (@args > 1 and $args[1] =~ /^(\d+)$/) {
|
||||
$self->{_smtp_port} = $1;
|
||||
}
|
||||
|
||||
$self->log(LOGWARN, "WARNING: Ignoring additional arguments.") if (@args > 2);
|
||||
}
|
||||
else {
|
||||
die("No SMTP server specified in smtp-forward config");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
sub start_queue {
|
||||
my ($self, $transaction) = @_;
|
||||
|
||||
my $qp = $self->qp;
|
||||
my $SERVER = $self->{_smtp_server};
|
||||
my $PORT = $self->{_smtp_port};
|
||||
$self->log(LOGINFO, "forwarding to $SERVER:$PORT");
|
||||
|
||||
$transaction->notes('async_sender',
|
||||
AsyncSMTPSender->new($SERVER, $PORT, $qp, $self, $transaction)
|
||||
);
|
||||
|
||||
return YIELD;
|
||||
}
|
||||
|
||||
sub finish_queue {
|
||||
my ($self, $transaction) = @_;
|
||||
|
||||
my $sender = $transaction->notes('async_sender');
|
||||
|
||||
my ($rc, $msg) = $sender->results;
|
||||
|
||||
return $rc, $msg;
|
||||
}
|
||||
|
||||
package AsyncSMTPSender;
|
||||
|
||||
use IO::Socket;
|
||||
|
||||
use base qw(Danga::Socket);
|
||||
use fields qw(
|
||||
qp
|
||||
pkg
|
||||
tran
|
||||
state
|
||||
rcode
|
||||
rmsg
|
||||
buf
|
||||
command
|
||||
resp
|
||||
to
|
||||
);
|
||||
|
||||
use constant ST_CONNECTING => 0;
|
||||
use constant ST_CONNECTED => 1;
|
||||
use constant ST_COMMANDS => 2;
|
||||
use constant ST_DATA => 3;
|
||||
|
||||
use Qpsmtpd::Constants;
|
||||
|
||||
sub new {
|
||||
my ($self, $server, $port, $qp, $pkg, $transaction) = @_;
|
||||
$self = fields::new($self) unless ref $self;
|
||||
|
||||
my $sock = IO::Socket::INET->new(
|
||||
PeerAddr => $server,
|
||||
PeerPort => $port,
|
||||
Blocking => 0,
|
||||
) or die "Error connecting to server $server:$port : $!\n";
|
||||
|
||||
IO::Handle::blocking($sock, 0);
|
||||
binmode($sock, ':raw');
|
||||
|
||||
$self->{qp} = $qp;
|
||||
$self->{pkg} = $pkg;
|
||||
$self->{tran} = $transaction;
|
||||
$self->{state} = ST_CONNECTING;
|
||||
$self->{rcode} = DECLINED;
|
||||
$self->{command} = 'connect';
|
||||
$self->{buf} = '';
|
||||
$self->{resp} = [];
|
||||
# copy the recipients so we can pop them off one by one
|
||||
$self->{to} = [ $transaction->recipients ];
|
||||
|
||||
$self->SUPER::new($sock);
|
||||
# Watch for write first, this is when the TCP session is established.
|
||||
$self->watch_write(1);
|
||||
|
||||
return $self;
|
||||
}
|
||||
|
||||
sub results {
|
||||
my AsyncSMTPSender $self = shift;
|
||||
return ( $self->{rcode}, $self->{rmsg} );
|
||||
}
|
||||
|
||||
sub log {
|
||||
my AsyncSMTPSender $self = shift;
|
||||
$self->{qp}->log(@_);
|
||||
}
|
||||
|
||||
sub cont {
|
||||
my AsyncSMTPSender $self = shift;
|
||||
$self->{qp}->run_continuation;
|
||||
}
|
||||
|
||||
sub command {
|
||||
my AsyncSMTPSender $self = shift;
|
||||
my ($command, $params) = @_;
|
||||
$params ||= '';
|
||||
|
||||
$self->log(LOGDEBUG, ">> $command $params");
|
||||
|
||||
$self->write(($command =~ m/ / ? "$command:" : $command)
|
||||
. ($params ? " $params" : "") . "\r\n");
|
||||
$self->watch_read(1);
|
||||
$self->{command} = ($command =~ /(\S+)/)[0];
|
||||
}
|
||||
|
||||
sub handle_response {
|
||||
my AsyncSMTPSender $self = shift;
|
||||
|
||||
my $method = "cmd_" . lc($self->{command});
|
||||
|
||||
$self->$method(@_);
|
||||
}
|
||||
|
||||
sub cmd_connect {
|
||||
my AsyncSMTPSender $self = shift;
|
||||
my ($code, $response) = @_;
|
||||
|
||||
if ($code != 220) {
|
||||
$self->{rmsg} = "Error on connect: @$response";
|
||||
$self->close;
|
||||
$self->cont;
|
||||
}
|
||||
else {
|
||||
my $host = $self->{qp}->config('me');
|
||||
print "HELOing with $host\n";
|
||||
$self->command((join '', @$response) =~ m/ ESMTP/ ? "EHLO" : "HELO", $host);
|
||||
}
|
||||
}
|
||||
|
||||
sub cmd_helo {
|
||||
my AsyncSMTPSender $self = shift;
|
||||
my ($code, $response) = @_;
|
||||
|
||||
if ($code != 250) {
|
||||
$self->{rmsg} = "Error on HELO: @$response";
|
||||
$self->close;
|
||||
$self->cont;
|
||||
}
|
||||
else {
|
||||
$self->command("MAIL", "FROM:" . $self->{tran}->sender->format);
|
||||
}
|
||||
}
|
||||
|
||||
sub cmd_ehlo {
|
||||
my AsyncSMTPSender $self = shift;
|
||||
my ($code, $response) = @_;
|
||||
|
||||
if ($code != 250) {
|
||||
$self->{rmsg} = "Error on EHLO: @$response";
|
||||
$self->close;
|
||||
$self->cont;
|
||||
}
|
||||
else {
|
||||
$self->command("MAIL", "FROM:" . $self->{tran}->sender->format);
|
||||
}
|
||||
}
|
||||
|
||||
sub cmd_mail {
|
||||
my AsyncSMTPSender $self = shift;
|
||||
my ($code, $response) = @_;
|
||||
|
||||
if ($code != 250) {
|
||||
$self->{rmsg} = "Error on MAIL FROM: @$response";
|
||||
$self->close;
|
||||
$self->cont;
|
||||
}
|
||||
else {
|
||||
$self->command("RCPT", "TO:" . shift(@{$self->{to}})->format);
|
||||
}
|
||||
}
|
||||
|
||||
sub cmd_rcpt {
|
||||
my AsyncSMTPSender $self = shift;
|
||||
my ($code, $response) = @_;
|
||||
|
||||
if ($code != 250) {
|
||||
$self->{rmsg} = "Error on RCPT TO: @$response";
|
||||
$self->close;
|
||||
$self->cont;
|
||||
}
|
||||
else {
|
||||
if (@{$self->{to}}) {
|
||||
$self->command("RCPT", "TO:" . shift(@{$self->{to}})->format);
|
||||
}
|
||||
else {
|
||||
$self->command("DATA");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sub cmd_data {
|
||||
my AsyncSMTPSender $self = shift;
|
||||
my ($code, $response) = @_;
|
||||
|
||||
if ($code != 354) {
|
||||
$self->{rmsg} = "Error on DATA: @$response";
|
||||
$self->close;
|
||||
$self->cont;
|
||||
}
|
||||
else {
|
||||
# $self->{state} = ST_DATA;
|
||||
$self->datasend($self->{tran}->header->as_string);
|
||||
$self->{tran}->body_resetpos;
|
||||
while (my $line = $self->{tran}->body_getline) {
|
||||
$self->log(LOGDEBUG, ">> $line");
|
||||
$line =~ s/\r?\n/\r\n/;
|
||||
$self->datasend($line);
|
||||
}
|
||||
$self->write(".\r\n");
|
||||
$self->{command} = "DATAEND";
|
||||
}
|
||||
}
|
||||
|
||||
sub cmd_dataend {
|
||||
my AsyncSMTPSender $self = shift;
|
||||
my ($code, $response) = @_;
|
||||
|
||||
if ($code != 250) {
|
||||
$self->{rmsg} = "Error after DATA: @$response";
|
||||
$self->close;
|
||||
$self->cont;
|
||||
}
|
||||
else {
|
||||
$self->command("QUIT");
|
||||
}
|
||||
}
|
||||
|
||||
sub cmd_quit {
|
||||
my AsyncSMTPSender $self = shift;
|
||||
my ($code, $response) = @_;
|
||||
|
||||
$self->{rcode} = OK;
|
||||
$self->{rmsg} = "Queued!";
|
||||
$self->close;
|
||||
$self->cont;
|
||||
}
|
||||
|
||||
sub datasend {
|
||||
my AsyncSMTPSender $self = shift;
|
||||
my ($data) = @_;
|
||||
$data =~ s/^\./../mg;
|
||||
$self->write(\$data);
|
||||
}
|
||||
|
||||
sub event_read {
|
||||
my AsyncSMTPSender $self = shift;
|
||||
|
||||
if ($self->{state} == ST_CONNECTED) {
|
||||
$self->{state} = ST_COMMANDS;
|
||||
}
|
||||
|
||||
if ($self->{state} == ST_COMMANDS) {
|
||||
my $in = $self->read(1024);
|
||||
if (!$in) {
|
||||
# XXX: connection closed
|
||||
$self->close("lost connection");
|
||||
return;
|
||||
}
|
||||
|
||||
my @lines = split /\r?\n/, $self->{buf} . $$in, -1;
|
||||
$self->{buf} = delete $lines[-1];
|
||||
|
||||
for(@lines) {
|
||||
if (my ($code, $cont, $rest) = /^([0-9]{3})([ -])(.*)/) {
|
||||
$self->log(LOGDEBUG, "<< $code$cont$rest");
|
||||
push @{$self->{resp}}, $rest;
|
||||
|
||||
if($cont eq ' ') {
|
||||
$self->handle_response($code, $self->{resp});
|
||||
$self->{resp} = [];
|
||||
}
|
||||
}
|
||||
else {
|
||||
$self->log(LOGERROR, "Unrecognised SMTP response line: $_");
|
||||
$self->{rmsg} = "Error from upstream SMTP server";
|
||||
$self->close;
|
||||
$self->cont;
|
||||
}
|
||||
}
|
||||
}
|
||||
else {
|
||||
$self->log(LOGERROR, "SMTP Session occurred out of order");
|
||||
$self->close;
|
||||
$self->cont;
|
||||
}
|
||||
}
|
||||
|
||||
sub event_write {
|
||||
my AsyncSMTPSender $self = shift;
|
||||
|
||||
if ($self->{state} == ST_CONNECTING) {
|
||||
$self->watch_write(0);
|
||||
$self->{state} = ST_CONNECTED;
|
||||
$self->watch_read(1);
|
||||
}
|
||||
elsif (0 && $self->{state} == ST_DATA) {
|
||||
# send more data
|
||||
if (my $line = $self->{tran}->body_getline) {
|
||||
$self->log(LOGDEBUG, ">> $line");
|
||||
$line =~ s/\r?\n/\r\n/;
|
||||
$self->datasend($line);
|
||||
}
|
||||
else {
|
||||
# no more data.
|
||||
$self->log(LOGINFO, "No more data");
|
||||
$self->watch_write(0);
|
||||
$self->{state} = ST_COMMANDS;
|
||||
}
|
||||
}
|
||||
else {
|
||||
$self->write(undef);
|
||||
}
|
||||
}
|
||||
|
||||
sub event_err {
|
||||
my ($self) = @_;
|
||||
eval { $self->read(1); }; # gives us the correct error in errno
|
||||
$self->{rmsg} = "Read error from remote server: $!";
|
||||
#print "lost connection: $!\n";
|
||||
$self->close;
|
||||
$self->cont;
|
||||
}
|
||||
|
||||
sub event_hup {
|
||||
my ($self) = @_;
|
||||
eval { $self->read(1); }; # gives us the correct error in errno
|
||||
$self->{rmsg} = "HUP error from remote server: $!";
|
||||
#print "lost connection: $!\n";
|
||||
$self->close;
|
||||
$self->cont;
|
||||
}
|
Loading…
Reference in New Issue
Block a user