#! /usr/local/bin/perl
#######################################################################
# Copyright (C) 2006-2024 by Carnegie Mellon University.
#
# @OPENSOURCE_LICENSE_START@
#
# SiLK 3.22.0
#
# Copyright 2023 Carnegie Mellon University.
#
# NO WARRANTY. THIS CARNEGIE MELLON UNIVERSITY AND SOFTWARE ENGINEERING
# INSTITUTE MATERIAL IS FURNISHED ON AN "AS-IS" BASIS. CARNEGIE MELLON
# UNIVERSITY MAKES NO WARRANTIES OF ANY KIND, EITHER EXPRESSED OR IMPLIED,
# AS TO ANY MATTER INCLUDING, BUT NOT LIMITED TO, WARRANTY OF FITNESS FOR
# PURPOSE OR MERCHANTABILITY, EXCLUSIVITY, OR RESULTS OBTAINED FROM USE OF
# THE MATERIAL. CARNEGIE MELLON UNIVERSITY DOES NOT MAKE ANY WARRANTY OF
# ANY KIND WITH RESPECT TO FREEDOM FROM PATENT, TRADEMARK, OR COPYRIGHT
# INFRINGEMENT.
#
# Released under a GNU GPL 2.0-style license, please see LICENSE.txt or
# contact permission@sei.cmu.edu for full terms.
#
# [DISTRIBUTION STATEMENT A] This material has been approved for public
# release and unlimited distribution.  Please see Copyright notice for
# non-US Government use and distribution.
#
# GOVERNMENT PURPOSE RIGHTS - Software and Software Documentation
#
# Contract No.: FA8702-15-D-0002
# Contractor Name: Carnegie Mellon University
# Contractor Address: 4500 Fifth Avenue, Pittsburgh, PA 15213
#
# The Government's rights to use, modify, reproduce, release, perform,
# display, or disclose this software are restricted by paragraph (b)(2) of
# the Rights in Noncommercial Computer Software and Noncommercial Computer
# Software Documentation clause contained in the above identified
# contract. No restrictions apply after the expiration date shown
# above. Any reproduction of the software or portions thereof marked with
# this legend must also reproduce the markings.
#
# Carnegie Mellon(R) and CERT(R) are registered in the U.S. Patent and
# Trademark Office by Carnegie Mellon University.
#
# This Software includes and/or makes use of Third-Party Software each
# subject to its own license.
#
# DM23-0973
#
# @OPENSOURCE_LICENSE_END@
#
#######################################################################
# $SiLK: rwscanquery.in 6c9dfbe0b623 2024-01-17 18:02:34Z mthomas $
#######################################################################
# rwscanquery
#
# Query and generate reports from a network scan database.
#######################################################################

use strict;
use warnings;

use Getopt::Long qw(:config gnu_compat permute no_getopt_compat no_bundling);
use Data::Dumper;
use FindBin;
use Pod::Usage;
use File::Temp;

### Config
#
# Values loaded from the user's .rwscanrc file by load_rcfile()

my $conf_db_driver;        # database driver: oracle|postgresql|mysql|sqlite
my $conf_db_user;          # database login name
my $conf_db_pass;          # database password
my $conf_db_instance;      # default database/instance name
my $conf_rw_in_class;      # rwfilter --class for incoming traffic
my $conf_rw_in_type;       # rwfilter --type for incoming traffic
my $conf_rw_out_class;     # rwfilter --class for outgoing traffic
my $conf_rw_out_type;      # rwfilter --type for outgoing traffic

### Prototypes for subs defined later

sub db_connect_oracle();
sub db_connect_postgresql();
sub db_connect_mysql();
sub db_connect_sqlite();
sub parse_options();
sub load_rcfile();
sub val_date(\$);
sub val_ip(\$$);
sub val_set(\$$);
sub do_query($);
sub write_standard_results($);
sub write_export_results($);
sub write_volume_results($);
sub write_scan_set($);
sub write_scan_flows($);
sub write_resp_flows($);
sub tool_version_exit();

### Argument processing
#
# Globals filled in by parse_options(); the IP/set options are later
# rewritten in place to SQL condition text by val_ip()/val_set().

my $opt_start_hour;
my $opt_end_hour;
my $opt_saddress;
my $opt_sipset;
my $opt_daddress;
my $opt_dipset;
my $opt_report = "standard";    # which report to run; see the dispatch below
my $opt_volume_summary;
my $opt_show_header = 0;        # print a title line before the results
my $opt_columnar    = 0;        # aligned columns instead of '|'-delimited

my $opt_database;               # database instance; defaults applied later

my $opt_verbose;

my $outfile    = "";    # for Perl filehandles ("&STDOUT" means stdout)
my $outfile_rw = "";    # for rw tools ("stdout" means stdout)

### SiLK commands used
#
# Each may be overridden through the environment for testing

my $rwfilter   = $ENV{RWFILTER}   || 'rwfilter';
my $rwset      = $ENV{RWSET}      || 'rwset';
my $rwsetbuild = $ENV{RWSETBUILD} || 'rwsetbuild';
my $rwsetcat   = $ENV{RWSETCAT}   || 'rwsetcat';

# Program name (basename of $0) used as a prefix in diagnostics
my $appname = $0;
$appname =~ s/.*\///;

# The queries below are optimized based on the fact that scans in the database
# have a maximum duration of about an hour.  There are some cases where a scan
# for a particular hour can begin in the previous hour, though, so the date
# arithmetic looks back (and ahead) of the time period in question a little
# bit.  Also, note that as of this writing, only the Oracle queries have been
# heavily performance tested, though all have been tested for proper selection.

