From 7ca06e4dc6229efed4a7eda679e9af64dff1cf58 Mon Sep 17 00:00:00 2001 From: Dominik Meyer Date: Sat, 8 Feb 2025 19:02:24 +0100 Subject: [PATCH] feat: started switching to real queueing --- federationhq_rcpt | 127 ++++++++++++++++++++++++++++------------------ queue/lmtp | 110 +++++++++++++++------------------------ 2 files changed, 120 insertions(+), 117 deletions(-) diff --git a/federationhq_rcpt b/federationhq_rcpt index 5146f8b..6d859d6 100644 --- a/federationhq_rcpt +++ b/federationhq_rcpt @@ -29,7 +29,7 @@ sub createStatements $self->{fetch_all_sth} = $self->{dbh}->prepare("select username,dovecot_server from email_address where domain=? and isFetchAll=1"); - $self->{fetch_dovecot_details_sth} = $self->{dbh}->prepare("select hostname, port from dovecot_server where name=?"); + $self->{fetch_dovecot_details_sth} = $self->{dbh}->prepare("select username, hostname, port from dovecot_server RIGHT JOIN email_address ON dovecot_server.name = email_address.dovecot_server where alias=? and domain=?"); } @@ -73,46 +73,93 @@ sub createDSN $self->log(LOGDEBUG, "created DSN " . $self->{dsn}); } -sub updateTransactionWithRecipientInfo +sub updateTransactionWithSmtpInfo { - my ($self, $transaction, $rcpt_row) = @_; + my ($self, $transaction, $recipient) = @_; - return 0 unless defined($rcpt_row); - return 0 unless defined($rcpt_row->{username}); - return 0 unless defined($rcpt_row->{dovecot_server}); + my $queue = $transaction->notes("queue") || {}; + my $rcpt = $recipient->user . "@" . $recipient->host; + + $queue->{$rcpt}->{destination} = "relay"; + $queue->{$rcpt}->{protocol} = "smtp"; + $queue->{$rcpt}->{host} = $self->qp->config("relay_server"); + + $transaction->notes("queue", $queue); + return 0; +} - my $dovecot_server = $rcpt_row->{dovecot_server}; - my $username = $rcpt_row->{username}; +sub updateTransactionWithLmtpInfo +{ + my ($self, $transaction, $recipient) = @_; - $self->log(LOGNOTICE,"transaction update " . $dovecot_server . "/" . $rcpt_row->{dovecot_server} . " " . $username); - - my $result = $self->{fetch_dovecot_details_sth}->execute($dovecot_server); + my $result = $self->{fetch_dovecot_details_sth}->execute($recipient->user, $recipient->host); if (!$result) { - $self->log(LOGERROR, "Failed to fetch dovecot server information for user " . $username . " and dovecot server " . $dovecot_server ); - return 0; + $self->log(LOGERROR, "Failed to fetch dovecot information from the database"); return -1; } if ($self->{fetch_dovecot_details_sth}->rows == 0) { - $self->log(LOGERROR, "no dovecot server information found for user " . $username . " and dovecot server " . $dovecot_server ); - return 0; + $self->log(LOGERROR, "no dovecot information in database found"); + return -1; + } + elsif ($self->{fetch_dovecot_details_sth}->rows > 1) + { + $self->log(LOGERROR, "too many dovecot entries in the database"); + return -1; } - my $row = $self->{fetch_dovecot_details_sth}->fetchrow_hashref(); + my $row = $self->{fetch_dovecot_details_sth}->fetchrow_hashref; + my $username = $row->{username} my $hostname = $row->{hostname}; my $port = $row->{port}; - $transaction->notes("queue", "lmtp://$hostname:$port"); - $transaction->notes("destination-user",$username); + my $queue = $transaction->notes("queue") || {}; + my $rcpt = $recipient->user . "@" . $recipient->host; + + $queue->{$rcpt}->{destination} = "local"; + $queue->{$rcpt}->{protocol} = "lmtp"; + $queue->{$rcpt}->{host} = $hostname; + $queue->{$rcpt}->{port} = $port; + $queue->{$rcpt}->{user} = $username; + + $transaction->notes("queue", $queue); $self->log(LOGNOTICE, "Setting LMTP server to dovecot on $hostname:$port for user: $username"); return 1; } +sub CheckRecipient +{ + my ($self, $recipient) = @_; + + $self->connect(); + $self->createStatements(); + + my $result = $self->{rcpt_sth}->execute($recipient->user, $recipient->host); + if (!$result) + { + $self->log(LOGERROR, "Failed to fetch recipient information from the database"); return -1; + } + + if ($self->{rcpt_sth}->rows == 1) + { + $self->log(LOGDEBUG, " found recipient in database"); + + return 1; + } + elsif ($self->{rcpt_sth}->rows > 1) + { + $self->log(LOGERROR,"found multiple users for same recipient in database. Something wrong with database? (" . $recipient->user . '@' . $recipient->host . ")" ); + return -2; + } + + return 0; +} + sub hook_quit { my ($self, $transaction) = @_; @@ -126,42 +173,24 @@ sub hook_rcpt { return DECLINED unless $recipient->host && $recipient->user; - $self->connect(); - $self->createStatements(); - - my $result = $self->{rcpt_sth}->execute($recipient->user, $recipient->host); - if (!$result) + + my $rcptValid = $self->CheckRecipient($recipient); + + if ($rcptValid == 1 ) { - $self->log(LOGERROR, "Failed to fetch recipient information from the database"); return DECLINED; - } - - - if ($self->{rcpt_sth}->rows == 1) - { - $self->log(LOGDEBUG, " found recipient in database"); - my $ret = $self->updateTransactionWithRecipientInfo($transaction, $self->{rcpt_sth}->fetchrow_hashref ); - - if (!$ret) - { - $self->log(LOGERROR, "Failed to update transaction with recipient information"); - return DENYSOFT; - } - + $self->updateTransactionWithLmtpInfo($transaction, $recipient) || return DENYSOFT, "Temporary failure, try again later"; return OK; } - elsif ($self->{rcpt_sth}->rows > 1) + elseif( $self->is_immune()) { - $self->log(LOGERROR,"found multiple users for same recipient in database. Something wrong with database? (" . $recipient->user . '@' . $recipient->host . ")" ); - return DENYSOFT; - } - - - $result = $self->{fetch_all_sth}->execute($recipient->host); - if ($self->{fetch_all_sth}->rows > 0) - { - $self->log(LOGDEBUG, " found fetchall for doamin in database"); + $self->updateTransactionWithSmtpInfo($transaction, $recipient) || return DENYSOFT, "Temporary failure, try again later"; return OK; } + elseif ($rcptValid == -1) + { + return DENYSOFT, "Temporary failure, try again later"; + } - return DECLINED; + + return Qpsmtpd::DSN->relaying_denied(); } diff --git a/queue/lmtp b/queue/lmtp index eb1cd56..c3e36ca 100644 --- a/queue/lmtp +++ b/queue/lmtp @@ -4,92 +4,66 @@ use warnings; use Qpsmtpd::Constants; use Qpsmtpd::DSN; use Net::LMTP; - +use Minion; sub register { my ($self, $qp) = (shift, shift); - $self->{lmtp_rcpt_based} = $qp->config("lmtp_rcpt_based") || 0; - $self->{lmtp_host} = $qp->config("lmtp_host"); - $self->{lmtp_port} = $qp->config("lmtp_port") || 24; - $self->{qp}= $qp; - $self->{enabled} = 1; + # some default values + $self->{database} = $qp->config("queue_mysql_database") || "mail"; + $self->{host} = $qp->config("queue_mysql_host") || "localhost"; + $self->{port} = $qp->config("queue_mysql_port") || "3306"; + $self->{user} = $qp->config("queue_mysql_user") || "qpsmtpd"; + $self->{pass} = $qp->config("queue_mysql_password"); - - if (!$self->{lmtp_rcpt_based} && !$self->{lmtp_host}) { - $self->{enabled} = 0; - $self->log(LOGERROR, "No LMTP host configured, disabling plugin\n"); - return; - } + $self->createDSN(); } -sub lmtp_transfer +sub createDSN() { - my ($self, $transaction, $lmtp_host, $lmtp_port, $lmtp_user) = @_; + my $self = shift; + my $dsn = "mysql://" . $self->{user} . ":" . $self->{pass} . "@" . $self->{host} . ":" . $self->{port} . "/" . $self->{database}; - my $lmtp = Net::LMTP->new( - $lmtp_host, - Port => $lmtp_port, - Timeout => 60, - Hello => $self->qp->config("me"), - ) || die $!; - - - $lmtp->mail($transaction->sender->address || "") - or return DECLINED, "Unable to queue message during mail($!)"; - $lmtp->to($lmtp_user) or return DECLINED, "Unable to queue message during to($!)"; - $lmtp->data() or return DECLINED, "Unable to queue message during data($!)"; - $lmtp->datasend($transaction->header->as_string) or return DECLINED, "Unable to queue message during datasend ($!) "; - $transaction->body_resetpos; - while (my $line = $transaction->body_getline) { - $lmtp->datasend($line) - or return DECLINED, "Unable to queue message during datasendbody($!)"; - } - $lmtp->dataend() or return DECLINED, "Unable to queue message during dataend($!)"; - my $qid = $lmtp->message(); - my @list = split(' ', $qid); - $qid = pop(@list); - - $lmtp->quit() or return DECLINED, "Unable to queue message during quit($!)"; - - $self->log(LOGINFO, "finished queueing"); - return OK, "queued as $qid"; - - return DECLINED; + $self->{dsn} = $dsn; } +sub hook_connect +{ + my ($self, $transaction) = @_; + + $self->{minion} = Minion->new(mysql => $self->{dsn}); + +} + + sub hook_queue { my ($self, $transaction) = @_; - return DECLINED unless $self->{enabled}; + my @notQueued; - my $lmtp_host; - my $lmtp_port; - my $lmtp_user; + my $queue = $transaction->notes("queue") || {}; - if ($self->{lmtp_rcpt_based}) + for my $rcpt (keys %{$queue}) { - my $queue = $transaction->notes("queue"); - $queue =~ /^lmtp:\/\/(.*):(\d+)$/; - $lmtp_host=$1; - $lmtp_port=$2; - $lmtp_user = $transaction->notes("destination-user") || ""; - - } - else - { - $lmtp_host = $self->{lmtp_host}; - $lmtp_port = $self->{lmtp_port}; - $lmtp_user = $transaction->sender->address || ""; - } - - if (!$lmtp_user) { - $self->log(LOGERROR, "No sender address found for transaction.\n"); - return DECLINED; + my $ret = $self->queue($transaction, $queue->{$rcpt}); + if (!$ret) + { + push(@notQueued,$rcpt); + } } - $self->log(LOGNOTICE,"forwarding mail to LMTP host: $lmtp_host:$lmtp_port\n"); - - return $self->lmtp_transfer($transaction, $lmtp_host, $lmtp_port, $lmtp_user); + return OK; +} + +sub queue +{ + my ($self, $transaction, $rcpt) = @_; + + $minion->enqueue(transmit => [$rcpt] => { + attempts => 10, + expire => 60*60*24*2, + queue => $rcpt->{destination} + }); + } \ No newline at end of file