feat: started switching to real queueing
This commit is contained in:
parent
959dc8966e
commit
7ca06e4dc6
@ -29,7 +29,7 @@ sub createStatements
|
||||
|
||||
$self->{fetch_all_sth} = $self->{dbh}->prepare("select username,dovecot_server from email_address where domain=? and isFetchAll=1");
|
||||
|
||||
$self->{fetch_dovecot_details_sth} = $self->{dbh}->prepare("select hostname, port from dovecot_server where name=?");
|
||||
$self->{fetch_dovecot_details_sth} = $self->{dbh}->prepare("select username, hostname, port from dovecot_server RIGHT JOIN email_address ON dovecot_server.name = email_address.dovecot_server where alias=? and domain=?");
|
||||
|
||||
}
|
||||
|
||||
@ -73,46 +73,93 @@ sub createDSN
|
||||
$self->log(LOGDEBUG, "created DSN " . $self->{dsn});
|
||||
}
|
||||
|
||||
sub updateTransactionWithRecipientInfo
|
||||
sub updateTransactionWithSmtpInfo
|
||||
{
|
||||
my ($self, $transaction, $rcpt_row) = @_;
|
||||
my ($self, $transaction, $recipient) = @_;
|
||||
|
||||
return 0 unless defined($rcpt_row);
|
||||
return 0 unless defined($rcpt_row->{username});
|
||||
return 0 unless defined($rcpt_row->{dovecot_server});
|
||||
my $queue = $transaction->notes("queue") || {};
|
||||
my $rcpt = $recipient->user . "@" . $recipient->host;
|
||||
|
||||
$queue->{$rcpt}->{destination} = "relay";
|
||||
$queue->{$rcpt}->{protocol} = "smtp";
|
||||
$queue->{$rcpt}->{host} = $self->qp->config("relay_server");
|
||||
|
||||
$transaction->notes("queue", $queue);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
my $dovecot_server = $rcpt_row->{dovecot_server};
|
||||
my $username = $rcpt_row->{username};
|
||||
sub updateTransactionWithLmtpInfo
|
||||
{
|
||||
my ($self, $transaction, $recipient) = @_;
|
||||
|
||||
$self->log(LOGNOTICE,"transaction update " . $dovecot_server . "/" . $rcpt_row->{dovecot_server} . " " . $username);
|
||||
|
||||
my $result = $self->{fetch_dovecot_details_sth}->execute($dovecot_server);
|
||||
my $result = $self->{fetch_dovecot_details_sth}->execute($recipient->user, $recipient->host);
|
||||
if (!$result)
|
||||
{
|
||||
$self->log(LOGERROR, "Failed to fetch dovecot server information for user " . $username . " and dovecot server " . $dovecot_server );
|
||||
return 0;
|
||||
$self->log(LOGERROR, "Failed to fetch dovecot information from the database"); return -1;
|
||||
}
|
||||
|
||||
if ($self->{fetch_dovecot_details_sth}->rows == 0)
|
||||
{
|
||||
$self->log(LOGERROR, "no dovecot server information found for user " . $username . " and dovecot server " . $dovecot_server );
|
||||
return 0;
|
||||
$self->log(LOGERROR, "no dovecot information in database found");
|
||||
return -1;
|
||||
}
|
||||
elsif ($self->{fetch_dovecot_details_sth}->rows > 1)
|
||||
{
|
||||
$self->log(LOGERROR, "too many dovecot entries in the database");
|
||||
return -1;
|
||||
}
|
||||
|
||||
my $row = $self->{fetch_dovecot_details_sth}->fetchrow_hashref();
|
||||
my $row = $self->{fetch_dovecot_details_sth}->fetchrow_hashref;
|
||||
|
||||
my $username = $row->{username}
|
||||
my $hostname = $row->{hostname};
|
||||
my $port = $row->{port};
|
||||
|
||||
$transaction->notes("queue", "lmtp://$hostname:$port");
|
||||
$transaction->notes("destination-user",$username);
|
||||
my $queue = $transaction->notes("queue") || {};
|
||||
my $rcpt = $recipient->user . "@" . $recipient->host;
|
||||
|
||||
$queue->{$rcpt}->{destination} = "local";
|
||||
$queue->{$rcpt}->{protocol} = "lmtp";
|
||||
$queue->{$rcpt}->{host} = $hostname;
|
||||
$queue->{$rcpt}->{port} = $port;
|
||||
$queue->{$rcpt}->{user} = $username;
|
||||
|
||||
$transaction->notes("queue", $queue);
|
||||
|
||||
$self->log(LOGNOTICE, "Setting LMTP server to dovecot on $hostname:$port for user: $username");
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
sub CheckRecipient
|
||||
{
|
||||
my ($self, $recipient) = @_;
|
||||
|
||||
$self->connect();
|
||||
$self->createStatements();
|
||||
|
||||
my $result = $self->{rcpt_sth}->execute($recipient->user, $recipient->host);
|
||||
if (!$result)
|
||||
{
|
||||
$self->log(LOGERROR, "Failed to fetch recipient information from the database"); return -1;
|
||||
}
|
||||
|
||||
if ($self->{rcpt_sth}->rows == 1)
|
||||
{
|
||||
$self->log(LOGDEBUG, " found recipient in database");
|
||||
|
||||
return 1;
|
||||
}
|
||||
elsif ($self->{rcpt_sth}->rows > 1)
|
||||
{
|
||||
$self->log(LOGERROR,"found multiple users for same recipient in database. Something wrong with database? (" . $recipient->user . '@' . $recipient->host . ")" );
|
||||
return -2;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
sub hook_quit {
|
||||
my ($self, $transaction) = @_;
|
||||
|
||||
@ -126,42 +173,24 @@ sub hook_rcpt {
|
||||
|
||||
return DECLINED unless $recipient->host && $recipient->user;
|
||||
|
||||
$self->connect();
|
||||
$self->createStatements();
|
||||
|
||||
my $result = $self->{rcpt_sth}->execute($recipient->user, $recipient->host);
|
||||
if (!$result)
|
||||
|
||||
my $rcptValid = $self->CheckRecipient($recipient);
|
||||
|
||||
if ($rcptValid == 1 )
|
||||
{
|
||||
$self->log(LOGERROR, "Failed to fetch recipient information from the database"); return DECLINED;
|
||||
}
|
||||
|
||||
|
||||
if ($self->{rcpt_sth}->rows == 1)
|
||||
{
|
||||
$self->log(LOGDEBUG, " found recipient in database");
|
||||
my $ret = $self->updateTransactionWithRecipientInfo($transaction, $self->{rcpt_sth}->fetchrow_hashref );
|
||||
|
||||
if (!$ret)
|
||||
{
|
||||
$self->log(LOGERROR, "Failed to update transaction with recipient information");
|
||||
return DENYSOFT;
|
||||
}
|
||||
|
||||
$self->updateTransactionWithLmtpInfo($transaction, $recipient) || return DENYSOFT, "Temporary failure, try again later";
|
||||
return OK;
|
||||
}
|
||||
elsif ($self->{rcpt_sth}->rows > 1)
|
||||
elseif( $self->is_immune())
|
||||
{
|
||||
$self->log(LOGERROR,"found multiple users for same recipient in database. Something wrong with database? (" . $recipient->user . '@' . $recipient->host . ")" );
|
||||
return DENYSOFT;
|
||||
}
|
||||
|
||||
|
||||
$result = $self->{fetch_all_sth}->execute($recipient->host);
|
||||
if ($self->{fetch_all_sth}->rows > 0)
|
||||
{
|
||||
$self->log(LOGDEBUG, " found fetchall for doamin in database");
|
||||
$self->updateTransactionWithSmtpInfo($transaction, $recipient) || return DENYSOFT, "Temporary failure, try again later";
|
||||
return OK;
|
||||
}
|
||||
elseif ($rcptValid == -1)
|
||||
{
|
||||
return DENYSOFT, "Temporary failure, try again later";
|
||||
}
|
||||
|
||||
return DECLINED;
|
||||
|
||||
return Qpsmtpd::DSN->relaying_denied();
|
||||
}
|
||||
|
110
queue/lmtp
110
queue/lmtp
@ -4,92 +4,66 @@ use warnings;
|
||||
use Qpsmtpd::Constants;
|
||||
use Qpsmtpd::DSN;
|
||||
use Net::LMTP;
|
||||
|
||||
use Minion;
|
||||
|
||||
sub register {
|
||||
my ($self, $qp) = (shift, shift);
|
||||
|
||||
$self->{lmtp_rcpt_based} = $qp->config("lmtp_rcpt_based") || 0;
|
||||
$self->{lmtp_host} = $qp->config("lmtp_host");
|
||||
$self->{lmtp_port} = $qp->config("lmtp_port") || 24;
|
||||
$self->{qp}= $qp;
|
||||
$self->{enabled} = 1;
|
||||
# some default values
|
||||
$self->{database} = $qp->config("queue_mysql_database") || "mail";
|
||||
$self->{host} = $qp->config("queue_mysql_host") || "localhost";
|
||||
$self->{port} = $qp->config("queue_mysql_port") || "3306";
|
||||
$self->{user} = $qp->config("queue_mysql_user") || "qpsmtpd";
|
||||
$self->{pass} = $qp->config("queue_mysql_password");
|
||||
|
||||
|
||||
if (!$self->{lmtp_rcpt_based} && !$self->{lmtp_host}) {
|
||||
$self->{enabled} = 0;
|
||||
$self->log(LOGERROR, "No LMTP host configured, disabling plugin\n");
|
||||
return;
|
||||
}
|
||||
$self->createDSN();
|
||||
|
||||
}
|
||||
|
||||
sub lmtp_transfer
|
||||
sub createDSN()
|
||||
{
|
||||
my ($self, $transaction, $lmtp_host, $lmtp_port, $lmtp_user) = @_;
|
||||
my $self = shift;
|
||||
my $dsn = "mysql://" . $self->{user} . ":" . $self->{pass} . "@" . $self->{host} . ":" . $self->{port} . "/" . $self->{database};
|
||||
|
||||
my $lmtp = Net::LMTP->new(
|
||||
$lmtp_host,
|
||||
Port => $lmtp_port,
|
||||
Timeout => 60,
|
||||
Hello => $self->qp->config("me"),
|
||||
) || die $!;
|
||||
|
||||
|
||||
$lmtp->mail($transaction->sender->address || "")
|
||||
or return DECLINED, "Unable to queue message during mail($!)";
|
||||
$lmtp->to($lmtp_user) or return DECLINED, "Unable to queue message during to($!)";
|
||||
$lmtp->data() or return DECLINED, "Unable to queue message during data($!)";
|
||||
$lmtp->datasend($transaction->header->as_string) or return DECLINED, "Unable to queue message during datasend ($!) ";
|
||||
$transaction->body_resetpos;
|
||||
while (my $line = $transaction->body_getline) {
|
||||
$lmtp->datasend($line)
|
||||
or return DECLINED, "Unable to queue message during datasendbody($!)";
|
||||
}
|
||||
$lmtp->dataend() or return DECLINED, "Unable to queue message during dataend($!)";
|
||||
my $qid = $lmtp->message();
|
||||
my @list = split(' ', $qid);
|
||||
$qid = pop(@list);
|
||||
|
||||
$lmtp->quit() or return DECLINED, "Unable to queue message during quit($!)";
|
||||
|
||||
$self->log(LOGINFO, "finished queueing");
|
||||
return OK, "queued as $qid";
|
||||
|
||||
return DECLINED;
|
||||
$self->{dsn} = $dsn;
|
||||
}
|
||||
|
||||
sub hook_connect
|
||||
{
|
||||
my ($self, $transaction) = @_;
|
||||
|
||||
$self->{minion} = Minion->new(mysql => $self->{dsn});
|
||||
|
||||
}
|
||||
|
||||
|
||||
sub hook_queue {
|
||||
my ($self, $transaction) = @_;
|
||||
|
||||
return DECLINED unless $self->{enabled};
|
||||
my @notQueued;
|
||||
|
||||
my $lmtp_host;
|
||||
my $lmtp_port;
|
||||
my $lmtp_user;
|
||||
my $queue = $transaction->notes("queue") || {};
|
||||
|
||||
if ($self->{lmtp_rcpt_based})
|
||||
for my $rcpt (keys %{$queue})
|
||||
{
|
||||
my $queue = $transaction->notes("queue");
|
||||
$queue =~ /^lmtp:\/\/(.*):(\d+)$/;
|
||||
$lmtp_host=$1;
|
||||
$lmtp_port=$2;
|
||||
$lmtp_user = $transaction->notes("destination-user") || "";
|
||||
|
||||
}
|
||||
else
|
||||
{
|
||||
$lmtp_host = $self->{lmtp_host};
|
||||
$lmtp_port = $self->{lmtp_port};
|
||||
$lmtp_user = $transaction->sender->address || "";
|
||||
}
|
||||
|
||||
if (!$lmtp_user) {
|
||||
$self->log(LOGERROR, "No sender address found for transaction.\n");
|
||||
return DECLINED;
|
||||
my $ret = $self->queue($transaction, $queue->{$rcpt});
|
||||
if (!$ret)
|
||||
{
|
||||
push(@notQueued,$rcpt);
|
||||
}
|
||||
}
|
||||
|
||||
$self->log(LOGNOTICE,"forwarding mail to LMTP host: $lmtp_host:$lmtp_port\n");
|
||||
|
||||
return $self->lmtp_transfer($transaction, $lmtp_host, $lmtp_port, $lmtp_user);
|
||||
return OK;
|
||||
}
|
||||
|
||||
sub queue
|
||||
{
|
||||
my ($self, $transaction, $rcpt) = @_;
|
||||
|
||||
$minion->enqueue(transmit => [$rcpt] => {
|
||||
attempts => 10,
|
||||
expire => 60*60*24*2,
|
||||
queue => $rcpt->{destination}
|
||||
});
|
||||
|
||||
}
|
Loading…
Reference in New Issue
Block a user