# SQL templates, keyed first by database driver, then by report type
# ("standard", "volume", "scanip", "export").  The '$start_hour',
# '$end_hour', '$start_hour_iso', '$end_hour_iso', '$saddress_part'
# and '$sipset_part' tokens are LITERAL placeholders (the templates
# are single-quoted q{} strings) that do_query() substitutes at run
# time; they are not interpolated here.
my %queries = (
    # Oracle: dates via to_date() and day-fraction arithmetic (1/24 = 1 hour)
    "oracle" => {
        "standard" => q{
            SELECT s.id, s.sip, s.stime, s.etime, s.proto,
              s.flows, s.packets, s.bytes
            FROM scans s
            WHERE s.stime < to_date('$end_hour') + 1/24
            AND s.etime >= to_date('$start_hour')
            AND s.stime >= to_date('$start_hour') - 1/24
            AND s.etime < to_date('$end_hour') + 2/24
            $saddress_part
            $sipset_part
        },
        "volume" => q{
            SELECT TO_CHAR(s.stime, 'YYYY/MM/DD') AS scan_date,
            SUM(s.flows) AS flows,
            SUM(s.packets) AS pkts,
            SUM(s.bytes) AS bytes
            FROM scans s
            WHERE s.stime < to_date('$end_hour') + 1
            AND s.etime >= to_date('$start_hour')
            AND s.stime >= to_date('$start_hour') - 1
            AND s.etime < to_date('$end_hour') + 1
            $saddress_part
            $sipset_part
            GROUP BY TO_CHAR(s.stime, 'YYYY/MM/DD')
            ORDER BY TO_CHAR(s.stime, 'YYYY/MM/DD')
        },
        "scanip" => q{
            SELECT DISTINCT s.sip
            FROM scans s
            WHERE s.stime < to_date('$end_hour') + 1/24
            AND s.etime >= to_date('$start_hour')
            AND s.stime >= to_date('$start_hour') - 1/24
            AND s.etime < to_date('$end_hour') + 2/24
            $saddress_part
            $sipset_part
        },
        "export" => q{
            SELECT s.id, s.sip, s.proto, s.stime, s.etime,
              s.flows, s.packets, s.bytes, s.scan_model, s.scan_prob
            FROM scans s
            WHERE s.stime < to_date('$end_hour') + 1/24
            AND s.etime >= to_date('$start_hour')
            AND s.stime >= to_date('$start_hour') - 1/24
            AND s.etime < to_date('$end_hour') + 2/24
            $saddress_part
            $sipset_part
        }
    },
    # PostgreSQL: dates via to_timestamp() and INTERVAL arithmetic
    "postgresql" => {
        "standard" => q{
            SELECT s.id, s.sip, s.stime, s.etime, s.proto,
              s.flows, s.packets, s.bytes
            FROM scans s
            WHERE s.stime < to_timestamp('$end_hour', 'YYYY/MM/DD:HH24')
                + INTERVAL '1 HOUR'
            AND s.etime >= to_timestamp('$start_hour', 'YYYY/MM/DD:HH24')
            AND s.stime >= to_timestamp('$start_hour', 'YYYY/MM/DD:HH24')
                - INTERVAL '1 HOUR'
            AND s.etime < to_timestamp('$end_hour', 'YYYY/MM/DD:HH24')
                + INTERVAL '2 HOUR'
            $saddress_part
            $sipset_part
        },
        "volume" => q{
            SELECT to_char(s.stime, 'YYYY/MM/DD') AS scan_date,
            SUM(s.flows) AS flows,
            SUM(s.packets) AS pkts,
            SUM(s.bytes) AS bytes
            FROM scans s
            WHERE s.stime < to_timestamp('$end_hour', 'YYYY/MM/DD:HH24')
                + INTERVAL '1 DAY'
            AND s.etime >= to_timestamp('$start_hour', 'YYYY/MM/DD:HH24')
            AND s.stime >= to_timestamp('$start_hour', 'YYYY/MM/DD:HH24')
                - INTERVAL '1 DAY'
            AND s.etime < to_timestamp('$end_hour', 'YYYY/MM/DD:HH24')
                + INTERVAL '1 DAY'
            $saddress_part
            $sipset_part
            GROUP BY to_char(s.stime, 'YYYY/MM/DD')
            ORDER BY to_char(s.stime, 'YYYY/MM/DD')
        },
        "scanip" => q{
            SELECT DISTINCT s.sip
            FROM scans s
            WHERE s.stime < to_timestamp('$end_hour', 'YYYY/MM/DD:HH24')
                + INTERVAL '1 HOUR'
            AND s.etime >= to_timestamp('$start_hour', 'YYYY/MM/DD:HH24')
            AND s.stime >= to_timestamp('$start_hour', 'YYYY/MM/DD:HH24')
                - INTERVAL '1 HOUR'
            AND s.etime < to_timestamp('$end_hour', 'YYYY/MM/DD:HH24')
                + INTERVAL '2 HOUR'
            $saddress_part
            $sipset_part
        },
        "export" => q{
            SELECT s.id, s.sip, s.proto, s.stime, s.etime,
              s.flows, s.packets, s.bytes, s.scan_model, s.scan_prob
            FROM scans s
            WHERE s.stime < to_timestamp('$end_hour', 'YYYY/MM/DD:HH24')
                + INTERVAL '1 HOUR'
            AND s.etime >= to_timestamp('$start_hour', 'YYYY/MM/DD:HH24')
            AND s.stime >= to_timestamp('$start_hour', 'YYYY/MM/DD:HH24')
                - INTERVAL '1 HOUR'
            AND s.etime < to_timestamp('$end_hour', 'YYYY/MM/DD:HH24')
                + INTERVAL '2 HOUR'
            $saddress_part
            $sipset_part
        }
    },
    # MySQL: dates via STR_TO_DATE() and INTERVAL arithmetic
    "mysql" => {
        "standard" => q{
            SELECT s.id, s.sip, s.stime, s.etime, s.proto,
              s.flows, s.packets, s.bytes
            FROM scans s
            WHERE s.stime < STR_TO_DATE('$end_hour', '%Y/%m/%d:%H')
                + INTERVAL 1 HOUR
            AND s.etime >= STR_TO_DATE('$start_hour', '%Y/%m/%d:%H')
            AND s.stime >= STR_TO_DATE('$start_hour', '%Y/%m/%d:%H')
                - INTERVAL 1 HOUR
            AND s.etime < STR_TO_DATE('$end_hour', '%Y/%m/%d:%H')
                + INTERVAL 2 HOUR
            $saddress_part
            $sipset_part
        },
        "volume" => q{
            SELECT DATE_FORMAT(s.stime, '%Y/%m/%d:%H') AS scan_date,
            SUM(s.flows) AS flows,
            SUM(s.packets) AS pkts,
            SUM(s.bytes) AS bytes
            FROM scans s
            WHERE s.stime < STR_TO_DATE('$end_hour', '%Y/%m/%d:%H')
                + INTERVAL 1 DAY
            AND s.etime >= STR_TO_DATE('$start_hour', '%Y/%m/%d:%H')
            AND s.stime >= STR_TO_DATE('$start_hour', '%Y/%m/%d:%H')
                - INTERVAL 1 DAY
            AND s.etime < STR_TO_DATE('$end_hour', '%Y/%m/%d:%H')
                + INTERVAL 1 DAY
            $saddress_part
            $sipset_part
            GROUP BY DATE_FORMAT(s.stime, '%Y/%m/%d')
            ORDER BY DATE_FORMAT(s.stime, '%Y/%m/%d')
        },
        "scanip" => q{
            SELECT DISTINCT s.sip
            FROM scans s
            WHERE s.stime < STR_TO_DATE('$end_hour', '%Y/%m/%d:%H')
                + INTERVAL 1 HOUR
            AND s.etime >= STR_TO_DATE('$start_hour', '%Y/%m/%d:%H')
            AND s.stime >= STR_TO_DATE('$start_hour', '%Y/%m/%d:%H')
                - INTERVAL 1 HOUR
            AND s.etime < STR_TO_DATE('$end_hour', '%Y/%m/%d:%H')
                + INTERVAL 2 HOUR
            $saddress_part
            $sipset_part
        },
        "export" => q{
            SELECT s.id, s.sip, s.proto, s.stime, s.etime,
              s.flows, s.packets, s.bytes, s.scan_model, s.scan_prob
            FROM scans s
            WHERE s.stime < STR_TO_DATE('$end_hour', '%Y/%m/%d:%H')
                + INTERVAL 1 HOUR
            AND s.etime >= STR_TO_DATE('$start_hour', '%Y/%m/%d:%H')
            AND s.stime >= STR_TO_DATE('$start_hour', '%Y/%m/%d:%H')
                - INTERVAL 1 HOUR
            AND s.etime < STR_TO_DATE('$end_hour', '%Y/%m/%d:%H')
                + INTERVAL 2 HOUR
            $saddress_part
            $sipset_part
        }
    },
    # SQLite: no to_date(); uses datetime() with the ISO-formatted
    # '$start_hour_iso'/'$end_hour_iso' placeholders instead
    "sqlite" => {
        "standard" => q{
            SELECT s.id, s.sip, s.stime, s.etime, s.proto,
              s.flows, s.packets, s.bytes
            FROM scans s
            WHERE s.stime < datetime('$end_hour_iso', '+1 hour')
            AND s.etime >= datetime('$start_hour_iso')
            AND s.stime >= datetime('$start_hour_iso', '-1 hour')
            AND s.etime < datetime('$end_hour_iso', '+2 hours')
            $saddress_part
            $sipset_part
        },
        "volume" => q{
            SELECT strftime('%Y/%m/%d', s.stime) AS scan_date,
            SUM(s.flows) AS flows,
            SUM(s.packets) AS pkts,
            SUM(s.bytes) AS bytes
            FROM scans s
            WHERE s.stime < datetime('$end_hour_iso', '+1 day')
            AND s.etime >= datetime('$start_hour_iso')
            AND s.stime >= datetime('$start_hour_iso', '-1 day')
            AND s.etime < datetime('$end_hour_iso', '+1 day')
            $saddress_part
            $sipset_part
            GROUP BY strftime('%Y/%m/%d', s.stime)
            ORDER BY strftime('%Y/%m/%d', s.stime)
        },
        "scanip" => q{
            SELECT DISTINCT s.sip
            FROM scans s
            WHERE s.stime < datetime('$end_hour_iso', '+1 hour')
            AND s.etime >= datetime('$start_hour_iso')
            AND s.stime >= datetime('$start_hour_iso', '-1 hour')
            AND s.etime < datetime('$end_hour_iso', '+2 hours')
            $saddress_part
            $sipset_part
        },
        "export" => q{
            SELECT s.id, s.sip, s.proto, s.stime, s.etime,
              s.flows, s.packets, s.bytes, s.scan_model, s.scan_prob
            FROM scans s
            WHERE s.stime < datetime('$end_hour_iso', '+1 hour')
            AND s.etime >= datetime('$start_hour_iso')
            AND s.stime >= datetime('$start_hour_iso', '-1 hour')
            AND s.etime < datetime('$end_hour_iso', '+2 hours')
            $saddress_part
            $sipset_part
        }
    }
);

# Read the command line first, then the rcfile; the rcfile supplies
# the database driver and credentials.
parse_options();
load_rcfile();

# --database beats the rcfile's db_instance; "SCAN" is the last resort
if ( !defined $opt_database ) {
    if ( defined $conf_db_instance ) {
        $opt_database = $conf_db_instance;
    }
    else {
        $opt_database = "SCAN";
    }
}

# Loaded at run time so the script can print --help/--version without
# DBI being installed.
require DBI;

my $dbh;

# Connect using whichever driver the rcfile configured
if ( !defined $conf_db_driver ) {
    die "$appname: No database driver specified\n";
}
elsif ( $conf_db_driver =~ /oracle/i ) {
    $dbh = db_connect_oracle();
}
elsif ( $conf_db_driver =~ /postgresql/i ) {
    $dbh = db_connect_postgresql();
}
elsif ( $conf_db_driver =~ /mysql/i ) {
    $dbh = db_connect_mysql();
}
elsif ( $conf_db_driver =~ /sqlite/i ) {
    $dbh = db_connect_sqlite();
}
else {
    die "$appname: Unknown db_driver: $conf_db_driver\n";
}

my $sth;

