#!/usr/bin/perl

use strict;
use warnings;

use Cwd;
use Data::Dumper;
use DBIx::Simple;
use File::stat;
use Time::TAI64 qw/ tai2unix /;

# Emit hash keys in sorted order whenever Data::Dumper is used for debugging.
$Data::Dumper::Sortkeys = 1;

# Database connection settings, consumed by get_db().
my ( $dsn, $user, $pass ) = (
    'DBI:mysql:database=qpsmtpd;host=db;port=3306',
    'qplog',
    't0ps3cret',
);

# Locate the log directory and collect the names of the files to import.
my $logdir   = get_log_dir();
my @logfiles = get_logfiles($logdir);

# File-scoped state shared by the subs below:
#   %plugins     - plugin name/id cache, filled by get_plugin_id()
#   %os          - OS name -> id cache, filled by get_os_id()
#   %message_ids - "file_id-pid" -> message id cache, filled by get_msg_id()
#   $has_cleanup - incremented by parse_line_cleanup()
my %plugins;
my %os;
my %message_ids;
my $has_cleanup;
my $db = get_db();

# check_logfile() returns nothing for files that need no work; otherwise it
# returns the file's DB id and, optionally, the offset to resume from.
for my $logfile ( @logfiles ) {
    my ( $file_id, $resume_at ) = check_logfile($logfile);
    next if ! $file_id;
    parse_logfile( $logfile, $file_id, $resume_at );
}

exit;

# Collapse routine plugin messages to the empty string.
#
# Takes one plugin message. Returns '' when the message is one of the known
# boilerplate strings (or matches one of the two boilerplate patterns), and
# returns the message unchanged otherwise.
sub trim_message {
    my $mess = shift;

    # messages that are dropped only on an exact match
    my %boilerplate = map { $_ => 1 } (
        'skip, naughty',
        'skip, relay client',
        'skip, no match',
        'skip: unsigned',
        'skip, not a null sender',
        'pass',
        'pass, no record',
        'pass, Deliverable through vpopmail',
        'pass, clean',
        'TLS setup returning',
    );
    return '' if $boilerplate{$mess};

    # messages that are dropped on a prefix match
    return '' if $mess =~ /^fail. NAUGHTY/ || $mess =~ /^PTR:\s/;

    return $mess;
};

