#!perl -w

=head1 NAME

smtp-forward

=head1 DESCRIPTION

This plugin forwards the mail via SMTP to a specified server, rather than
delivering the email locally.

=head1 CONFIG

It takes one required parameter, the IP address or hostname to forward to. 

  async/queue/smtp-forward 10.2.2.2

Optionally you can also add a port:

  async/queue/smtp-forward 10.2.2.2 9025

=cut

use Qpsmtpd::Constants;

sub register {
    my ($self, $qp) = @_;
    
    $self->register_hook(queue => "start_queue");
    $self->register_hook(queue => "finish_queue");
}

sub init {
    my ($self, $qp, @args) = @_;

    if (@args > 0) {
        if ($args[0] =~ /^([\.\w_-]+)$/) {
            $self->{_smtp_server} = $1;
        }
        else {
            die "Bad data in smtp server: $args[0]";
        }
        $self->{_smtp_port} = 25;
        if (@args > 1 and $args[1] =~ /^(\d+)$/) {
            $self->{_smtp_port} = $1;
        }
    
        $self->log(LOGWARN, "WARNING: Ignoring additional arguments.") if (@args > 2);
    }
    else {
        die("No SMTP server specified in smtp-forward config");
    }

}

sub start_queue {
    my ($self, $transaction) = @_;
    
    my $qp = $self->qp;
    my $SERVER = $self->{_smtp_server};
    my $PORT   = $self->{_smtp_port};
    $self->log(LOGINFO, "forwarding to $SERVER:$PORT");
    
    $transaction->notes('async_sender', 
        AsyncSMTPSender->new($SERVER, $PORT, $qp, $self, $transaction)
    );
    
    return YIELD;
}

sub finish_queue {
    my ($self, $transaction) = @_;
    
    my $sender = $transaction->notes('async_sender');
    $transaction->notes('async_sender', undef);
    
    my ($rc, $msg) = $sender->results;
    
    return $rc, $msg;
}

package AsyncSMTPSender;

use IO::Socket;

use base qw(Danga::Socket);
use fields qw(
    qp
    pkg
    tran
    state
    rcode
    rmsg
    buf
    command
    resp
    to
    );

use constant ST_CONNECTING => 0;
use constant ST_CONNECTED  => 1;
use constant ST_COMMANDS   => 2;
use constant ST_DATA       => 3;

use Qpsmtpd::Constants;

sub new {
    my ($self, $server, $port, $qp, $pkg, $transaction) = @_;
    $self = fields::new($self) unless ref $self;
    
    my $sock = IO::Socket::INET->new(
        PeerAddr => $server,
        PeerPort => $port,
        Blocking  => 0,
    ) or die "Error connecting to server $server:$port : $!\n";

    IO::Handle::blocking($sock, 0);
    binmode($sock, ':raw');
    
    $self->{qp} = $qp;
    $self->{pkg} = $pkg;
    $self->{tran} = $transaction;
    $self->{state} = ST_CONNECTING;
    $self->{rcode} = DECLINED;
    $self->{command} = 'connect';
    $self->{buf} = '';
    $self->{resp} = [];
    # copy the recipients so we can pop them off one by one
    $self->{to} = [ $transaction->recipients ];
    
    $self->SUPER::new($sock);
    # Watch for write first, this is when the TCP session is established.
    $self->watch_write(1);

    return $self;
}

sub results {
    my AsyncSMTPSender $self = shift;
    return ( $self->{rcode}, $self->{rmsg} );
}

sub log {
    my AsyncSMTPSender $self = shift;
    $self->{qp}->log(@_);
}

sub cont {
    my AsyncSMTPSender $self = shift;
    $self->{qp}->run_continuation;
}

sub command {
    my AsyncSMTPSender $self = shift;
    my ($command, $params) = @_;
    $params ||= '';
    
    $self->log(LOGDEBUG, ">> $command $params");
    
    $self->write(($command =~ m/ / ? "$command:" : $command)
      . ($params ? " $params" : "") . "\r\n");
    $self->watch_read(1);
    $self->{command} = ($command =~ /(\S+)/)[0];
}

sub handle_response {
    my AsyncSMTPSender $self = shift;
    
    my $method = "cmd_" . lc($self->{command});
    
    $self->$method(@_);
}

sub cmd_connect {
    my AsyncSMTPSender $self = shift;
    my ($code, $response) = @_;
    
    if ($code != 220) {
        $self->{rmsg} = "Error on connect: @$response";
        $self->close;
        $self->cont;
    }
    else {
        my $host = $self->{qp}->config('me');
        print "HELOing with $host\n";
        $self->command((join '', @$response) =~ m/ ESMTP/ ? "EHLO" : "HELO", $host);
    }
}

sub cmd_helo {
    my AsyncSMTPSender $self = shift;
    my ($code, $response) = @_;
    
    if ($code != 250) {
        $self->{rmsg} = "Error on HELO: @$response";
        $self->close;
        $self->cont;
    }
    else {
        $self->command("MAIL", "FROM:" . $self->{tran}->sender->format);
    }
}