# Dispatch on the requested report type: run the matching SQL query
# and hand the statement handle to the report writer.  The textual
# reports write to the global OUTF handle; the set/flow reports drive
# the SiLK tools via $outfile_rw instead.
#
# NOTE: the 2-arg open is deliberate — when no --output-path is given,
# $outfile is "&STDOUT" and ">$outfile" relies on 2-arg open's
# ">&STDOUT" dup magic; a 3-arg open would break that.  The filename
# case was whitelisted/untainted in parse_options().
# using 'foreach' as a switch()
foreach ($opt_report) {
    if (/standard/i) {
        open( OUTF, ">$outfile" )
            or die "$appname: Cannot open output file '$outfile': $!\n";
        $sth = do_query("standard");
        write_standard_results($sth);
        close(OUTF);
    }
    elsif (/volume/i) {
        open( OUTF, ">$outfile" )
            or die "$appname: Cannot open output file '$outfile': $!\n";
        $sth = do_query("volume");
        write_volume_results($sth);
        close(OUTF);
    }
    elsif (/scanset/i) {
        $sth = do_query("scanip");
        write_scan_set($sth);
    }
    elsif (/scanflows/i) {
        $sth = do_query("scanip");
        write_scan_flows($sth);
    }
    elsif (/respflows/i) {
        $sth = do_query("scanip");
        write_resp_flows($sth);
    }
    elsif (/export$/i) {
        open( OUTF, ">$outfile" )
            or die "$appname: Cannot open output file '$outfile': $!\n";
        $sth = do_query("export");
        write_export_results($sth);
        close(OUTF);
    }
    else {
        die "$appname: Invalid report: '$_'\n";
    }
}

# Release database resources before exiting
$sth->finish;
$dbh->disconnect;

exit 0;

# Helper functions


### Parse the options

### Parse the command line into the global $opt_* variables, choose
### the output destination, and fill in / validate the start and end
### times.  Exits via pod2usage() for --help/--man and via
### tool_version_exit() for --version; dies on invalid arguments.