# Map a p0f result string to the id of the matching row in the `os` table.
#
# Returns nothing when the string is empty or is one of the recognised
# non-OS results. Otherwise returns the cached id for the name, which is
# undef (after a warning) when the name is not in the table.
# The %os cache is loaded from the database on first use.
sub get_os_id {
    my $p0f_string = shift;
    return unless $p0f_string;

    # strip surrounding whitespace
    $p0f_string =~ s/^\s+//;
    $p0f_string =~ s/\s+$//;
    return unless $p0f_string;

    # results that do not name an operating system
    for my $not_an_os (
        qr/no match/, qr/^skip/, qr/^\d/, qr/^\(/, qr/no longer in the cache/
    ) {
        return if $p0f_string =~ $not_an_os;
    }
    return unless $p0f_string =~ /\w/;

    # lazily fill the name -> id cache
    if ( ! %os ) {
        my $rows = exec_query( 'SELECT * FROM os' );
        $os{ $_->{name} } = $_->{id} for @$rows;
    }

    warn "missing OS for $p0f_string\n" unless defined $os{$p0f_string};

    return $os{$p0f_string};
};

# Look up a plugin in the %plugins cache and return the stored value.
#
# The cache is loaded on first use from the `plugin` table (name -> id and
# id -> name) and then from `plugin_aliases` (alias name -> plugin_id).
# Dies when the requested plugin has no entry.
sub get_plugin_id {
    my $plugin = shift;

    unless ( %plugins ) {
        my $rows = exec_query( 'SELECT * FROM plugin' );
        for my $row ( @$rows ) {
            my ( $name, $id ) = @{$row}{ qw/ name id / };
            $plugins{$name} = $id;
            $plugins{$id}   = $name;
        }

        my $aliases = exec_query( 'SELECT * FROM plugin_aliases' );
        $plugins{ $_->{name} } = $_->{plugin_id} for @$aliases;
    }

    die "missing DB plugin $plugin\n" unless defined $plugins{$plugin};

    return $plugins{$plugin};
};

# Return the id of the `message` row for a (file id, qpsmtpd pid) pair.
#
# Answers from the %message_ids cache when possible, otherwise queries the
# database and caches a true id. Returns undef when no row exists.
sub get_msg_id {
    my ( $fid, $pid ) = @_;

    my $cache_key = "$fid-$pid";
    if ( my $cached = $message_ids{$cache_key} ) {
        return $cached;
    }

    my $rows = exec_query(
        'SELECT * FROM message WHERE file_id=? AND qp_pid=?',
        [ $fid, $pid ]
    );

    my $found = $rows->[0]{id};
    $message_ids{$cache_key} = $found if $found;

    return $found;
};

# Insert a new `message` row for a connection.
#
# $message is split on whitespace: the first field is taken as the host name
# and the second as the IP address wrapped in one leading and one trailing
# character (brackets), which are stripped. The host name is stored in a
# follow-up UPDATE unless it is empty or 'Unknown'.
sub create_message {
    my ( $fid, $ts, $pid, $message ) = @_;

    my ( $hostname, $bracketed_ip ) = split /\s/, $message;
    my $ip = substr( $bracketed_ip, 1, -1 );    # drop first and last character

    my $new_id = exec_query(
        "INSERT INTO message SET file_id=?, connect_start=FROM_UNIXTIME(?), qp_pid=?, ip=INET_ATON(?)",
        [ $fid, $ts, $pid, $ip ]
    );

    if ( $hostname && $hostname ne 'Unknown' ) {
        exec_query( "UPDATE message SET hostname=? WHERE id=?", [ $hostname, $new_id ] );
    }
};

# Record one plugin result for a message in `message_plugin`.
#
# Three plugins also carry a value that is copied onto the `message` row:
#   ident::geoip     - distance in km; the stored string becomes the text
#                      before the distance
#   ident::p0f*      - os_id resolved via get_os_id(); stored string 'pass'
#   connection_time  - elapsed seconds; stored string 'pass'
# The numeric result comes from get_score(); when it is non-zero the stored
# string is passed through trim_message().
# Dies (inside get_plugin_id) when the plugin is unknown.
sub insert_plugin {
    my ( $msg_id, $plugin, $message ) = @_;

    my $plugin_id = get_plugin_id( $plugin );

    # work out whether this plugin updates a column on the message row
    my ( $update_sql, $value, $stored_as );
    if ( $plugin eq 'ident::geoip' ) {
        my ( $location, $km ) = $message =~ /(.*?),\s+([\d]+)\skm/;
        ( $update_sql, $value, $stored_as )
            = ( 'UPDATE message SET distance=? WHERE id=?', $km, $location )
            if $km;
    }
    elsif ( $plugin =~ /^ident::p0f/ ) {
        my $os_id = get_os_id( $message );
        ( $update_sql, $value, $stored_as )
            = ( 'UPDATE message SET os_id=? WHERE id=?', $os_id, 'pass' )
            if $os_id;
    }
    elsif ( $plugin eq 'connection_time' ) {
        my ( $elapsed ) = $message =~ /\s*([\d\.]+)\s/;
        ( $update_sql, $value, $stored_as )
            = ( 'UPDATE message SET time=? WHERE id=?', $elapsed, 'pass' )
            if $elapsed;
    }

    if ( $update_sql ) {
        exec_query( $update_sql, [ $value, $msg_id ] );
        $message = $stored_as;
    }

    my $result = get_score( $message );
    $message = trim_message( $message ) if $result;

    exec_query( 'INSERT INTO message_plugin SET msg_id=?, plugin_id=?, result=?, string=?',
        [ $msg_id, $plugin_id, $result, $message ]
    );
};

# Read one log file and store what it describes in the database.
#
# Arguments:
#   $file   - file name relative to $logdir
#   $fid    - id of the file's row in the `log` table
#   $offset - byte offset to start reading from (defaults to 0)
#
# Each line is classified by parse_line(). A 'connect' line for an unknown
# (file id, pid) pair creates a message row; any other line whose message
# row cannot be found is skipped. Dies if the file cannot be opened.
sub parse_logfile {
    my $file = shift;
    my $fid = shift;
    my $offset = shift || 0;
    my $path = "$logdir/$file";

    print "parsing file $file (id: $fid) from offset $offset\n";
    open my $F, '<', $path or die "could not open $path: $!";
    # resume where a previous run stopped
    seek( $F, $offset, 0 ) if $offset;

    while ( defined (my $line = <$F> ) ) {
        chomp $line;
        next if ! $line;
        my ( $type, $pid, $hook, $plugin, $message ) = parse_line( $line );

        # line types that carry nothing to store
        next if ! $type;
        next if $type eq 'info';
        next if $type eq 'unknown';
        next if $type eq 'response';
        next if $type eq 'init';        # doesn't occur in all deployment models
        next if $type eq 'cleanup';
        next if $type eq 'error';

        # the first whitespace-separated field is the TAI64 timestamp
        my $ts = tai2unix( (split /\s/, $line)[0] ); # print "ts: $ts\n";

        # no message row yet: create one on 'connect', then move to the next
        # line either way
        my $msg_id = get_msg_id( $fid, $pid ) or do {
            create_message( $fid, $ts, $pid, $message ) if $type eq 'connect';
            next;
        };

        #warn "type: $type\n";
        if ( $type eq 'plugin' ) {
            next if $plugin eq 'naughty';    # housekeeping only
            insert_plugin( $msg_id, $plugin, $message );
        }
        elsif ( $type eq 'queue' ) {
            # queued message: result 3
            exec_query('UPDATE message SET result=? WHERE id=?', [ 3, $msg_id ] );
        }
        elsif ( $type eq 'reject' ) {
            # rejected message: result -3
            exec_query('UPDATE message SET result=? WHERE id=?', [ -3, $msg_id ] );
        }
        elsif ( $type eq 'close' ) {
            # only a timeout changes the result (-1); other closes are ignored
            if ( $message eq 'Connection Timed Out' ) {
                exec_query('UPDATE message SET result=? WHERE id=?', [ -1, $msg_id ] );
            };
        }
        elsif ( $type eq 'connect' ) { }    # message row already exists
        elsif ( $type eq 'dispatch' ) {
            if ( substr($message, 0, 21) eq 'dispatching MAIL FROM' ) {
                # address is whatever sits between the first '<' and '>'
                my ($from) = $message =~  /<(.*?)>/;
                exec_query('UPDATE message SET mail_from=? WHERE id=?', [ $from, $msg_id ] );
            }
            elsif ( substr($message, 0, 19) eq 'dispatching RCPT TO' ) {
                my ($to) = $message =~  /<(.*?)>/;
                # the IS NULL condition keeps the first recipient recorded
                exec_query('UPDATE message SET rcpt_to=? WHERE id=? AND rcpt_to IS NULL', [ $to, $msg_id ] );
            }
            elsif ( $message =~ m/dispatching (EHLO|HELO) (.*)/ ) {
                # $2 is the argument given to EHLO/HELO
                exec_query('UPDATE message SET helo=? WHERE id=?', [ $2, $msg_id ] );
            }
            elsif ( $message eq 'dispatching DATA' ) { }
            elsif ( $message eq 'dispatching QUIT' ) { }
            elsif ( $message eq 'dispatching STARTTLS' ) { }
            elsif ( $message eq 'dispatching RSET' ) { }
            else {
                # anything here is likely an unrecognized command
                #print "$message\n";
            };
        }
        else {
            # a type this sub does not handle: print it for inspection
            print "$type $pid $hook $plugin $message\n";
        };
    };

    close $F;
};

# Decide whether a log file needs parsing and register it in the `log` table.
#
# Argument: $file - file name relative to $logdir.
#
# Returns:
#   ( $id, $offset ) - file already known by inode; resume at $offset
#   ( $id )          - file newly added to the table; parse from the start
#   empty list       - nothing to do (file is empty or unchanged)
#
# Dies if the file is missing, cannot be stat()ed, or reports no inode.
sub check_logfile {
    my $file = shift;
    my $path = "$logdir/$file";

    die "missing file $logdir/$file" if ! -f $path;

    # stat once so inode, size and ctime all describe the same snapshot
    my $st    = stat($path) or die "unable to stat $path: $!\n";
    my $inode = $st->ino or die "unable to get inode for $path\n";
    my $size  = $st->size;

    # an empty file (e.g. 'current' right after rotation) has nothing to
    # parse yet; skip it instead of aborting the whole run
    return if ! $size;

    my $exists;

    # a rotated file (@...) that was last seen under the name 'current'
    if ( $file =~ /^\@/ ) {
        $exists = exec_query(
            'SELECT * FROM log WHERE inode=? AND name=?',
            [ $inode, 'current' ]
        );
        if ( @$exists ) {
            print "Updating current -> $file\n";
            # store the final size along with the new name, so the name+size
            # lookup below recognises the file as unchanged on the next run
            exec_query(
                'UPDATE log SET name=?, size=? WHERE inode=? AND name=?',
                [ $file, $size, $inode, 'current' ]
            );
            return ( $exists->[0]{id}, $exists->[0]{size} );      # continue parsing
        };
    };

    # the live file: remember its new size, resume from the previous one
    if ( $file eq 'current' ) {
        $exists = exec_query(
            'SELECT * FROM log WHERE inode=? AND name=?',
            [ $inode, $file ]
        );
        if ( @$exists ) {
            exec_query(
                'UPDATE log SET size=? WHERE inode=? AND name=?',
                [ $size, $inode, 'current' ]
            );
            return ( $exists->[0]{id}, $exists->[0]{size} );      # continue parsing
        };
    };

    $exists = exec_query(
        'SELECT * FROM log WHERE name=? AND size=?',
        [ $file, $size ]
    );
    return if @$exists;    # log file hasn't changed, ignore it

    # file is a new one we haven't seen, add to DB and parse
    my $id = exec_query(
        'INSERT INTO log SET inode=?, size=?, name=?, created=FROM_UNIXTIME(?)',
        [ $inode, $size, $file, $st->ctime ]
    );
    print "new file id: $id\n";
    return ( $id );
};

# Locate the directory holding the qpsmtpd log files.
#
# Checks, in order:
#   1. ./log/main relative to the current working directory
#   2. for each of the users qpsmtpd and smtpd: <home>/log, then
#      <home>/smtpd/log (the returned path has /main appended)
#
# Returns the directory path, or nothing when no candidate exists.
sub get_log_dir {

    if ( -d "log/main" ) {
        my $wd = Cwd::cwd();
        return "$wd/log/main";
    };

    foreach my $user ( qw/ qpsmtpd smtpd / ) {

        # index 7 of getpwnam's list is the home directory; the slice is
        # empty (so we skip) when the user does not exist
        my ($homedir) = (getpwnam( $user ))[7] or next;

        if ( -d "$homedir/log" ) {
            return "$homedir/log/main";
        };
        if ( -d "$homedir/smtpd/log" ) {
            return "$homedir/smtpd/log/main";
        };
    };

    # Nothing found. Return explicitly: the value of a sub that falls off
    # the end of a loop is unspecified.
    return;
};

# List the log files to import from a directory.
#
# Argument: $dir - directory to scan.
#
# Returns the names of the plain files matching @...s, sorted by name,
# followed by the literal name "current" (appended whether or not such a
# file exists). Dies if the directory cannot be opened.
sub get_logfiles {
    my $dir = shift;

    opendir my $D, $dir or die "unable to open log dir $dir\n";

    # readdir yields entries in whatever order the filesystem provides, so
    # sort them: rotated files are named after their timestamp, and sorting
    # by name processes the oldest first on every run
    my @files = sort grep { /^\@.*s$/ && -f "$dir/$_" } readdir $D;

    closedir $D;

    push @files, "current";  # always have this one last
    return @files;
};

# Classify one log line.
#
# The line is split on whitespace into a timestamp, a pid and the rest (the
# message). Returns a five-element list
#   ( $type, $pid, $hook, $plugin, $message )
# where $hook and $plugin are only set for plugin lines (delegated to
# parse_line_plugin). Returns nothing when the line has no message part.
# Lines that match no rule are printed and returned with type 'unknown'.
sub parse_line {
    my $line = shift;
    my ($tai, $pid, $message) = split /\s+/, $line, 3;
    return if ! $message;  # garbage in the log file

    # lines seen many times per connection
    return parse_line_plugin( $line ) if substr($message, 0, 1) eq '(';
    return ( 'dispatch', $pid, undef, undef, $message ) if substr($message, 0, 12) eq 'dispatching ';
    return ( 'queue',    $pid, undef, undef, $message ) if substr($message, 0, 11) eq '250 Queued!';
    # 2xx / 3xx reply codes; inside a character class '|' is a literal
    # character, so the class must be [23], not [2|3]
    return ( 'response', $pid, undef, undef, $message ) if $message =~ /^[23]\d\d/;

    # lines seen about once per connection
    return ( 'init',     $pid, undef, undef, $message ) if substr($message, 0, 19) eq 'Accepted connection';
    return ( 'connect',  $pid, undef, undef, substr( $message, 16) ) if substr($message, 0, 15) eq 'Connection from';
    return ( 'connect',  $pid, undef, undef, substr( $message, 16) ) if substr($message, 0, 8) eq 'connect ';
    return ( 'close',    $pid, undef, undef, $message ) if substr($message, 0, 6) eq 'close ';
    return ( 'close',    $pid, undef, undef, $message ) if $message eq 'Connection Timed Out';
    return ( 'close',    $pid, undef, undef, $message ) if substr($message, 0, 20) eq 'click, disconnecting';
    return parse_line_cleanup( $line ) if substr($message, 0, 11) eq 'cleaning up';

    # lines seen less than once per connection
    return ( 'info',     $pid, undef, undef, $message ) if $message eq 'spooling message to disk';
    # 4xx / 5xx reply codes (same character-class note as above)
    return ( 'reject',   $pid, undef, undef, $message ) if $message =~ /^[45]\d\d/;
    return ( 'reject',   $pid, undef, undef, $message ) if substr($message, 0, 14) eq 'deny mail from';
    return ( 'reject',   $pid, undef, undef, $message ) if substr($message, 0, 18) eq 'denysoft mail from';
    return ( 'info',     $pid, undef, undef, $message ) if substr($message, 0, 15) eq 'Lost connection';
    return ( 'info',     $pid, undef, undef, $message ) if $message eq 'auth success cleared naughty';
    return ( 'info',     $pid, undef, undef, $message ) if substr($message, 0, 15) eq 'Running as user';
    return ( 'info',     $pid, undef, undef, $message ) if substr($message, 0, 16) eq 'Loaded Qpsmtpd::';
    return ( 'info',     $pid, undef, undef, $message ) if substr($message, 0, 24) eq 'Permissions on spool_dir';
    return ( 'info',     $pid, undef, undef, $message ) if substr($message, 0, 13) eq 'Listening on ';
    return ( 'info',     $pid, undef, undef, $message ) if substr($message, 0, 18) eq 'size_threshold set';
    return ( 'info',     $pid, undef, undef, $message ) if substr($message, 0, 12) eq 'tls: ciphers';
    return ( 'error',    $pid, undef, undef, $message ) if substr($message, 0, 22) eq 'of uninitialized value';
    return ( 'error',    $pid, undef, undef, $message ) if substr($message, 0,  8) eq 'symbol "';
    return ( 'error',    $pid, undef, undef, $message ) if substr($message, 0,  9) eq 'error at ';
    return ( 'error',    $pid, undef, undef, $message ) if substr($message, 0, 15) eq 'Could not print';

    print "UNKNOWN LINE: $line\n";
    return ( 'unknown',  $pid, undef, undef, $message );
};

# Parse a plugin log line into ( 'plugin', $pid, $hook, $plugin, $message ).
#
# Example lines:
# @tai 13486 (connect) ident::p0f: Windows (XP/2000 (RFC1323+, w, tstamp-))
# @tai 13681 (connect) dnsbl: fail, NAUGHTY
# @tai 15787 (connect) karma: pass, no penalty (0 naughty, 3 nice, 3 connects)
# @tai 27500 (queue) queue::qmail_2dqueue: (for 27481) Queuing to /var/qmail/bin/qmail-queue
#
# p0f, dspam and spamassassin lines are handed to their own parsers. For a
# few other plugins the message is shortened to a bare pass/fail/skip.
# $hook is returned as logged, including its parentheses.
sub parse_line_plugin {
    my ($line) = @_;

    my ($tai, $pid, $hook, $plugin, $message ) = split /\s/, $line, 5;
    $plugin =~ s/:$//;    # the plugin name is logged with a trailing colon

    return parse_line_plugin_p0f( $line ) if $plugin =~ /^ident::p0f/;
    return parse_line_plugin_dspam( $line ) if $plugin =~ /^dspam/;
    return parse_line_plugin_spamassassin( $line ) if $plugin =~ /^spamassassin/;

    if ( $plugin eq 'sender_permitted_from' ) {
        $message = 'pass' if $message =~ /^pass/;
        $message = 'fail' if $message =~ /^fail/;
        $message = 'skip' if $message =~ /^none/;
    }
    elsif ( $plugin eq 'queue::qmail_2dqueue' ) {
        # the line carries its own pid; "(for N)" names the pid the line
        # should be filed under. Keep the line's pid when the marker is
        # absent rather than replacing it with undef.
        my ($for_pid) = $message =~ /\(for ([\d]+)\)/;
        $pid = $for_pid if defined $for_pid;
        $message = 'pass' if $message =~ /Queuing/;
    }
    elsif ( $plugin =~ /(?:early|karma|helo|rcpt_ok)/ ) {
        $message = 'pass' if $message =~ /^pass/;
    }
    elsif ( $plugin =~ /resolvable_fromhost/ ) {
        $message = 'pass' if $message =~ /^pass/;
    };

    return ( 'plugin', $pid, $hook, $plugin, $message );
};

# Parse a dspam plugin line into ( 'plugin', $pid, $hook, $plugin, $message ).
#
# A message containing "Innocent, N.NN c" becomes "pass, N.NN c" and one
# containing "Spam, N.NN c" becomes "fail, N.NN c"; anything else is
# returned as logged.
sub parse_line_plugin_dspam {
    my $line = shift;

    my ( undef, $pid, $hook, $plugin, $message ) = split /\s/, $line, 5;
    $plugin =~ s/:$//;

    # checked in this order, each against the current value of $message
    for my $verdict ( [ 'Innocent' => 'pass' ], [ 'Spam' => 'fail' ] ) {
        my ( $label, $outcome ) = @$verdict;
        if ( $message =~ /$label, (\d\.\d\d c)/ ) {
            $message = "$outcome, $1";
        }
    }

    return ( 'plugin', $pid, $hook, $plugin, $message );
};

# Parse a spamassassin plugin line into
# ( 'plugin', $pid, $hook, $plugin, $message ).
#
# "pass, Ham, <score> ..." is shortened to "pass, <score>", and
# "fail, Spam, <score> < 100..." to "fail, <score>"; anything else is
# returned as logged.
sub parse_line_plugin_spamassassin {
    my $line = shift;

    my ( undef, $pid, $hook, $plugin, $message ) = split /\s/, $line, 5;
    $plugin =~ s/:$//;

    if ( my ($ham_score) = $message =~ /pass, Ham, ([\d\-\.]+)\s/ ) {
        $message = "pass, $ham_score";
    }
    if ( my ($spam_score) = $message =~ /^fail, Spam,\s([\d\.]+)\s< 100/ ) {
        $message = "fail, $spam_score";
    }

    return ( 'plugin', $pid, $hook, $plugin, $message );
};

# Parse an ident::p0f plugin line into
# ( 'plugin', $pid, $hook, $plugin, $message ), reducing the p0f
# fingerprint text to a short OS family name where one is recognised.
sub parse_line_plugin_p0f {
    my $line = shift;

    my ( undef, $pid, $hook, $plugin, $message ) = split /\s/, $line, 5;
    $plugin =~ s/:$//;

    # when the text ends in "hops)", keep only what precedes the first " ("
    if ( $message =~ /hops\)\z/ ) {
        ($message) = split( /\s\(/, $message );
    }

    # pattern => name; rules are applied in order, each against the current
    # value of $message
    my @os_rules = (
        [ qr/^iOS/,              'iOS'            ],
        [ qr/^Solaris/,          'Solaris'        ],
        [ qr/^Mac OS X/,         'Mac OS X'       ],
        [ qr/^FreeBSD/,          'FreeBSD'        ],
        [ qr/^Linux/,            'Linux'          ],
        [ qr/^OpenBSD/,          'OpenBSD'        ],
        [ qr/^Windows \(?NT/,    'Windows NT'     ],
        [ qr/^Windows \(?95/,    'Windows 95'     ],
        [ qr/^Windows \(?98/,    'Windows 98'     ],
        [ qr/^Windows \(?XP/,    'Windows XP'     ],
        [ qr/^Windows \(?2000/,  'Windows 2000'   ],
        [ qr/^Windows \(?2003/,  'Windows 2003'   ],
        [ qr/^Windows 7/,        'Windows 7 or 8' ],
        [ qr/^Windows 8/,        'Windows 7 or 8' ],
        [ qr/^Google/,           'Google'         ],
        [ qr/^HP\-UX/,           'HP-UX'          ],
        [ qr/^NetCache/i,        'NetCache'       ],
        [ qr/^Cisco/i,           'Cisco'          ],
        [ qr/Netware/i,          'Netware'        ],
    );
    for my $rule ( @os_rules ) {
        my ( $pattern, $family ) = @$rule;
        $message = $family if $message =~ $pattern;
    }

    return ( 'plugin', $pid, $hook, $plugin, $message );
};

# Parse a "cleaning up after N" line, e.g.
#   @tai 85931 cleaning up after 3210
# Returns ( 'cleanup', $pid, undef, undef, $line ), where $pid is the last
# whitespace-separated field of the line. Also counts the line in
# $has_cleanup.
sub parse_line_cleanup {
    my $line = shift;

    my @fields = split /\s+/, $line;
    $has_cleanup++;

    return ( 'cleanup', $fields[-1], undef, undef, $line );
};

# Convert a plugin message into a numeric result.
#
#    3  'TLS setup returning', or a message starting with "pass"
#   -3  message starting with "fail"
#   -2  message starting with "negative"
#    2  message starting with "positive"
#    1  message starting with "skip"
#    0  anything else
sub get_score {
    my $mess = shift;

    return 3 if $mess eq 'TLS setup returning';

    # prefix => score, checked in order
    my @scale = (
        [ qr/^pass/     =>  3 ],
        [ qr/^fail/     => -3 ],
        [ qr/^negative/ => -2 ],
        [ qr/^positive/ =>  2 ],
        [ qr/^skip/     =>  1 ],
    );
    for my $rule ( @scale ) {
        my ( $prefix, $score ) = @$rule;
        return $score if $mess =~ $prefix;
    }

    return 0;
};


# Open the database connection using the file-level $dsn, $user and $pass.
# Returns the DBIx::Simple handle; dies with DBIx::Simple's error text when
# the connection cannot be made.
sub get_db {
    my $handle = DBIx::Simple->connect( $dsn, $user, $pass );
    die DBIx::Simple->error if ! $handle;

    return $handle;
};

# Run one SQL statement through the shared $db handle.
#
# Arguments:
#   $query  - SQL text with ? placeholders
#   $params - optional bind value, or array ref of bind values
# Dies when called with more than two arguments.
#
# Returns, depending on the statement:
#   INSERT INTO  - the last insert id for the table (dies if there is none)
#   UPDATE       - whatever $db->query returns; not checked for failure here
#   DELETE       - the result of SELECT ROW_COUNT()
#   anything else - the rows from ->hashes, which callers in this file
#                   dereference as an array of hashes
sub exec_query {
    my $query = shift;
    my $params = shift;
    die "invalid arguments to exec_query!" if @_;
    my @params;
    if ( defined $params ) {
        @params = ref $params eq 'ARRAY' ? @$params : $params;
    };

    # text attached to failures; undef bind values are shown as NULL so that
    # building the message does not itself emit warnings
    my $err = "query failed: $query\n";
    if ( scalar @params ) {
        $err .= join(',', map { defined $_ ? $_ : 'NULL' } @params);
    };

    #warn "err: $err\n";
    if ( $query =~ /INSERT INTO/ ) {
        my ( $table ) = $query =~ /INSERT INTO (\w+)\s/;
        $db->query( $query, @params );
        # Method calls are not interpolated inside double quotes, so the
        # error text has to be concatenated.
        die $db->error . "\n$err" if $db->error ne 'DBI error: ';
        my $id = $db->last_insert_id(undef,undef,$table,undef) or die $err;
        return $id;
    }
    elsif ( $query =~ /^UPDATE/i ) {
        return $db->query( $query, @params );
    }
    elsif ( $query =~ /DELETE/ ) {
        $db->query( $query, @params ) or die $err;
        return $db->query("SELECT ROW_COUNT()")->list;
    };

    my $r = $db->query( $query, @params )->hashes or die $err;
    return $r;
};