sub cmd_ehlo {
    my AsyncSMTPSender $self = shift;
    my ($code, $response) = @_;
    
    if ($code != 250) {
        $self->{rmsg} = "Error on EHLO: @$response";
        $self->close;
        $self->cont;
    }
    else {
        $self->command("MAIL", "FROM:" . $self->{tran}->sender->format);
    }
}

sub cmd_mail {
    my AsyncSMTPSender $self = shift;
    my ($code, $response) = @_;
    
    if ($code != 250) {
        $self->{rmsg} = "Error on MAIL FROM: @$response";
        $self->close;
        $self->cont;
    }
    else {
        $self->command("RCPT", "TO:" . shift(@{$self->{to}})->format);
    }
}

sub cmd_rcpt {
    my AsyncSMTPSender $self = shift;
    my ($code, $response) = @_;
    
    if ($code != 250) {
        $self->{rmsg} = "Error on RCPT TO: @$response";
        $self->close;
        $self->cont;
    }
    else {
        if (@{$self->{to}}) {
            $self->command("RCPT", "TO:" . shift(@{$self->{to}})->format);
        }
        else {
            $self->command("DATA");
        }
    }
}

sub cmd_data {
    my AsyncSMTPSender $self = shift;
    my ($code, $response) = @_;
    
    if ($code != 354) {
        $self->{rmsg} = "Error on DATA: @$response";
        $self->close;
        $self->cont;
    }
    else {
        # $self->{state} = ST_DATA;
        $self->datasend($self->{tran}->header->as_string);
        $self->{tran}->body_resetpos;
        my $write_buf = '';
        while (my $line = $self->{tran}->body_getline) {
            $line =~ s/\r?\n/\r\n/;
            $write_buf .= $line;
            if (length($write_buf) >= 131072) { # 128KB, arbitrary value
                $self->log(LOGDEBUG, ">> $write_buf");
                $self->datasend($write_buf);
                $write_buf = '';
            }
        }
        if (length($write_buf)) {
            $self->log(LOGDEBUG, ">> $write_buf");
            $self->datasend($write_buf);
        }
        $self->write(".\r\n");
        $self->{command} = "DATAEND";
    }
}

sub cmd_dataend {
    my AsyncSMTPSender $self = shift;
    my ($code, $response) = @_;
    
    if ($code != 250) {
        $self->{rmsg} = "Error after DATA: @$response";
        $self->close;
        $self->cont;
    }
    else {
        $self->command("QUIT");
    }
}

sub cmd_quit {
    my AsyncSMTPSender $self = shift;
    my ($code, $response) = @_;
    
    $self->{rcode} = OK;
    $self->{rmsg} = "Queued!";
    $self->close;
    $self->cont;
}

sub datasend {
    my AsyncSMTPSender $self = shift;
    my ($data) = @_;
    $data =~ s/^\./../mg;
    $self->write(\$data);
}

sub event_read {
    my AsyncSMTPSender $self = shift;
    
    if ($self->{state} == ST_CONNECTED) {
        $self->{state} = ST_COMMANDS;
    }

    if ($self->{state} == ST_COMMANDS) {
        my $in = $self->read(1024);
        if (!$in) {
            # XXX: connection closed
            $self->close("lost connection");
            return;
        }
    
        my @lines = split /\r?\n/, $self->{buf} . $$in, -1;
        $self->{buf} = delete $lines[-1];
    
        for(@lines) {
            if (my ($code, $cont, $rest) = /^([0-9]{3})([ -])(.*)/) {
                $self->log(LOGDEBUG, "<< $code$cont$rest");
                push @{$self->{resp}}, $rest;

                if($cont eq ' ') {
                    $self->handle_response($code, $self->{resp});
                    $self->{resp} = [];
                }
            }
            else {
                $self->log(LOGERROR, "Unrecognised SMTP response line: $_");
                $self->{rmsg} = "Error from upstream SMTP server";
                $self->close;
                $self->cont;
            }
        }
    }
    else {
        $self->log(LOGERROR, "SMTP Session occurred out of order");
        $self->close;
        $self->cont;
    }
}

sub event_write {
    my AsyncSMTPSender $self = shift;

    if ($self->{state} == ST_CONNECTING) {
        $self->watch_write(0);
        $self->{state} = ST_CONNECTED;
        $self->watch_read(1);
    }
    elsif (0 && $self->{state} == ST_DATA) {
        # send more data
        if (my $line = $self->{tran}->body_getline) {
            $self->log(LOGDEBUG, ">> $line");
            $line =~ s/\r?\n/\r\n/;
            $self->datasend($line);
        }
        else {
            # no more data.
            $self->log(LOGINFO, "No more data");
            $self->watch_write(0);
            $self->{state} = ST_COMMANDS;
        }
    }
    else {
        $self->write(undef);
    }
}

sub event_err {
    my ($self) = @_;
    eval { $self->read(1); }; # gives us the correct error in errno
    $self->{rmsg} = "Read error from remote server: $!";
    #print "lost connection: $!\n";
    $self->close;
    $self->cont;
}

sub event_hup {
    my ($self) = @_;
    eval { $self->read(1); }; # gives us the correct error in errno
    $self->{rmsg} = "HUP error from remote server: $!";
    #print "lost connection: $!\n";
    $self->close;
    $self->cont;
}