sub parse_options()
{
    my $opt_help;
    my $opt_man;
    my $opt_version;
    my $opt_outfile;

    # process options.  see "man Getopt::Long"
    GetOptions(
        'start-date=s',  \$opt_start_hour,
        'end-date=s',    \$opt_end_hour,
        'saddress=s',    \$opt_saddress,
        'sipset=s',      \$opt_sipset,
        'daddress=s',    \$opt_daddress,
        'dipset=s',      \$opt_dipset,
        'report=s',      \$opt_report,
        'show-header!',  \$opt_show_header,
        'columnar!',     \$opt_columnar,
        'output-path=s', \$opt_outfile,
        'database=s',    \$opt_database,

        'verbose|v!',    \$opt_verbose,

        'help',    \$opt_help,
        'man',     \$opt_man,
        'version', \$opt_version,

        ) or pod2usage( -exitval => -1 );

    pod2usage( -exitval => 0 ) if $opt_help;
    pod2usage( -exitval => 0, -verbose => 2 ) if $opt_man;
    tool_version_exit() if $opt_version;

    # Set the output destinations.  $outfile is used with Perl's own
    # open() ("&STDOUT" exploits 2-arg open's dup magic); $outfile_rw
    # is passed to the SiLK command-line tools.
    if (!$opt_outfile) {
        $outfile    = "&STDOUT";
        $outfile_rw = "stdout";
    }
    elsif ($opt_outfile
           =~ qr/(^[\w\+_\040\#\(\)\{\}\[\]\/\-\^,\.:;&%@\\~]+\$?$)/)
    {
        # Untaint the filename by re-extracting it from the
        # character-whitelist match
        $outfile = "$1";
        $outfile_rw = "$opt_outfile";
    }
    else {
        die "$appname: Invalid characters in output-path filename\n";
    }
    print STDERR "writing results to $outfile\n" if $opt_verbose;


    # Set and verify the query time window.  Dates look like
    # "YYYY/MM/DD" with an optional ":HH" hour suffix; the volume
    # report is day-based, so hours are dropped for it.
    if ( !defined($opt_start_hour) ) {
        # when no start-date, use all of today
        if ( defined $opt_end_hour ) {
            die "$appname: Cannot specify end-date without start-date\n";
        }

        my ( $day, $month, $year ) = (localtime)[ 3, 4, 5 ];
        $opt_start_hour = sprintf("%04d/%02d/%02d",
                                  $year + 1900, $month + 1, $day);
        if ( $opt_report =~ /volume/i ) {
            # volume report is day-based, no need for hour
            $opt_end_hour = $opt_start_hour;
        }
        else {
            $opt_end_hour = $opt_start_hour . ":23";
        }
    }
    elsif ( !defined($opt_end_hour) ) {
        # when only start-date, use either that hour or the entire day
        if ( $opt_report =~ /volume/i ) {
            # volume report is day-based, no need for hour
            $opt_end_hour = $opt_start_hour;
        }
        elsif ( $opt_start_hour =~ /:\d+/ ) {
            $opt_end_hour = $opt_start_hour;
        }
        else {
            $opt_end_hour = $opt_start_hour . ":23";
        }
    }
    elsif ( $opt_start_hour !~ /:\d+/ ) {
        # ignore hour on end-date when no hour on start-date.  Have
        # end-date go to end-of-day unless volume report, which is
        # already day-based
        $opt_end_hour =~ s/^([^:]+):.*/$1/;
        if ( $opt_report !~ /volume/i ) {
            $opt_end_hour .= ":23";
        }
    }
    elsif ( $opt_end_hour !~ /:\d+/ && $opt_report !~ /volume/i ) {
        # set ending hour to the starting hour.  The previous branch
        # guarantees start-date carries an hour, so this match should
        # always succeed, but guard $1 rather than trust it.
        if ( $opt_start_hour =~ m/(:\d.*)/ ) {
            $opt_end_hour .= $1;
        }
    }

    # val_date() dies on bad dates and normalizes both strings to
    # "YYYY/MM/DD:HH", returning comparable integer keys
    my $start = val_date($opt_start_hour);
    my $end = val_date($opt_end_hour);
    if ($start > $end) {
        die "$appname: Invalid dates: end-date is earlier than start-date\n";
    }

    # verify source IPs; these rewrite the variables into SQL
    # conditions on the s.sip column
    val_ip($opt_saddress, 's.sip');
    val_set($opt_sipset, 's.sip');
}

### Validate a date argument of the form YYYY/MM/DD with an optional
### :HH hour.  Dies on any malformed or out-of-range date.  On
### success, rewrites the referenced variable in place to the
### canonical "YYYY/MM/DD:HH" form and returns an integer key that
### orders chronologically (for comparing two validated dates).

sub val_date(\$)
{
    my $date_ref = shift;
    return unless defined $$date_ref;

    # Anything that is not a full date (with optional hour) is fatal
    unless (
        $$date_ref =~ m{ ^
    (\d{1,4})/(\d{1,2})/(\d{1,2}):?(\d{1,2})?
    $ }x
        )
    {
        die "$appname: Invalid date: $$date_ref\n";
    }

    my ( $year, $mon, $day, $hour ) = ( $1, $2, $3, $4 );
    $hour = 0 if !defined $hour;    # a missing hour means midnight

    # Range-check each component; the check order fixes which message
    # is reported when several components are bad.
    if ( $hour > 23 ) {
        die "$appname: Invalid date '$$date_ref': Hour is too large\n";
    }
    if ( $year < 2000 || $year > 2050 ) {
        die "$appname: Invalid date '$$date_ref': Year is out of range\n";
    }
    if ( $day == 0 ) {
        die "$appname: Invalid date '$$date_ref': Day is too small\n";
    }
    if ( $mon == 4 || $mon == 6 || $mon == 9 || $mon == 11 ) {
        # 30-day months
        if ( $day > 30 ) {
            die "$appname: Invalid date '$$date_ref': Day is too large\n";
        }
    }
    elsif ( $mon == 2 ) {
        # February: allow the 29th only in leap years
        my $is_leap = ( $year % 400 == 0 )
            || ( $year % 4 == 0 && $year % 100 != 0 );
        if ($is_leap) {
            if ( $day > 29 ) {
                die "$appname: Invalid date '$$date_ref': Day is too large\n";
            }
        }
        elsif ( $day > 28 ) {
            die "$appname: Invalid date '$$date_ref': Day is too large\n";
        }
    }
    elsif ( $mon < 1 || $mon > 12 ) {
        die "$appname: Invalid date '$$date_ref': Month is out of range\n";
    }
    elsif ( $day > 31 ) {
        die "$appname: Invalid date '$$date_ref': Day is too large\n";
    }

    # Pack the components into one monotonically increasing integer
    my $sort_key
        = ( ( $year - 1900 ) * 1000000 + $mon * 10000 + $day * 100 + $hour );

    # Normalize in place to zero-padded "YYYY/MM/DD:HH"
    print STDERR "$$date_ref -> " if $opt_verbose;
    $$date_ref = sprintf( "%04d/%02d/%02d:%02d", $year, $mon, $day, $hour );
    print STDERR "$$date_ref\n" if $opt_verbose;

    return $sort_key;
}

### Validate an IP argument, and parse into ranges

### Validate an IP argument (comma-separated addresses and ranges) by
### feeding it through rwsetbuild, then rewrite the referenced
### variable into SQL range conditions on column $var via val_set().
### Dies if rwsetbuild rejects the input.

sub val_ip(\$$)
{
    my $in  = shift;
    my $var = shift;
    return unless defined $$in;

    # create temporary IPset from the input; rwsetbuild does the
    # actual address/range validation
    my $set_file = File::Temp::tmpnam();

    # List-form pipe open runs rwsetbuild directly, so $set_file is
    # never interpreted by a shell
    open( my $setbuild, '|-',
          $rwsetbuild, '--ip-ranges', '--record-version=2',
          '-', $set_file )
        or die "$appname: rwsetbuild failed: $!";
    for (split /,/, $$in) {
        print $setbuild "$_\n"
            or die "$appname: rwsetbuild failed: $!";
    }
    # a failed close reports rwsetbuild's exit status
    close($setbuild)
        or die "$appname: rwsetbuild failed: $!";

    # val_set() replaces $$in (now the set's path) with SQL text
    $$in = $set_file;

    val_set($$in, $var);

    unlink $set_file;
}

### Validate an ipset argument, and parse the set into ranges

### Validate an IPset file argument and rewrite the referenced
### variable into SQL conditions comparing column $var against the
### set's IP ranges.  Sets the variable to undef when the set is
### empty (no condition needed); dies when the file is missing or
### rwsetcat fails.

sub val_set(\$$)
{
    my $in  = shift;
    my $var = shift;

    # verify input
    return unless defined $$in;
    if ( !-e $$in ) {
        die "$appname: Invalid (non-existent) IPset file: $$in\n";
    }

    # Backticks (a shell) are needed here so 2>&1 can capture
    # rwsetcat's diagnostics; the path was whitelisted by
    # parse_options() and is single-quoted for the shell.
    my $count = `$rwsetcat --count-ips '$$in' 2>&1`;
    unless ($count =~ /^\d+$/) {
        die "$appname: rwsetcat failed: $count\n";
    }
    if ($count == 0) {
        # an empty set matches nothing; drop the condition entirely
        $$in = undef;
        return;
    }

    # Read the set back as "count,first,last" decimal ranges and turn
    # each range into an SQL comparison.  List-form open runs
    # rwsetcat without a shell.
    my @result = ();
    open( my $setcat, '-|',
          $rwsetcat, '--ip-format=decimal', '--ip-ranges',
          '--delimited=,', $$in )
        or die "$appname: rwsetcat failed: $!\n";
    while (<$setcat>) {
        chomp;
        my (undef, $first, $last) = split /,/;
        if ( $first == $last ) {
            push @result, "($var = $first)";
        }
        else {
            push @result, "($var >= $first AND $var <= $last)";
        }
    }
    close( $setcat );
    # the separator keeps multi-range conditions aligned in the
    # generated SQL
    $$in = join " or\n             ", @result;
}

### Locate and parse the .rwscanrc configuration file, filling in the
### $conf_* globals.  Search order: $RWSCANRC from the environment,
### ~/.rwscanrc, then share/silk/.rwscanrc and .rwscanrc relative to
### the install tree.  Returns -1 (after a warning) when no rcfile is
### found; dies when a found rcfile cannot be read or names an
### unsupported db_driver.

sub load_rcfile()
{
    my $HOME   = ( getpwuid($<) )[7];
    my $rcfile = "$HOME/.rwscanrc";

    if (defined($ENV{'RWSCANRC'})) {
        $rcfile = $ENV{'RWSCANRC'};
    }

    # First, look for .rwscanrc in the current user's home directory
    if ( !-f $rcfile ) {

        # If no .rwscanrc exists in the user's ~, we look in the
        # "share/silk" directory parallel to the directory where the
        # script runs, under the assumption that the script is in a
        # "bin" subdirectory.

        my $script_root = $FindBin::Bin;
        $script_root =~ s@/bin$@@;
        $rcfile = "$script_root/share/silk/.rwscanrc";
        if ( !-f $rcfile ) {
            $rcfile = "$script_root/.rwscanrc";
            if ( !-f $rcfile ) {
                warn ("$appname: Could not find .rwscanrc file,",
                      " defaults will be used\n");
                return -1;
            }
        }
    }

    # Parse "key = value" lines; '#' begins a comment.  3-arg open so
    # a path taken from the environment cannot trigger 2-arg open's
    # mode-injection magic; the non-greedy capture strips trailing
    # whitespace from the value.
    my %rcopts;
    open( my $rc_fh, '<', $rcfile )
        or die "$appname: Cannot open '$rcfile': $!\n";
    while (<$rc_fh>) {
        next if (/^\s*#/);
        if (/^(\S+)\s*=\s*(.*?)\s*$/) {
            $rcopts{$1} = $2;
        }
    }
    close($rc_fh);

    # Normalize the driver name and make sure its DBD module loads;
    # a bare "die" re-raises the require's error ($@)
    if ( defined $rcopts{'db_driver'} ) {
        $conf_db_driver = $rcopts{'db_driver'};
        if ( $conf_db_driver =~ /oracle/i ) {
            $conf_db_driver = "oracle";
            die if !eval { require DBD::Oracle; };
        }
        elsif ( $conf_db_driver =~ /postgresql/i ) {
            $conf_db_driver = "postgresql";
            die if !eval { require DBD::Pg; };
        }
        elsif ( $conf_db_driver =~ /mysql/i ) {
            $conf_db_driver = "mysql";
            die if !eval { require DBD::mysql; };
        }
        elsif ( $conf_db_driver =~ /sqlite/i ) {
            $conf_db_driver = "sqlite";
            die if !eval { require DBD::SQLite; };
        }
        else {
            die "$appname: Unsupported db_driver: $conf_db_driver\n";
        }
    }
    else {
        warn ("$appname: Warning: db_driver not specified in rcfile,",
              " defaulting to Oracle\n");
        $conf_db_driver = "oracle";
    }

    # Credentials: prefer db_userid/db_password, falling back to the
    # legacy oracle_* names with a deprecation warning
    if ( !defined $rcopts{'oracle_userid'} ) {
        $conf_db_user = $rcopts{'db_userid'};
        $conf_db_pass = $rcopts{'db_password'};
    }
    else {
        warn ("$appname: Warning: using legacy option 'oracle_userid'. ",
              "Please use 'db_userid' instead.\n");
        $conf_db_user = $rcopts{'oracle_userid'};
        $conf_db_pass = $rcopts{'oracle_password'};
    }

    if ( defined $rcopts{'db_instance'} ) {
        $conf_db_instance = $rcopts{'db_instance'};
    }

    # Optional rwfilter class/type selections for incoming and
    # outgoing traffic
    if ( defined $rcopts{'rw_in_class'} ) {
        $conf_rw_in_class = $rcopts{'rw_in_class'};
    }

    if ( defined $rcopts{'rw_in_type'} ) {
        $conf_rw_in_type = $rcopts{'rw_in_type'};
    }

    if ( defined $rcopts{'rw_out_class'} ) {
        $conf_rw_out_class = $rcopts{'rw_out_class'};
    }

    if ( defined $rcopts{'rw_out_type'} ) {
        $conf_rw_out_type = $rcopts{'rw_out_type'};
    }
}

### Connect to the Oracle instance named by $opt_database and set the
### session date format to match the one used by val_date() and the
### SQL templates.  Dies on connection failure.

sub db_connect_oracle()
{
    # $DBI::errstr (not $!) carries DBI's connection failure details
    my $dbh = DBI->connect( "dbi:Oracle:$opt_database",
                            $conf_db_user, $conf_db_pass )
        or die "$appname: Oracle connect failed: $DBI::errstr";
    $dbh->do("alter session set NLS_DATE_FORMAT = 'YYYY/MM/DD:HH24:MI:SS'");
    return $dbh;
}

### Connect to the PostgreSQL database named by $opt_database.  Dies
### on connection failure.

sub db_connect_postgresql()
{
    # $DBI::errstr (not $!) carries DBI's connection failure details
    my $dbh = DBI->connect( "dbi:Pg:dbname=$opt_database",
                            $conf_db_user, $conf_db_pass )
        or die "$appname: PostgreSQL connect failed: $DBI::errstr";

    #  $dbh->do("alter session set NLS_DATE_FORMAT = 'YYYY/MM/DD:HH24:MI:SS'");
    return $dbh;
}

### Connect to the MySQL database named by $opt_database.  Dies on
### connection failure.

sub db_connect_mysql()
{
    # $DBI::errstr (not $!) carries DBI's connection failure details
    my $dbh = DBI->connect( "dbi:mysql:$opt_database",
                            $conf_db_user, $conf_db_pass )
        or die "$appname: MySQL connect failed: $DBI::errstr";
    return $dbh;
}

### Connect to (or create) the SQLite database file named by
### $opt_database.  Dies on connection failure.

sub db_connect_sqlite()
{
    # $DBI::errstr (not $!) carries DBI's connection failure details
    my $dbh = DBI->connect( "dbi:SQLite:$opt_database",
                            $conf_db_user, $conf_db_pass )
      or die "$appname: SQLite connect failed: $DBI::errstr";
    return $dbh;
}

### Build and execute the query of the given $type ("standard",
### "volume", "scanip", or "export") for the configured driver.  The
### SQL templates in %queries contain literal '$saddress_part',
### '$sipset_part', '$start_hour'/'$end_hour' and
### '$start_hour_iso'/'$end_hour_iso' placeholder tokens which are
### substituted here; the substituted values were validated (and the
### IP parts generated) by parse_options(), so interpolating them
### directly into the SQL is safe.  Returns the executed statement
### handle.

sub do_query($)
{
    my ($type) = @_;

    # optional source-address restriction (SQL built by val_ip)
    my $saddress_part = "";
    if ( defined $opt_saddress ) {
        $saddress_part = qq{
      AND ($opt_saddress)
    };
    }

    # optional source IPset restriction (SQL built by val_set)
    my $sipset_part = "";
    if ( defined $opt_sipset ) {
        $sipset_part = qq{
      AND ($opt_sipset)
    };
    }

    my $query = $queries{$conf_db_driver}{$type};

    $query =~ s/\$saddress_part/$saddress_part/g;
    $query =~ s/\$sipset_part/$sipset_part/g;

    # SQLite's templates need ISO "YYYY-MM-DD HH:00:00" timestamps.
    # val_date() guarantees the "YYYY/MM/DD:HH" form, but die rather
    # than silently reuse stale capture variables if that ever breaks.
    $opt_start_hour =~ m!(\d+)/(\d+)/(\d+):(\d+)!
        or die "$appname: Malformed start hour: $opt_start_hour\n";
    my $start_hour_iso = "$1-$2-$3 $4:00:00";

    $opt_end_hour =~ m!(\d+)/(\d+)/(\d+):(\d+)!
        or die "$appname: Malformed end hour: $opt_end_hour\n";
    my $end_hour_iso = "$1-$2-$3 $4:00:00";

    # ISO substitutions must run before the plain ones, since
    # '$start_hour' is a prefix of '$start_hour_iso'
    $query =~ s/\$start_hour_iso/$start_hour_iso/g;
    $query =~ s/\$end_hour_iso/$end_hour_iso/g;

    $query =~ s/\$start_hour/$opt_start_hour/g;
    $query =~ s/\$end_hour/$opt_end_hour/g;

    print STDERR "$type query:\n$query\n" if $opt_verbose;

    # ora_check_sql is an Oracle-only hint; other drivers ignore it
    my $sth = $dbh->prepare( $query, { ora_check_sql => 0 } )
        or die "$appname: Failed to prepare database query: $DBI::errstr\n";

    my $rv = $sth->execute;
    return $sth;
}


# Take the results from a query and create an IPset
sub query_to_ipset($$)
{
    my ($sth, $set_file) = @_;

    my $s_srcaddr;
    $sth->bind_columns( \$s_srcaddr );

    open( IPSET_OUT, "|$rwsetbuild stdin '$set_file'" )
        or die "$appname: rwsetbuild failed: $!";
    while ( $sth->fetch ) {
        my $o_srcaddr = join('.', unpack('C4', pack('N', $s_srcaddr)));
        print IPSET_OUT $o_srcaddr, "\n"
            or die "$appname: rwsetbuild failed: $!";
    }
    close(IPSET_OUT)
        or die "$appname: rwsetbuild failed: $!";
}


# Generate output for --report=standard: Write one textual line for
# each scanning IP
### Write the rows of the "standard" query (one line per detected
### scan) to the already-open OUTF handle, either as aligned columns
### (--columnar) or '|'-delimited text.  The bind order must match
### the SELECT column order of the "standard" templates.
sub write_standard_results($)
{
    my ($sth) = @_;
    my (
        $s_id,  $s_stime, $s_proto, $s_etime,
        $s_sip, $s_flows, $s_pkts,  $s_bytes
    );

    # bound in SELECT order: id, sip, stime, etime, proto, flows,
    # packets, bytes
    $sth->bind_columns(
        \(
            $s_id,    $s_sip,   $s_stime, $s_etime,
            $s_proto, $s_flows, $s_pkts,  $s_bytes
        )
    );

    if ($opt_columnar) {
        if ($opt_show_header) {
            printf OUTF "%10s %-19s %-19s %-10s %-15s %10s %10s %14s\n",
              "scan-id", "start-time", "end-time", "protocol", "source-address",
              "flows", "packets", "bytes",;
        }
        # Suppress stderr during the fetch loop so ^C doesn't cause
        # an ugly error message; restored afterward
        open( SAVE_STDERR, ">&STDERR" );
        open( STDERR,      ">/dev/null" );
        eval {
            while ( $sth->fetch ) {
                # sip is stored as an unsigned 32-bit integer;
                # convert to dotted-quad for display
                my $o_sip = join( '.', unpack( 'C4', pack( 'N', $s_sip ) ) );
                printf OUTF "%10d %-19s %-19s %-10d %-15s %10d %10d %14s\n",
                  $s_id, $s_stime, $s_etime, $s_proto, $o_sip, $s_flows,
                  $s_pkts, $s_bytes;
            }
        };
        open( STDERR, ">&SAVE_STDERR" );
    }
    else {
        if ($opt_show_header) {
            print OUTF
              "scan-id|stime|etime|proto|srcaddr|flows|packets|bytes\n";
        }

        # This song and dance is to make it so that ^C doesn't cause
        # an ugly error message.
        use vars qw( *SAVE_STDERR );    # to avoid warning
        open( SAVE_STDERR, ">&STDERR" );
        open( STDERR,      ">/dev/null" );

        eval {
            while ( $sth->fetch ) {
                # sip is stored as an unsigned 32-bit integer;
                # convert to dotted-quad for display
                my $o_sip = join( '.', unpack( 'C4', pack( 'N', $s_sip ) ) );
                print OUTF "$s_id|$s_stime|$s_etime|$s_proto|$o_sip|"
                  . "$s_flows|$s_pkts|$s_bytes\n";
            }
        };
        open( STDERR, ">&SAVE_STDERR" );
    }

}


# Generate output for --report=export: Write textual columns retrieved
# from the database in a form consisent with rwscan.
### Write the rows of the "export" query to the already-open OUTF
### handle as '|'-delimited text in a form consistent with rwscan's
### own output.  Note: unlike write_standard_results(), the sip
### column is emitted as the raw integer, not dotted-quad.
sub write_export_results($)
{
    my ($sth) = @_;
    my (
        $s_id,         $s_stime, $s_proto, $s_etime,
        $s_sip,        $s_flows, $s_pkts,  $s_bytes,
        $s_scan_model, $s_scan_prob
    );

    # bound in SELECT order: id, sip, proto, stime, etime, flows,
    # packets, bytes, scan_model, scan_prob
    $sth->bind_columns(
        \(
            $s_id,         $s_sip,   $s_proto, $s_stime,
            $s_etime,      $s_flows, $s_pkts,  $s_bytes,
            $s_scan_model, $s_scan_prob
        )
    );

    if ($opt_show_header) {
        print OUTF "id|sip|proto|stime|etime|"
          . "flows|packets|bytes|scan_model|scan_prob\n";
    }

    # Suppress stderr during the fetch loop so ^C doesn't cause an
    # ugly error message; restored afterward
    use vars qw( *SAVE_STDERR );    # to avoid warning
    open( SAVE_STDERR, ">&STDERR" );
    open( STDERR,      ">/dev/null" );

    eval {
        while ( $sth->fetch ) {
            print OUTF "$s_id|$s_sip|$s_proto|$s_stime|$s_etime|"
              . "$s_flows|$s_pkts|$s_bytes|$s_scan_model|$s_scan_prob\n";
        }
    };
    open( STDERR, ">&SAVE_STDERR" );
}


# Generate output for --report=volume: Columnar text giving the daily
# scan volume
### Write the rows of the "volume" query (per-day flow/packet/byte
### totals) to the already-open OUTF handle, either as aligned
### columns (--columnar) or '|'-delimited text.
sub write_volume_results($)
{
    my ($sth) = @_;

    my @titles = qw(date flows packets bytes);
    # bound in SELECT order: scan_date, flows, pkts, bytes
    my ( $s_stime, $s_flows, $s_pkts, $s_bytes );
    $sth->bind_columns( \( $s_stime, $s_flows, $s_pkts, $s_bytes ) );

    # printf format shared by both output styles
    my $format;

    if ($opt_columnar) {
        if ($opt_show_header) {
            printf OUTF "%-19s %10s %10s %14s\n", @titles;
        }
        $format = "%-19s %10u %10u %14u\n";
    }
    else {
        if ($opt_show_header) {
            print OUTF join("|", @titles), "\n";
        }
        $format = "%s|%u|%u|%u\n";
    }

    # This song and dance is to make it so that ^C doesn't cause
    # an ugly error message.
    use vars qw( *SAVE_STDERR );    # to avoid warning
    open( SAVE_STDERR, ">&STDERR" );
    open( STDERR,      ">/dev/null" );
    eval {
        while ( $sth->fetch ) {
            printf OUTF $format, $s_stime, $s_flows, $s_pkts, $s_bytes;
        }
    };
    open( STDERR, ">&SAVE_STDERR" );
}


# Generate output for --report=scanset: create an IPset containing
# scanning IPs.
### Generate output for --report=scanset: build a binary SiLK IPset
### of the scanning IPs returned by $sth and write it to the chosen
### output.  When --daddress/--dipset are given, the scanner set is
### first intersected (via rwfilter) with flows to those
### destinations.  Dies rather than write binary data to a terminal.
sub write_scan_set($)
{
    my ($sth) = @_;

    # Refuse to dump a binary IPset onto an interactive terminal.
    # (The previous "-t $outfile" test was a no-op: -t needs a
    # filehandle, but $outfile holds a string such as "&STDOUT".)
    if ( $outfile_rw eq 'stdout' && -t STDOUT ) {
        die("$appname: Will not write IPset to a terminal. ",
            "Please specify an output file on the command line.\n");
    }

    if ( $opt_daddress || $opt_dipset ) {
        # Restrict the scanner set to sources that actually touched
        # the requested destinations during the time window
        my $set_filename = File::Temp::tmpnam();
        query_to_ipset($sth, $set_filename);

        my @args;
        push( @args,
              '--start-date', $opt_start_hour, '--end-date', $opt_end_hour,
              '--sipset',     $set_filename,   '--pass',     'stdout' );

        if ( defined($conf_rw_in_class) ) {
            push( @args, '--class', $conf_rw_in_class );
        }
        if ( defined($conf_rw_in_type) ) {
            push( @args, '--type', $conf_rw_in_type );
        }
        if ( defined($opt_daddress) ) {
            push( @args, '--daddress', $opt_daddress );
        }
        if ( defined($opt_dipset) ) {
            push( @args, '--dipset', $opt_dipset );
        }

        # A shell is required for the rwfilter|rwset pipeline; all of
        # the interpolated values were validated in parse_options()
        system( "$rwfilter @args | $rwset --sip-file '$outfile_rw'" );
        unlink $set_filename;
    }
    else {
        # No destination restriction: emit the query results directly
        query_to_ipset($sth, $outfile_rw);
    }
}


# Generate output for --report=scanflows: flows coming from the
# scanning IPs during the time window
sub write_scan_flows($)
{
    my ($sth) = @_;

    # Refuse to dump binary flow data onto an interactive terminal.
    if ( -t $outfile ) {
        die("$appname: Will not write flow records to a terminal. ",
            "Please specify an output file on the command line.\n");
    }

    # Build a temporary IPset of the scanning IPs; rwfilter selects
    # the flows they originated within the query's time window.
    my $set_filename = File::Temp::tmpnam();
    query_to_ipset($sth, $set_filename);

    my @args;
    push( @args,
          '--start-date', $opt_start_hour, '--end-date', $opt_end_hour,
          '--sipset',     $set_filename,   '--pass',     $outfile_rw );

    # Incoming class/type from the configuration file, if present.
    if ( defined($conf_rw_in_class) ) {
        push( @args, '--class', $conf_rw_in_class );
    }
    if ( defined($conf_rw_in_type) ) {
        push( @args, '--type', $conf_rw_in_type );
    }
    # Optional restriction to particular scan targets.
    if ( defined($opt_daddress) ) {
        push( @args, '--daddress', $opt_daddress );
    }
    if ( defined($opt_dipset) ) {
        push( @args, '--dipset', $opt_dipset );
    }

    print STDERR Dumper(@args) if $opt_verbose;

    # List-form system() avoids the shell entirely.  Report a failed
    # rwfilter run rather than silently producing empty output.
    system( $rwfilter, @args ) == 0
        or warn "$appname: rwfilter failed: $?\n";

    unlink $set_filename;
}


# Generate output for --report=respflows: flows that are responses to
# the scanning IPs during the time window
sub write_resp_flows($)
{
    my ($sth) = @_;

    # Refuse to dump binary flow data onto an interactive terminal.
    if ( -t $outfile ) {
        die("$appname: Will not write flow records to a terminal. ",
            "Please specify an output file on the command line.\n");
    }

    # Build a temporary IPset of the scanning IPs; rwfilter selects
    # the flows sent back TO those IPs (possible scan responses).
    my $set_filename = File::Temp::tmpnam();
    query_to_ipset($sth, $set_filename);

    my @args;
    push( @args,
          '--start-date', $opt_start_hour, '--end-date', $opt_end_hour,
          '--dipset',     $set_filename,   '--pass',     $outfile_rw );

    # Restrict filter to outgoing flows only.
    if ( defined($conf_rw_out_class) ) {
        push( @args, '--class', $conf_rw_out_class );
    }
    if ( defined($conf_rw_out_type) ) {
        push( @args, '--type', $conf_rw_out_type );
    }

    # These two look funny at first glance.  However, opt_daddress or
    # opt_dipset specify the destinations of scans, and we want the
    # response flows sent back to the scanners, so the source and
    # destination roles are swapped here.
    if ( defined($opt_daddress) ) {
        push( @args, '--saddress', $opt_daddress );
    }
    if ( defined($opt_dipset) ) {
        push( @args, '--sipset', $opt_dipset );
    }

    print STDERR Dumper(@args) if $opt_verbose;

    # Report a failed rwfilter run rather than silently producing
    # empty output.
    system( $rwfilter, @args ) == 0
        or warn "$appname: rwfilter failed: $?\n";

    unlink $set_filename;
}


# Generate output for --version: Print version and exit.
sub tool_version_exit()
{
    # Package name and bug-report address; the "||" fallbacks guard
    # against an empty string being substituted in at build time.
    my $pkg  = 'SiLK 3.22.2'         || 'SiLK';
    my $bugs = 'netsa-help@cert.org' || 'UNKNOWN';

    # Emit the version banner as one joined string (identical output
    # to the original heredoc) and exit successfully.
    print join( "\n",
        "$appname: Part of $pkg",
        'Copyright (C) 2001-2024 by Carnegie Mellon University',
        'GNU General Public License (GPL) Rights pursuant to Version 2, June 1991.',
        'Government Purpose License Rights (GPLR) pursuant to DFARS 252.227.7013.',
        "Send bug reports, feature requests, and comments to $bugs.\n" );
    exit;
}

__END__

=head1 NAME

B<rwscanquery> - Query the network scan database

=head1 SYNOPSIS

 rwscanquery [options]

Report Options:

  --report=REPORT_TYPE       Select query and output options.  Values
                             for REPORT_TYPE are standard, volume,
                             scanset, scanflows, respflows, and export

  --start-date=YYYY/MM/DD:HH Report on scans active after this date.
  --end-date=YYYY/MM/DD:HH   Defaults to start-date.

  --saddress=ADDR_SPEC       Show scans originating from matching hosts.
  --sipset=IPSET_FILE        Show scans originating from hosts in set.

  --daddress=IP_WILDCARD     Show only scans targeting matching hosts.
  --dipset=IPSET_FILE        Show only scans targeting hosts in set.

  --show-header              Display column titles at start of output.
  --columnar                 Display more human-readable columnar view.
  --output-path=PATH         Write results to the specified file.

Configuration Options:

  --database=DBNAME          Query an alternate scan database

Help Options:

  --help                     Display this brief help message.
  --man                      Display the full documentation.
  --version                  Display the version information.

=head1 DESCRIPTION

B<rwscanquery> queries the network scan database---that is, the
database that contains scans found by B<rwscan(1)>.  The type of
output B<rwscanquery> creates is controlled by the B<--report> switch
as described in the L</Report Options> section below.  B<rwscanquery>
writes its output to the location specified by the B<--output-path>
switch or to the standard output when that switch is not provided.

B<rwscanquery> runs a query of the scan database and then, depending
on the report type, either displays the result set as text or creates
a binary SiLK file from the result set.  The database rows that are part of
the result set may be limited by using the B<--start-date>,
B<--end-date>, B<--saddress>, and B<--sipset> switches.  The result
set is always limited to a time window, and the current day is used
when no B<--start-date> is given.

The following three report types produce textual output.  The default
output displays the values separated by a vertical bar (C<|>) with no
spacing.  The B<--columnar> switch causes the output to appear in
columns with a space-delimiter between the columns.  The output
includes no title line unless the B<--show-header> switch is
specified.

=over 4

=item *

The C<standard> report contains most of the columns in the database
for the rows in the result set.  (The columns containing the scan
model and scan probability are not included.)

=item *

The C<volume> report groups the rows in the result set by day and
shows the sums of the flows, packets, and bytes columns for each day.

=item *

The C<export> report contains all the columns in the database for the
rows in the result set, and the rows are displayed in a format
compatible with B<rwscan>.

=back

The following three report types create a binary SiLK file as their
result.  These report types invoke other SiLK tools (namely
B<rwfilter(1)>, B<rwset(1)>, B<rwsetbuild(1)>, and B<rwsetcat(1)>) and
the report types assume B<rwfilter> has access to a SiLK data
repository.

The first step in all three of these report types is for
B<rwscanquery> to get the distinct IP addresses for the rows in the
result set and pass them into B<rwsetbuild> to create a temporary
IPset file containing the scanning IPs.

=over 4

=item *

A C<scanflows> report produces a file of SiLK Flow records whose
source IP is a scanning IP.  B<rwscanquery> uses the temporary IPset
as an argument to B<rwfilter> to find flow records in your data
repository that originated from the scanning IPs within the time
window.  You may choose to limit the report to particular IPs targeted
by the scanning IPs by specifying the B<--daddress> or B<--dipset>
switches.  The output from B<rwfilter> is the output of the report.
The B<rwfilter> invocation uses the configuration values
C<rw_in_class> and C<rw_in_type> if they are specified in the
configuration file (c.f. L</CONFIGURATION>).

=item *

A C<respflows> report produces a file of SiLK Flow records whose
destination IP is a scanning IP.  These flow records may represent
responses to a scan.  To create this report, B<rwscanquery> performs
steps similar to those for the C<scanflows> report except the
direction of the B<rwfilter> command is reversed to find flow records
going to the scanning IPs.  You may choose to limit the report to
particular IPs that responded to the scan by specifying the
B<--daddress> or B<--dipset> switches.  The output from B<rwfilter> is
the output of the report.  The B<rwfilter> invocation uses the
configuration values C<rw_out_class> and C<rw_out_type> if they are
specified in the configuration file (c.f. L</CONFIGURATION>).

=item *

A C<scanset> report produces a binary IPset file.

=over 4

=item *

If neither the B<--daddress> nor B<--dipset> switches are specified,
the output of this report is the temporary IPset file containing
the scanning IPs; that is, all the scanning IPs in the time window.

=item *

Otherwise, B<rwscanquery> performs the same steps as when creating a
C<scanflows> report.  Next, instead of returning the output
from B<rwfilter>, B<rwscanquery> passes the flow records into B<rwset>
to create an IPset file containing the scanning IPs that targeted
particular IP addresses.

=back

=back

=head1 OPTIONS

Option names may be abbreviated if the abbreviation is unique or is an
exact match for an option.  A parameter to an option may be specified
as B<--arg>=I<param> or B<--arg> I<param>, though the first form is
required for options that take optional parameters.

=head2 Report Options

=over 4

=item B<--report>=I<TYPE>

Specify the query and the type of output to create.  When this switch
is not specified, the default is a C<standard> report.  The supported
values for I<TYPE> are:

=over 4

=item standard

Write one textual line of output for each scan record in the scan
database.  By default, the output has no titles and it is not in
columnar form.  Specify the B<--show-header> and/or B<--columnar>
switches to make the output more human readable.

=item volume

Write a daily scan activity volume summary report for each day within
the time period.  By default, the output has no titles and it is not
in columnar form.  Specify the B<--show-header> and/or B<--columnar>
switches to make the output more human readable.

=item scanset

Write an IPset file containing the IP addresses which were the sources
of scan activity during the selected time period.  The output of this
report type is binary, so you must redirect or pipe the output to a
location or specify the B<--output-path> switch.

=item scanflows

Write a SiLK Flow file containing all flows originating from scanning
IP addresses within the specified time period.  This flow data
includes flows originating from any host that would be listed as a scan
source by your query, from any time within the time period specified
by I<--start-date> and I<--end-date>.  Note that this may include
flows that were not identified by the scan analysis as being part of a
scan.  The output of this report type is binary, so you must redirect
or pipe the output to a location or specify the B<--output-path>
switch.

=item respflows

Write a SiLK Flow file containing all flows sent to scanning IP
addresses within the specified time period---that is, possible
responses to the scanners.  The output of this report type is binary,
so you must redirect or pipe the output to a location or specify the
B<--output-path> switch.

=item export

Write textual output consistent with the output format of the
B<rwscan(1)> tool.  Specify the B<--show-header> switch to include a
title line.

=back

=item B<--start-date>=I<YYYY/MM/DD:HH>

Display scans which were active after this hour.  When this argument
contains a date with no hour and no B<--end-date> switch is
specified, scans for that entire day are returned.  If this switch
is not specified at all, scans for the current day (based on the local
time on the host machine) are returned.

=item B<--end-date>=I<YYYY/MM/DD:HH>

Display scans which were active before the end of this hour.  If no
end-date is given, defaults to the same as start-date.  It is an error
to provide an end-date without a start-date.

=item B<--saddress>=I<ADDR_SPEC>

Display scans originating from hosts described in I<ADDR_SPEC>, where
I<ADDR_SPEC> is a list of addresses, address ranges, and CIDR
blocks.  Only scans originating from hosts in the list are
displayed.

=item B<--sipset>=I<IPSET_FILE>

Display scans originating from hosts in I<IPSET_FILE>, where
I<IPSET_FILE> is a standard SiLK IPset file as created by B<rwset(1)>
or B<rwsetbuild(1)>.  Note that a very complex IPset may take a long
time to process, or even fail to return any results.

=item B<--daddress>=I<IP_WILDCARD>

Display scans targeting hosts described in I<IP_WILDCARD>, where
I<IP_WILDCARD> is a single IP address, a single CIDR block, or an IP
Wildcard expression accepted by B<rwfilter(1)>.  To match on multiple
IPs or networks, use the B<--dipset> switch.  This switch is ignored
for B<--report> types other than C<scanset>, C<scanflows>, and
C<respflows>.

=item B<--dipset>=I<IPSET_FILE>

Display scans targeting hosts in I<IPSET_FILE>, where I<IPSET_FILE> is
a standard SiLK IPset file.  This switch
is ignored for B<--report> types other than C<scanset>, C<scanflows>,
and C<respflows>.

=item B<--show-header>

Display a header line giving a short name (or title) for each field
when printing textual output with the C<standard>, C<volume>, or
C<export> report types.  By default, no header is displayed.

=item B<--columnar>

Display output in more human-readable columnar format when printing
textual output with the C<standard> or C<volume> report types.  When
this switch is not given, the output is presented as data fields
delimited by the B<|> character.

=item B<--output-path>=I<PATH>

Write results to I<PATH> instead of to the standard output.

=back

=head2 Configuration Options

=over 4

=item B<--database>=I<DBNAME>

Select a database instance other than the default.  The default is
specified by the C<db_instance> value in the configuration file as
described in L</CONFIGURATION> below.

=back

=head2 Other Options

=over 4

=item B<--help>

Display a brief usage message and exit.

=item B<--man>

Display full documentation for B<rwscanquery> and exit.

=item B<--version>

Print the version number and exit the application.

=back

=head1 CONFIGURATION

B<rwscanquery> reads configuration information from a file named
F<.rwscanrc>.  If the RWSCANRC environment variable is set, it is used
as the location of the F<.rwscanrc> file.  When RWSCANRC is not set,
B<rwscanquery> attempts to find a file named F<.rwscanrc> in the
directories specified in the L</FILES> section below.

The format of the F<.rwscanrc> file is I<name>=I<value> pairs, one per
line.  The configuration parameters currently read from F<.rwscanrc>
are:

=over 2

=item db_driver

The type of database to connect to.  B<rwscanquery> supports C<oracle>,
C<postgresql>, C<mysql>, and C<sqlite>.

=item db_userid

The userid to use when connecting to the scan database.

=item db_password

The password to use when connecting to the scan database.

=item db_instance

The name of the database instance to connect to if none is provided with the
B<--database> command line switch.  If neither this configuration option nor
the B<--database> command line switch are specified, the hard-coded default
database instance "SCAN" is used.

=item rw_in_class

The class for incoming flow data.  The C<rw_in_class> and
C<rw_in_type> values are used to query scan flows when the
C<scanflows> report type is requested or when the B<--daddress> or
B<--dipset> switches are used for the C<scanset> report type.  If not
specified, B<rwfilter>'s default is used.

=item rw_in_type

The type(s) for incoming flow data.  See C<rw_in_class> for details.

=item rw_out_class

The class for outgoing flow data.  The C<rw_out_class> and
C<rw_out_type> values are used to query scan flows when the
C<respflows> report type is requested.  If not specified,
B<rwfilter>'s default is used.

=item rw_out_type

The type(s) for outgoing flow data.  See C<rw_out_class> for details.
(Note that B<rwfilter> often defaults to querying incoming flows, so
this parameter ought to be specified.)

=back

=head1 EXAMPLES

In the following examples, the dollar sign (C<$>) represents the shell
prompt.  The text after the dollar sign represents the command line.
Lines have been wrapped for improved readability, and the back slash
(C<\>) is used to indicate a wrapped line.

Display information on all scans occurring during the 12:00 hour
(12:00:00 to 12:59:59) of 2009/02/12.

 $ rwscanquery --show-header --start-date=2009/02/12:12
 scan-id|stime|etime|proto|srcaddr|flows|packets|bytes
 499|2009-02-12 12:01:56|2009-02-12 12:08:39|6|10.199.151.231|256|256|10240
 365|2009-02-12 12:08:40|2009-02-12 12:14:54|6|10.146.88.117|256|256|10240
 57|2009-02-12 12:28:51|2009-02-12 12:34:55|6|10.29.23.160|256|256|10240
 627|2009-02-12 11:52:07|2009-02-12 12:41:16|17|10.253.24.230|1023|1023|30175
 366|2009-02-12 12:41:50|2009-02-12 12:48:14|6|10.146.89.46|256|256|10240
 182|2009-02-12 12:54:39|2009-02-12 13:01:20|6|10.79.26.176|256|256|10240
 4|2009-02-12 12:41:19|2009-02-12 13:33:57|17|10.2.47.87|1023|1023|30205

Create the IPset file F<scan.set> containing the scanners discovered
during that hour.

 $ rwscanquery --report=scanset --start-date=2009/02/12:12 \
        --output-path=scan.set
 $ rwsetcat scan.set
 10.2.47.87
 10.29.23.160
 10.79.26.176
 10.146.88.117
 10.146.89.46
 10.199.151.231
 10.253.24.230

Repeat the first query but limit the output to scanners coming from
the CIDR block 10.199.0.0/16.

 $ rwscanquery --show-header --start-date=2009/02/12:12 \
        --saddr=10.199.0.0/16
 scan-id|stime|etime|proto|srcaddr|flows|packets|bytes
 499|2009-02-12 12:01:56|2009-02-12 12:08:39|6|10.199.151.231|256|256|10240

Expand the query for that CIDR block to include the preceding and
following hours (11:00:00 to 13:59:59).

 $ rwscanquery --start-date=2009/02/12:11 --end-date=2009/02/12:13 \
        --saddr=10.199.0.0/16
 499|2009-02-12 12:01:56|2009-02-12 12:08:39|6|10.199.151.231|256|256|10240
 497|2009-02-12 13:33:57|2009-02-12 14:24:35|17|10.199.98.5|1023|1023|30079

Create the IPset file F<scanning-cidr.set> that contains the CIDR
block 10.199.0.0/16, and then search for scans coming from that IP on
Feb 13, 2009.

 $ cat scanning-cidr.txt
 10.199.0.0/16
 $ rwsetbuild scanning-cidr.txt scanning-cidr.set
 $
 $ rwscanquery --start-date=2009/02/13 --sipset=scanning-cidr.set
 500|2009-02-13 22:42:25|2009-02-13 22:48:45|6|10.199.207.32|256|256|10240

Print the volume of data attributed to scans over a three day period.

 $ rwscanquery --report=volume --show-header  \
        --start-date=2009/02/12 --end-date=2009/02/14
 date|flows|packets|bytes
 2009/02/12|137452|137499|17149008
 2009/02/13|74727|76167|2798040
 2009/02/14|76160|76160|2750531

The following limits the volume report to the IPs in the file
F<scanning-cidr.set> and displays the results in columns.

 $ rwscanquery --report=volume --show-header --columnar  \
        --start-date=2009/02/12 --end-date=2009/02/14    \
        --sipset=scanning-cidr.set
 date                     flows    packets          bytes
 2009/02/12                1279       1279          40319
 2009/02/13                 256        256          10240
 2009/02/14                 256        256          10240

Get the SiLK Flow records coming from the scanners during the 12:00
hour on 2009/02/12 and store in the file F<scanning-flows.rw>.

 $ rwscanquery --report=scanflows --start-date=2009/02/12:12  \
        --output=scanning-flows.rw

Use B<rwuniq(1)> to summarize the file F<scanning-flows.rw>.

 $ rwuniq --fields=sip --values=flows,packets,bytes  \
        --sort-output scanning-flows.rw
             sIP|   Records|        Packets|               Bytes|
      10.2.47.87|       373|            373|               11032|
    10.29.23.160|       256|            256|               10240|
    10.79.26.176|       203|            203|                8120|
   10.146.88.117|       256|            256|               10240|
    10.146.89.46|       256|            256|               10240|
  10.199.151.231|       256|            256|               10240|
   10.253.24.230|       846|            846|               24921|

Run a respflows report to verify that there were no responses to the
scan.

 $ rwscanquery --report=respflows --start-date=2009/02/12:12  \
        --output=scanning-response.rw
 $
 $ rwuniq --fields=sip --values=flows,packets,bytes  \
        --sort-output scanning-response.rw
             sIP|   Records|        Packets|               Bytes|

Create the IPset F<subnet-scan.set> for scanners that targeted the
192.168.186.0/24 CIDR block during the 12:00 hour on 2009/02/12.

 $ rwscanquery --report=scanset --start-date=2009/02/12:12        \
        --daddress=192.168.186.0/24 --output-path=subnet-scan.set

Store the corresponding flow records for those scans in the file
F<subset-scan.rw>.

 $ rwscanquery --report=scanflows --start-date=2009/02/12:12        \
        --daddress=192.168.186.0/24 --output-path=subnet-scan.rw

Determine how many IPs in that subnet were targeted.

 $ rwuniq --fields=sip --values=flows,distinct:dip subnet-scan.rw
             sIP|   Records|dIP-Distin|
    10.146.89.46|       256|       256|

Display the title line for an export report.

 $ rwscanquery --report=export --start-date=2009/02/12:12  \
        --show-header | head -1
 id|sip|proto|stime|etime|flows|packets|bytes|scan_model|scan_prob

=head1 ENVIRONMENT

=over 4

=item RWSCANRC

This environment variable allows the user to specify the location of
the F<.rwscanrc> configuration file.  The value may be a complete path
or a file relative to the user's current directory.  See
the L</FILES> section for standard locations of this file.

=item SILK_CLOBBER

The SiLK tools normally refuse to overwrite existing files.  Setting
SILK_CLOBBER to a non-empty value removes this restriction for the
report types of C<scanset>, C<scanflows>, and C<respflows>.

=item SILK_CONFIG_FILE

This environment variable is used as the location for the site
configuration file, F<silk.conf>, for report types that use
B<rwfilter>.  When this environment variable is not set, B<rwfilter>
searches for the site configuration file in the locations specified in
the L</FILES> section.

=item SILK_DATA_ROOTDIR

This environment variable specifies the root directory of data
repository for report types that use B<rwfilter>.  This value
overrides the compiled-in value.  In addition, B<rwfilter> may use
this value when searching for the SiLK site configuration files.  See
the L</FILES> section for details.

=item SILK_RWFILTER_THREADS

The number of threads B<rwfilter> uses when reading files from the
data store.

=item SILK_PATH

This environment variable gives the root of the install tree.  When
searching for the site configuration file, B<rwfilter> may use this
environment variable.  See the L</FILES> section for details.

=item PATH

This is the standard UNIX path (c.f., B<environ(7)>).  Depending on
the report type, B<rwscanquery> may invoke B<rwfilter(1)>,
B<rwset(1)>, B<rwsetbuild(1)>, or B<rwsetcat(1)> as part of its
processing.

=item RWFILTER

Complete path to B<rwfilter>.  If not set, B<rwscanquery> attempts to
find B<rwfilter> on your PATH.

=item RWSET

Complete path to B<rwset>.  If not set, B<rwscanquery> attempts to
find B<rwset> on your PATH.

=item RWSETBUILD

Complete path to B<rwsetbuild>.  If not set, B<rwscanquery> attempts
to find B<rwsetbuild> on your PATH.

=item RWSETCAT

Complete path to B<rwsetcat>.  If not set, B<rwscanquery> attempts to
find B<rwsetcat> on your PATH.

=back

=head1 FILES

=over 4

=item F<${RWSCANRC}>

=item F<${HOME}/.rwscanrc>

=item F<@prefix@/share/silk/.rwscanrc>

Possible locations for the B<rwscanquery> configuration file,
F<.rwscanrc>.  In addition, B<rwscanquery> checks the parent directory
of the directory containing the B<rwscanquery> script.

=item F<${SILK_CONFIG_FILE}>

=item F<${SILK_DATA_ROOTDIR}/silk.conf>

=item F<@SILK_DATA_ROOTDIR@/silk.conf>

=item F<${SILK_PATH}/share/silk/silk.conf>

=item F<${SILK_PATH}/share/silk.conf>

=item F<@prefix@/share/silk/silk.conf>

=item F<@prefix@/share/silk.conf>

Possible locations for the SiLK site configuration file---for report
types that use B<rwfilter>.

=back

=head1 SEE ALSO

B<rwscan(1)>, B<rwfilter(1)>, B<rwset(1)>, B<rwsetbuild(1)>,
B<rwsetcat(1)>, B<rwuniq(1)>, B<silk(7)>, B<environ(7)>

=cut

# Local Variables:
# mode:perl
# indent-tabs-mode:nil
# End:
