#!/usr/bin/perl
#
# Import qpsmtpd log files into a SQL database.
#
# The script locates a log directory, walks the rotated log files (names
# starting with '@' and ending in 's') followed by 'current', and for every
# file that is new or has grown it parses the lines and records:
#   * one row per connection in `message`
#   * one row per plugin result in `message_plugin`
#   * one row per log file in `log` (inode, size, name) so that a later
#     run can resume from the previously recorded size.
#
# Database settings are read from a config file named 'log2sql'
# (key = value lines); see get_db() for the defaults.

use strict;
use warnings;

use Cwd;
use Data::Dumper;
use DBIx::Simple;
use IO::File;
use File::stat;
use Time::TAI64 qw/ tai2unix /;

$Data::Dumper::Sortkeys = 1;

# ---------------------------------------------------------------------------
# Lookup tables.  These are file-scoped lexicals that the subs below close
# over, so they MUST be initialised before the main loop runs.
# ---------------------------------------------------------------------------

# Plugin messages that carry no information beyond their score; they are
# stored as an empty string (see trim_message).
my %UNINFORMATIVE_MESSAGE = map { $_ => 1 } (
    'skip, naughty',
    'skip, relay client',
    'skip, no match',
    'skip: unsigned',
    'skip, not a null sender',
    'pass',
    'pass, no record',
    'pass, Deliverable through vpopmail',
    'pass, clean',
    'TLS setup returning',
);

# p0f fingerprint -> canonical OS name.  Every rule is applied in order,
# exactly like a chain of "$message = NAME if $message =~ RE" statements.
my @P0F_OS_RULES = (
    [qr/^iOS/,              'iOS'],
    [qr/^Solaris/,          'Solaris'],
    [qr/^Mac OS X/,         'Mac OS X'],
    [qr/^FreeBSD/,          'FreeBSD'],
    [qr/^Linux/,            'Linux'],
    [qr/^OpenBSD/,          'OpenBSD'],
    [qr/^Windows \(?NT/,    'Windows NT'],
    [qr/^Windows \(?95/,    'Windows 95'],
    [qr/^Windows \(?98/,    'Windows 98'],
    [qr/^Windows \(?XP/,    'Windows XP'],
    [qr/^Windows \(?2000/,  'Windows 2000'],
    [qr/^Windows \(?2003/,  'Windows 2003'],
    [qr/^Windows 7/,        'Windows 7 or 8'],
    [qr/^Windows 8/,        'Windows 7 or 8'],
    [qr/^Google/,           'Google'],
    [qr/^HP\-UX/,           'HP-UX'],
    [qr/^NetCache/i,        'NetCache'],
    [qr/^Cisco/i,           'Cisco'],
    [qr/Netware/i,          'Netware'],
);

# Message prefixes recognised by parse_line, grouped by resulting line type.
my @REJECT_PREFIXES = ('deny mail from', 'denysoft mail from');
my @INFO_PREFIXES   = (
    'Lost connection',
    'Running as user',
    'Loaded Qpsmtpd::',
    'Permissions on spool_dir',
    'Listening on ',
    'size_threshold set',
    'tls: ciphers',
);
my @ERROR_PREFIXES = (
    'of uninitialized value',
    'symbol "',
    'error at ',
    'Could not print',
);

# ---------------------------------------------------------------------------
# Main program
# ---------------------------------------------------------------------------

my $logdir   = get_log_dir();
my @logfiles = get_logfiles($logdir);

# Caches: plugin name<->id, OS name->id, "$fid-$pid" -> message id.
my (%plugins, %os, %message_ids);
my $has_cleanup;    # incremented for every 'cleaning up' line seen

my $db = get_db();

check_plugins_table();

foreach my $file (@logfiles) {
    my ($fid, $offset) = check_logfile($file);
    $fid or next;    # file is already fully imported
    parse_logfile($file, $fid, $offset);
}

exit;

# ---------------------------------------------------------------------------
# Subroutines
# ---------------------------------------------------------------------------

# trim_message($mess)
# Returns '' for plugin messages that add nothing beyond their score,
# otherwise returns $mess unchanged.
sub trim_message {
    my $mess = shift;

    return '' if $UNINFORMATIVE_MESSAGE{$mess};
    return '' if $mess =~ /^fail. NAUGHTY/;
    return '' if $mess =~ /^PTR:\s/;

    return $mess;
}

# get_os_id($p0f_string)
# Maps a (normalised) p0f OS name to its id in the `os` table.  The table
# is loaded once and cached in %os.  Returns nothing for strings that are
# not OS names (skips, "no match", numeric/parenthesised leftovers) and
# warns when a plausible name has no row in `os`.
sub get_os_id {
    my $p0f_string = shift or return;

    $p0f_string =~ s/\s+$//;
    $p0f_string =~ s/^\s+//;

    return if !$p0f_string;
    return if $p0f_string =~ /no match/;
    return if $p0f_string =~ /^skip/;
    return if $p0f_string =~ /^\d/;
    return if $p0f_string =~ /^\(/;
    return if $p0f_string !~ /\w/;
    return if $p0f_string =~ /no longer in the cache/;

    if (!scalar keys %os) {
        my $rows = exec_query('SELECT * FROM os');
        foreach my $row (@$rows) {
            $os{$row->{name}} = $row->{id};
        }
    }

    if (!defined $os{$p0f_string}) {
        warn "missing OS for $p0f_string\n";
    }
    return $os{$p0f_string};
}

# get_plugin_id($plugin)
# Maps a plugin name (or alias) to its id.  The `plugin` and
# `plugin_aliases` tables are loaded once into %plugins, which holds both
# name->id and id->name entries.  Dies if the plugin is unknown.
sub get_plugin_id {
    my $plugin = shift;

    if (!scalar keys %plugins) {
        my $rows = exec_query('SELECT * FROM plugin');
        foreach my $row (@$rows) {
            $plugins{$row->{name}} = $row->{id};
            $plugins{$row->{id}}   = $row->{name};
        }
        $rows = exec_query('SELECT * FROM plugin_aliases');
        foreach my $alias (@$rows) {
            $plugins{$alias->{name}} = $alias->{plugin_id};
        }
    }

    if (!defined $plugins{$plugin}) {
        #warn Dumper(\%plugins);
        die "missing DB plugin $plugin\n";
    }

    return $plugins{$plugin};
}

# get_msg_id($fid, $pid)
# Returns the `message` row id for the connection handled by process $pid
# in log file $fid, or undef when no such row exists yet.  Hits are cached
# in %message_ids.
sub get_msg_id {
    my ($fid, $pid) = @_;

    # A line without a pid can never belong to a known connection.
    return if !defined $pid;

    my $cache_key = "$fid-$pid";
    return $message_ids{$cache_key} if $message_ids{$cache_key};

    #print "searching for message $pid...";
    my $msgs = exec_query(
        'SELECT * FROM message WHERE file_id=? AND qp_pid=?',
        [$fid, $pid]
    );
    #print scalar @$msgs ? "y\n" : "n\n";

    # Test the row count first so an empty result is not autovivified.
    my $id = @$msgs ? $msgs->[0]{id} : undef;
    $message_ids{$cache_key} = $id if $id;
    return $id;
}

# create_message($fid, $ts, $pid, $message)
# Inserts a new `message` row for a connection.  $message is expected to be
# "hostname [ip]"; the hostname is stored unless it is empty or 'Unknown'.
# Returns the id of the new row.
sub create_message {
    my ($fid, $ts, $pid, $message) = @_;

    my ($host, $ip) = split /\s/, $message;

    # Remove the surrounding brackets; a missing or too-short field becomes
    # undef (stored as NULL) instead of tripping a substr warning.
    $ip = (defined $ip && length $ip >= 2) ? substr($ip, 1, -1) : undef;

    my $id = exec_query(
        "INSERT INTO message SET file_id=?, connect_start=FROM_UNIXTIME(?), qp_pid=?, ip=INET_ATON(?)",
        [$fid, $ts, $pid, $ip]
    );

    if ($host && $host ne 'Unknown') {
        exec_query("UPDATE message SET hostname=? WHERE id=?", [$host, $id]);
    }

    #warn "host updated: $host\n";
    return $id;
}

# insert_plugin($msg_id, $plugin, $message)
# Records one plugin result in `message_plugin`.  A few plugins also update
# columns on the `message` row itself:
#   ident::geoip    -> distance (km); the stored string is the location part
#   ident::p0f*     -> os_id
#   connection_time -> time (seconds)
sub insert_plugin {
    my ($msg_id, $plugin, $message) = @_;

    my $plugin_id = get_plugin_id($plugin);

    if ($plugin eq 'ident::geoip') {
        my ($gip, $distance) = $message =~ /(.*?),\s+([\d]+)\skm/;
        if ($distance) {
            exec_query('UPDATE message SET distance=? WHERE id=?',
                       [$distance, $msg_id]);
            $message = $gip;
        }
    }
    elsif ($plugin =~ /^ident::p0f/) {
        my $os_id = get_os_id($message);
        if ($os_id) {
            exec_query('UPDATE message SET os_id=? WHERE id=?',
                       [$os_id, $msg_id]);
            $message = 'pass';
        }
    }
    elsif ($plugin eq 'connection_time') {
        my ($seconds) = $message =~ /\s*([\d\.]+)\s/;
        if ($seconds) {
            exec_query('UPDATE message SET time=? WHERE id=?',
                       [$seconds, $msg_id]);
            $message = 'pass';
        }
    }

    # Only messages that scored are eligible for trimming; unscored ones
    # are kept verbatim.
    my $result = get_score($message);
    if ($result) {
        $message = trim_message($message);
    }

    exec_query(
        'INSERT INTO message_plugin SET msg_id=?, plugin_id=?, result=?, string=?',
        [$msg_id, $plugin_id, $result, $message]
    );
}

# parse_logfile($file, $fid, $offset)
# Reads "$logdir/$file" starting at byte $offset (default 0) and applies
# every relevant line to the database under log-file id $fid.
#
# NOTE(review): check_logfile records the file size before this sub reads
# the file; if 'current' grows in between, the extra lines are read now and
# again on the next run.  Left as-is.
sub parse_logfile {
    my $file   = shift;
    my $fid    = shift;
    my $offset = shift || 0;

    my $path = "$logdir/$file";

    print "parsing file $file (id: $fid) from offset $offset\n";
    open my $F, '<', $path or die "could not open $path: $!";
    seek($F, $offset, 0) if $offset;

  LINE:
    while (defined(my $line = <$F>)) {
        chomp $line;
        next LINE if !$line;

        my ($type, $pid, $hook, $plugin, $message) = parse_line($line);

        next LINE if !$type;
        next LINE if $type eq 'info';
        next LINE if $type eq 'unknown';
        next LINE if $type eq 'response';
        next LINE if $type eq 'init';    # doesn't occur in all deployment models
        next LINE if $type eq 'cleanup';
        next LINE if $type eq 'error';

        my $ts = tai2unix((split /\s/, $line)[0]);
        # print "ts: $ts\n";

        # Lines for a connection we have no row for are dropped, except the
        # 'connect' line itself, which creates the row.
        my $msg_id = get_msg_id($fid, $pid);
        if (!$msg_id) {
            create_message($fid, $ts, $pid, $message) if $type eq 'connect';
            next LINE;
        }

        #warn "type: $type\n";
        if ($type eq 'plugin') {
            next LINE if $plugin eq 'naughty';    # housekeeping only
            next LINE
              if $plugin eq 'karma'
              && 'karma adjust' eq substr($message, 0, 12);
            insert_plugin($msg_id, $plugin, $message);
        }
        elsif ($type eq 'queue') {
            exec_query('UPDATE message SET result=? WHERE id=?',
                       [3, $msg_id]);
        }
        elsif ($type eq 'reject') {
            exec_query('UPDATE message SET result=? WHERE id=?',
                       [-3, $msg_id]);
        }
        elsif ($type eq 'close') {
            if ($message eq 'Connection Timed Out') {
                exec_query('UPDATE message SET result=? WHERE id=?',
                           [-1, $msg_id]);
            }
        }
        elsif ($type eq 'connect') {
            # row already exists; nothing to do
        }
        elsif ($type eq 'dispatch') {
            if (substr($message, 0, 21) eq 'dispatching MAIL FROM') {
                my ($from) = $message =~ /<(.*?)>/;
                exec_query('UPDATE message SET mail_from=? WHERE id=?',
                           [$from, $msg_id]);
            }
            elsif (substr($message, 0, 19) eq 'dispatching RCPT TO') {
                # Only the first recipient is stored.
                my ($to) = $message =~ /<(.*?)>/;
                exec_query(
                    'UPDATE message SET rcpt_to=? WHERE id=? AND rcpt_to IS NULL',
                    [$to, $msg_id]
                );
            }
            elsif ($message =~ m/dispatching (EHLO|HELO) (.*)/) {
                exec_query('UPDATE message SET helo=? WHERE id=?',
                           [$2, $msg_id]);
            }
            elsif ($message eq 'dispatching DATA')     { }
            elsif ($message eq 'dispatching QUIT')     { }
            elsif ($message eq 'dispatching STARTTLS') { }
            elsif ($message eq 'dispatching RSET')     { }
            else {
                # anything here is likely an unrecognized command
                #print "$message\n";
            }
        }
        else {
            # hook/plugin are undef for non-plugin lines; print them as ''
            # rather than emitting "uninitialized" warnings.
            print join(' ',
                map { defined $_ ? $_ : '' }
                  $type, $pid, $hook, $plugin, $message), "\n";
        }
    }

    close $F;
}

# check_logfile($file)
# Decides whether "$logdir/$file" needs (further) parsing.
# Returns:
#   ($id, $offset)  known file that may have grown; resume at $offset
#   ($id)           new file; parse from the start
#   ()              file already fully imported
# Dies when the file is missing or cannot be stat'ed.
sub check_logfile {
    my $file = shift;
    my $path = "$logdir/$file";

    die "missing file $logdir/$file" if !-f $path;

    # One stat call, checked before use.  A size of 0 is legitimate
    # (e.g. 'current' right after rotation) and must not be fatal.
    my $st    = stat($path) or die "unable to stat $path: $!\n";
    my $inode = $st->ino    or die "unable to get inode for $path\n";
    my $size  = $st->size;
    defined $size or die "unable to get size for $path\n";

    #warn "check if file $file is in the DB as 'current'\n";
    if ($file =~ /^\@/) {
        # A rotated file whose inode we know as 'current': rename the row
        # and pick up where the last run stopped.
        my $exists = exec_query(
            'SELECT * FROM log WHERE inode=? AND name=?',
            [$inode, 'current']
        );
        if (@$exists) {
            print "Updating current -> $file\n";

            # Store the final size together with the new name; otherwise
            # the name+size lookup below fails on the next run and the
            # whole file is imported a second time.
            exec_query(
                'UPDATE log SET name=?, size=? WHERE inode=? AND name=?',
                [$file, $size, $inode, 'current']
            );
            return $exists->[0]{id}, $exists->[0]{size};   # continue parsing
        }
    }

    if ($file eq 'current') {
        my $exists = exec_query(
            'SELECT * FROM log WHERE inode=? AND name=?',
            [$inode, $file]
        );
        if (@$exists) {
            exec_query(
                'UPDATE log SET size=? WHERE inode=? AND name=?',
                [$size, $inode, 'current']
            );
            return $exists->[0]{id}, $exists->[0]{size};   # continue parsing
        }
    }

    my $unchanged = exec_query(
        'SELECT * FROM log WHERE name=? AND size=?',
        [$file, $size]
    );
    return if @$unchanged;    # log file hasn't changed, ignore it

    #print Dumper($unchanged);

    # file is a new one we haven't seen, add to DB and parse
    my $id = exec_query(
        'INSERT INTO log SET inode=?, size=?, name=?, created=FROM_UNIXTIME(?)',
        [$inode, $size, $file, $st->ctime]
    );
    print "new file id: $id\n";
    return $id;
}

# get_log_dir()
# Returns the log directory to read: ./log/main if it exists, otherwise
# the first match under the home directory of user 'qpsmtpd' or 'smtpd'.
# Returns nothing when no candidate is found.
sub get_log_dir {

    if (-d "log/main") {
        my $wd = Cwd::cwd();
        return "$wd/log/main";
    }

    foreach my $user (qw/ qpsmtpd smtpd /) {
        my ($homedir) = (getpwnam($user))[7] or next;

        if (-d "$homedir/log") {
            return "$homedir/log/main";
        }
        if (-d "$homedir/smtpd/log") {
            return "$homedir/smtpd/log/main";
        }
    }

    return;
}

# get_logfiles($dir)
# Returns the rotated log files in $dir (names matching /^\@.*s$/), sorted
# by name, followed by 'current'.  Dies if $dir cannot be opened.
sub get_logfiles {
    my $dir = shift;

    defined $dir or die "unable to locate a log dir\n";
    opendir my $D, $dir or die "unable to open log dir $dir\n";

    my @rotated;
    while (defined(my $f = readdir($D))) {
        next if !-f "$dir/$f";    # ignore anything that's not a file
        push @rotated, $f if $f =~ /^\@.*s$/;
    }
    closedir $D;

    # readdir order is arbitrary; sort so older files are handled first
    # (assumes timestamp-based, fixed-width names -- TODO confirm).
    return (sort(@rotated), "current");    # always have this one last
}

# _has_prefix($string, $prefix)
# True when $string begins with $prefix.
sub _has_prefix {
    my ($string, $prefix) = @_;
    return index($string, $prefix) == 0;
}

# parse_line($line)
# Classifies one log line.  Returns the list
#   ($type, $pid, $hook, $plugin, $message)
# where $type is one of: plugin, dispatch, queue, response, init, connect,
# close, cleanup, info, reject, error, unknown.  $hook and $plugin are only
# set for plugin lines.  Returns nothing for lines without a message.
sub parse_line {
    my $line = shift;

    my ($tai, $pid, $message) = split /\s+/, $line, 3;
    return if !$message;    # garbage in the log file

    my @rest = ($pid, undef, undef, $message);

    # lines seen many times per connection
    return parse_line_plugin($line) if substr($message, 0, 1) eq '(';
    return ('dispatch', @rest) if _has_prefix($message, 'dispatching ');
    return ('queue',    @rest) if _has_prefix($message, '250 Queued!');
    return ('response', @rest) if $message =~ /^[23]\d\d/;

    # lines seen about once per connection
    return ('init', @rest) if _has_prefix($message, 'Accepted connection');
    return ('connect', $pid, undef, undef, substr($message, 16))
      if _has_prefix($message, 'Connection from');

    # NOTE(review): this also strips 16 characters although the prefix is
    # only 8 long; kept as found -- confirm against real 'connect ' lines.
    return ('connect', $pid, undef, undef, substr($message, 16))
      if _has_prefix($message, 'connect ');
    return ('close', @rest) if _has_prefix($message, 'close ');
    return ('close', @rest) if $message eq 'Connection Timed Out';
    return ('close', @rest) if _has_prefix($message, 'click, disconnecting');
    return parse_line_cleanup($line) if _has_prefix($message, 'cleaning up');

    # lines seen less than once per connection
    return ('info',   @rest) if $message eq 'spooling message to disk';
    return ('reject', @rest) if $message =~ /^[45]\d\d/;
    foreach my $prefix (@REJECT_PREFIXES) {
        return ('reject', @rest) if _has_prefix($message, $prefix);
    }
    return ('info', @rest) if $message eq 'auth success cleared naughty';
    foreach my $prefix (@INFO_PREFIXES) {
        return ('info', @rest) if _has_prefix($message, $prefix);
    }
    foreach my $prefix (@ERROR_PREFIXES) {
        return ('error', @rest) if _has_prefix($message, $prefix);
    }

    print "UNKNOWN LINE: $line\n";
    return ('unknown', @rest);
}

# _split_plugin_line($line)
# Splits a plugin log line into ($tai, $pid, $hook, $plugin, $message) and
# strips the trailing ':' from the plugin name.  Returns nothing when the
# line has no plugin field; a missing message becomes ''.
sub _split_plugin_line {
    my ($line) = @_;

    my ($tai, $pid, $hook, $plugin, $message) = split /\s/, $line, 5;
    return if !defined $plugin;

    $plugin =~ s/:$//;
    $message = '' if !defined $message;

    return ($tai, $pid, $hook, $plugin, $message);
}

# parse_line_plugin($line)
# Parses a plugin result line and normalises the message for some plugins.
# Returns ('plugin', $pid, $hook, $plugin, $message), or nothing for a
# malformed line.
sub parse_line_plugin {
    my ($line) = @_;

# @tai 13486 (connect) ident::p0f: Windows (XP/2000 (RFC1323+, w, tstamp-))
# @tai 13681 (connect) dnsbl: fail, NAUGHTY
# @tai 15787 (connect) karma: pass, no penalty (0 naughty, 3 nice, 3 connects)
# @tai 27500 (queue) queue::qmail_2dqueue: (for 27481) Queuing to /var/qmail/bin/qmail-queue
    my ($tai, $pid, $hook, $plugin, $message) = _split_plugin_line($line)
      or return;

    return parse_line_plugin_p0f($line)   if $plugin =~ /^ident::p0f/;
    return parse_line_plugin_dspam($line) if $plugin =~ /^dspam/;
    return parse_line_plugin_spamassassin($line)
      if $plugin =~ /^spamassassin/;

    if ($plugin eq 'sender_permitted_from') {
        $message = 'pass' if $message =~ /^pass/;
        $message = 'fail' if $message =~ /^fail/;
        $message = 'skip' if $message =~ /^none/;
    }
    elsif ($plugin eq 'queue::qmail_2dqueue') {
        # The line is logged under a different pid; "(for N)" names the
        # pid this result belongs to.  Keep the line's own pid if absent.
        if ($message =~ /\(for ([\d]+)\)/) {
            $pid = $1;
        }
        $message = 'pass' if $message =~ /Queuing/;
    }
    elsif ($plugin =~ /(?:early|karma|helo|rcpt_ok)/) {
        $message = 'pass' if $message =~ /^pass/;
    }
    elsif ($plugin =~ /resolvable_fromhost/) {
        $message = 'pass' if $message =~ /^pass/;
    }

    return 'plugin', $pid, $hook, $plugin, $message;
}

# parse_line_plugin_dspam($line)
# Reduces dspam verdicts to "pass, <confidence>" / "fail, <confidence>".
sub parse_line_plugin_dspam {
    my $line = shift;

    my ($tai, $pid, $hook, $plugin, $message) = _split_plugin_line($line)
      or return;

    if ($message =~ /Innocent, (\d\.\d\d c)/) {
        $message = "pass, $1";
    }
    if ($message =~ /Spam, (\d\.\d\d c)/) {
        $message = "fail, $1";
    }

    return 'plugin', $pid, $hook, $plugin, $message;
}

# parse_line_plugin_spamassassin($line)
# Reduces spamassassin verdicts to "pass, <score>" / "fail, <score>".
sub parse_line_plugin_spamassassin {
    my $line = shift;

    my ($tai, $pid, $hook, $plugin, $message) = _split_plugin_line($line)
      or return;

    if ($message =~ /pass, Ham, ([\d\-\.]+)\s/) {
        $message = "pass, $1";
    }
    if ($message =~ /^fail, Spam,\s([\d\.]+)\s< 100/) {
        $message = "fail, $1";
    }

    return 'plugin', $pid, $hook, $plugin, $message;
}

# parse_line_plugin_p0f($line)
# Normalises a p0f fingerprint to a canonical OS name (see @P0F_OS_RULES).
sub parse_line_plugin_p0f {
    my $line = shift;

    my ($tai, $pid, $hook, $plugin, $message) = _split_plugin_line($line)
      or return;

    # Drop a trailing "(... hops)" annotation.
    if ($message =~ /hops\)\z/) {
        ($message) = split(/\s\(/, $message);
    }

    foreach my $rule (@P0F_OS_RULES) {
        my ($pattern, $os_name) = @$rule;
        $message = $os_name if $message =~ $pattern;
    }

    return 'plugin', $pid, $hook, $plugin, $message;
}

# parse_line_cleanup($line)
# Handles "cleaning up after <pid>" lines: counts them in $has_cleanup and
# returns the pid taken from the end of the line.
sub parse_line_cleanup {
    my ($line) = @_;

    # @tai 85931 cleaning up after 3210
    my $pid = (split /\s+/, $line)[-1];
    $has_cleanup++;
    return 'cleanup', $pid, undef, undef, $line;
}

# get_score($mess)
# Converts a plugin message to a numeric result:
#   3 pass (or TLS setup), 2 positive, 1 skip, 0 unknown,
#  -2 negative, -3 fail.
sub get_score {
    my $mess = shift;

    return 3  if $mess eq 'TLS setup returning';
    return 3  if $mess =~ /^pass/;
    return -3 if $mess =~ /^fail/;
    return -2 if $mess =~ /^negative/;
    return 2  if $mess =~ /^positive/;
    return 1  if $mess =~ /^skip/;
    return 0;
}

# get_db()
# Connects to the database using the dsn/user/pass keys of the 'log2sql'
# config file, falling back to built-in defaults.  Dies on failure.
sub get_db {

    my %dbv = get_config('log2sql');
    $dbv{dsn}  ||= 'DBI:mysql:database=qpsmtpd;host=db;port=3306';
    $dbv{user} ||= 'qplog';
    $dbv{pass} ||= 't0ps3cret';

    # Show the effective settings, but never write the password to stdout.
    my %shown = (%dbv, pass => '********');
    print Dumper(\%shown);

    my $handle = DBIx::Simple->connect($dbv{dsn}, $dbv{user}, $dbv{pass})
      or die DBIx::Simple->error;

    return $handle;
}

# get_config($file)
# Reads "key = value" pairs from the named config file and returns them as
# a hash.  Comment lines (#) and blank lines are ignored; surrounding
# whitespace is stripped.  Dies when no file name is given.
sub get_config {
    my $file = shift or die "missing file name\n";

    my %values;
    foreach my $line (get_config_contents($file)) {
        chomp $line;
        $line =~ s/^\s+//;
        $line =~ s/\s+$//;
        next if $line eq '';
        next if $line =~ /^#/;

        my ($key, $val) = split /\s*=\s*/, $line, 2;
        next if !defined $key || $key eq '';
        $values{$key} = $val;
    }
    return %values;
}

# get_config_contents($name)
# Returns the lines of the first readable file called $name found in
# config, ../config, log or plugins (relative to the working directory).
# Returns an empty list when no such file can be read.
sub get_config_contents {
    my $name = shift;

    my @config_dirs = qw[ config ../config log plugins ];
    foreach my $dir (@config_dirs) {
        next if !-f "$dir/$name";
        my $fh = IO::File->new();
        if (!$fh->open("$dir/$name", '<')) {
            warn "unable to open config file $dir/$name\n";
            next;
        }
        my @contents = <$fh>;
        $fh->close;
        return @contents;
    }

    return;
}

# check_plugins_table()
# Seeds the `plugin` and `plugin_aliases` tables from 'registry.txt' when
# the `plugin` table is empty.  Each registry line is
#   id name abb3 abb5 [alias[,alias...]]
sub check_plugins_table {

    # COUNT(*) always yields exactly one row, so look at the count itself
    # rather than at the number of rows returned.
    my $rows = exec_query('SELECT COUNT(*) AS plugin_count FROM plugin');
    return if $rows->[0]{plugin_count};

    my @lines = get_config_contents('registry.txt');
    warn "unable to find registry.txt, plugin table left empty\n"
      if !@lines;

    foreach my $line (@lines) {
        next if $line =~ /^\s*#/;    # ignore comments
        chomp $line;
        next if !$line;

        my ($id, $name, $abb3, $abb5, $aliases) = split /\s+/, $line, 5;
        next if !defined $name;      # not a registry entry

        # "(??)" is expanded by DBIx::Simple to one placeholder per value.
        my $q = "REPLACE INTO plugin (id,name,abb3,abb5) VALUES (??)";
        print "query: $q, $id, $name, $abb3, $abb5\n";
        exec_query($q, [$id, $name, $abb3, $abb5]);

        next if !$aliases;
        foreach my $alias (split /\s*,\s*/, $aliases) {
            next if !$alias;
            my $aq = "REPLACE INTO plugin_aliases (plugin_id,name) VALUES (??)";
            print "aqury: $aq, $id, $alias\n";
            exec_query($aq, [$id, $alias]);
        }
    }
}

# exec_query($query, $params)
# Runs $query with optional bind values ($params: a scalar or array ref).
# Return value depends on the statement:
#   INSERT   id of the inserted row (dies if there is none)
#   REPLACE  nothing
#   UPDATE   the DBIx::Simple result object (failures are not fatal)
#   DELETE   the affected row count (dies on failure)
#   other    array ref of row hashes (dies on failure)
sub exec_query {
    my $query  = shift;
    my $params = shift;
    die "invalid arguments to exec_query!" if @_;

    my @params;
    if (defined $params) {
        @params = ref $params eq 'ARRAY' ? @$params : $params;
    }

    my $err = "query failed: $query\n";
    if (scalar @params) {
        # Bind values may legitimately be undef (NULL); show them as such.
        $err .= join(',', map { defined $_ ? $_ : 'NULL' } @params);
    }

    #warn "err: $err\n";
    if ($query =~ /(?:REPLACE|INSERT) INTO/) {
        my ($table) = $query =~ /(?:REPLACE|INSERT) INTO (\w+)\s/;
        $db->query($query, @params);

        # A method call is not interpolated inside a string, so fetch the
        # error text first.  'DBI error: ' alone means "no error".
        my $db_error = $db->error;
        warn "$db_error\n$err" if $db_error ne 'DBI error: ';

        return if $query =~ /^REPLACE/;
        my $id = $db->last_insert_id(undef, undef, $table, undef)
          or die $err;
        return $id;
    }
    elsif ($query =~ /^UPDATE/i) {
        return $db->query($query, @params);
    }
    elsif ($query =~ /DELETE/) {
        $db->query($query, @params) or die $err;
        return $db->query("SELECT ROW_COUNT()")->list;
    }

    my $r = $db->query($query, @params)->hashes or die $err;
    return $r;
}