NetFlow, Perl и Threads

Блог им. dreamhunter
Ну вот можно и поздравить меня с освоением нового горизонта. Кто-то может и усмехнется, что для него такие вещи семечки, однако этот кто-то вряд ли занимается чем то кроме своей специализации (в отличие от меня).

Задача: Создать многопоточное приложение, которое позволяет осуществлять обработку информации, полученной с большого количества сенсоров.

Главная проблема: Данные NetFlow передаются по UDP. Если пакет данных не был считан вовремя, он теряется. Все просто, когда у тебя 1-2 сенсора. В этом случае проблем никаких нет. Коллектор будет успевать перерабатывать данные. Тем более непосредственную переработку можно переложить на другую программу. Конечно потребуется создать вспомогательные таблицы, из которых можно сводить все в нужный вам вид. Но это не великое зло… Самое великое зло наступает в тот момент, когда вы хотите считать информацию с 10-20 нагруженных сенсоров.

Решение: Решение напрашивается самим собой — многопоточность. С одной стороны это действительно дремучий лес. Необходимо согласовать асинхронную обработку, распределить потоки данных и уследить за тем, что бы не пошла утечка памяти.
Лично я воспользовался нитями Perl (threads). Это механизм «легковесных» потоков. Сразу оговорюсь: Да, на самом деле эти потоки не легковесные. Приложение будет занимать порядочное место в памяти и кушать ресурсы по полной. Однако задачи сэкономить ресурсы не стоит. Наоборот, речь идет о полной утилизации ресурсов. То есть, что бы программа использовала ресурсы на полную катушку, удовлетворяя потребности в переработке данных.

Итак, начнем с того, что получим данные с сенсора. Воспользуемся нашей любимой конструкцией:
#!/usr/bin/perl
use strict;
use IO::Socket::INET;
use DBI;
use POSIX;
use feature qw { switch };
use threads;
use threads::shared;
use Thread;
use Thread::Queue;
use Thread::State;
use Thread::Running;

my ($log_level) = 10;
my ($result, $request, @v5thread,@v5queue,@thread, @template, @data_queue);
my %deviceid;
my @j:shared;
my ($interval) = 300;#time interval
#--------- Connect to DB --------------
my ($dbname) = 'flow';
my ($dbhost) = '127.0.0.1';
my ($dbusr) = 'netflow';
my ($dbpass) = 'netflow';
my ($db) = DBI->connect("DBI:mysql:$dbname;host=$dbhost", "$dbusr", "$dbpass") || die "Could not connect to database: $DBI::errstr";
#init thread-writer to sql
my $ip4queue:shared = Thread::Queue->new;
my $ip4thread=threads->new(\&datatosql,$dbname,$dbhost,$dbusr,$dbpass);
#--------- Init Socket ----------------
my ($socket,$received_data,$peer_address,$peer_port, $version);
$socket = new IO::Socket::INET (
    LocalPort => '9998',
    Proto => 'udp',
) or die "ERROR in Socket Creation : $!\n";
#--------- Reading Socket -------------
while(1) {
    my ($recieved_data);
    while(!$recieved_data) {
        $socket->recv($recieved_data,4096);
        $peer_address = unpack("N", $socket->peeraddr());
    }
    $version = unpack("n", substr($recieved_data,0,2));
    given ( $version ) {
        when ( 5 ) { fv5($recieved_data,$peer_address); }
        when ( 9 ) { fv9($recieved_data,$peer_address); }
    }
}
#Joining threads
foreach my $thr(threads->list) {
    $thr->join;
}
#Closing Socket
$socket->close();
$db->disconnect();

Собственно это тело программы. Все остальное идет в подпрограммах. Вкратце мы инициализировали нужные библиотеки, подключились к базе, открыли порт на прослушивание и в зависимости от версии протокола, направляем полученные данные в нужную нам подпрограмму.
На текущий момент я пока что реализовал обработку данных протокола IPv4, полученных с протокола NetFlow версии 5 (возможно, если бы меня кормили, бесплатно катали на машине и платили за жилье, я бы давно реализовал все версии).
Итак, после того как получили данные, направляем их в процедуру fv5:
sub fv5 {
    my ($data,$src)=@_;
    flog (10,0,"got v5 from ". dec2ip($src) ." - ".length($data)." bytes\n");
    my $devid = getdevid($src);
    if (exists($v5thread[$devid]) and $v5thread[$devid]->is_running) {
        print "Thread #$devid exists; Sending data...\n";
        $v5queue[$devid]->enqueue("$data");
    } else {
        print "Thread #$devid not exists. Starting up and sending data...\n";
        $v5queue[$devid]=Thread::Queue->new;
        $v5queue[$devid]->enqueue("$data");
        $v5thread[$devid]=threads->new(\&v5thread,$src,$devid);
    }
}

В этой нехитрой подпрограмме вы выясняем ID устройства и создаем одноименные thread (поток) и queue (очередь). После создания направляем в очередь данные на обработку.
Тут хотелось бы сделать отступление, вскинуть руки и очередной раз поблагодарить Perl за его свистелки и перделки. Очереди помогают перескочить через проблему синхронизации данных. Данные будут складываться в буфер и браться непосредственно потоком. Когда он их обработает — это уже его дело. Не требуется ждать, пока выполнится работа. Просто шлем данные на обработку.
Теперь закономерный вопрос, что происходит в потоке? Пока не поздно, покажу вспомогательные процедуры (без них никак):
sub fv9 {
    my ($data,$src)=@_;
    flog (10,0,"got v9 from ". dec2ip($_[1]) ." - ".length($_[0])." bytes\n");
    my $devid = getdevid($src);
}

sub ipv4 {
    my ($srcaddr) = @_;
    print $srcaddr;
}

#--------- IP converters --------------
#---- (thanks to Patrick H. Piper) ----
sub dec2ip ($) {
    join '.', unpack 'C4', pack 'N', shift;
}
sub ip2dec ($) {
    unpack N => pack CCCC => split /\./ => shift;
}

sub flog {
my (%log_msg) = ('1' => '',
                '2' => '',
                '3' => '',
                '4' => '',
                '5' => 'Init threads:',
                '6' => 'done'
                );
if (($_[0] >= $log_level) && ($_[1] != 0)) {
    print $log_msg{$_[1]},"\n";
} elsif (($_[0] >= $log_level) && ($_[1] == 0)) {
    print $_[2];
}
};

sub getdevid {
    my ($src) = @_;
    if (exists($deviceid{"$src"})) {
        print "There are device in memory. ";
    } else {
        print "There are no device with IP ". dec2ip($src) ." in memory. Searching in database... ";
        $request = $db->prepare('SELECT `device_id` FROM `devices` WHERE `device_header`='.$src);
        $request->execute;
        if ($result=$request->fetchrow_hashref) {
            print "- device found in DB. ";
        } else {
            print "- device not found. Adding new device... New ";
            $db->do("INSERT INTO devices (`device_header`) VALUES($src)");
            $request = $db->prepare('SELECT `device_id` FROM `devices` WHERE `device_header`='.$src);
            $request->execute;
            $result=$request->fetchrow_hashref;
        }
        $deviceid{"$src"}=$result->{device_id};
    }
    print "Id=".$deviceid{"$src"}."\n";
    return $deviceid{"$src"};
}


Теперь непосредственно к threads:
sub v5thread {
    my ($src,$devid)=@_;
    my (@header, @substr, $data, $i);
    print "Started thread v5 for device ".dec2ip($src)."\n";
    while (1) {
        if ($v5queue[$devid]->pending) {
            print "Thread #$devid got a data...\n";
            $data = $v5queue[$devid]->dequeue;
            @header = unpack("nnN4NNNHHH2", substr($data,0,24));
            my @records;
            for ($i=0; substr($data, 24+48*$i, 48); $i++) {
                @substr = unpack("N3n2N4n2C4n2C2n", substr($data, 24+48*$i, 48));
                push(@records, join(',', @substr));
            }
            print "Sending data for processing...\n";
            my @values = ip4process($devid,$header[3],$header[2],@records);
            $ip4queue->enqueue(@values);
        } else {
            sleep 1;
        }
    }
    print "thread stopped\n";
}

Как видно, программа постоянно проверяет очередь. Если есть что обрабатывать — она обрабатывает; если нет — она спит 1 секунду.
Как (может быть) вам известно, данные, полученные с NetFlow необходимо дискретизировать(то есть разбивать на интервалы). Я позаботился об этом в процедуре ip4process:
sub ip4process {
    my ($devid,$datetime,$sysuptime,@data) = @_;
    my ($start, $end, $istart, $iend, $timerange, $deltapkts, $timerange, $deltaoctets, $pkts ,$octets, $i, $j, $result, $ipkts, @values);
    foreach my $row (@data) {
        my @result = split(",",$row);
        $start = $datetime-ceil(($sysuptime - $result[7])/1000);
        $end = $datetime-ceil(($sysuptime - $result[8])/1000);
        $istart = floor($start/$interval)*$interval;
        $iend = floor($end/$interval)*$interval;
        $timerange = $end-$start;
        if ($istart == $iend) {
            push (@values,"(0,'".
            $devid."','".
            $iend."','".
            join("','", @result[0..6,9..10,12..18])."')");
        } elsif (($istart<$iend) and (floor($result[6]/$timerange)>0)) {
            $deltapkts = $result[5] / $timerange;
            $deltaoctets = $result[6] / $result[5];
            $pkts = floor(($istart+$interval-$start)*($deltapkts));
            $octets = $pkts * floor($deltaoctets);
            if ($pkts > 0) {
                push (@values, "(0,'".
                    $devid."','".
                    $istart."','".
                    join("','", @result[0..4])."','".
                    $pkts."','".
                    $octets."','".
                    join("','", @result[9..10,12..18])."')");
            }
            my ($intervals) = floor(($iend-$istart)/$interval);
            for ($i=1; $i<$intervals; $i++) {
                $ipkts += $deltapkts*$interval;
                if (floor($ipkts)>0) {
                    $octets = floor($deltaoctets)*floor($ipkts);
                    push (@values,"(0,'".
                        $devid."','".
                        ($istart+$interval*$i)."','".
                        join("','", @result[0..4])."','".
                        floor($ipkts)."','".
                        $octets."','".
                        join("','", @result[9..10,12..18])."')");
                    $pkts += floor($ipkts);
                    $ipkts -= floor($ipkts);
                }
            }
            $octets = $result[6]-floor($deltaoctets)*$pkts;
            $pkts = $result[5] - $pkts;
            if ($pkts>0) {
                push (@values, "(0,'".
                    $devid."','".
                    $iend."','".
                    join("','", @result[0..4])."','".
                    $pkts."','".
                    $octets."','".
                    join("','", @result[9..10,12..18])."')");
            }
        } else { print "bad record\n"; }
    }
    return (@values);
}

В этой процедуре описан довольно нудный процесс разбиения данных на интервалы. Может быть кому то алгоритм покажется не очень оптимальным. Извините если что не так…
После переработки данных, мы возвращаем их, готовые для записи. Однако вот ведь проблема: Как записать данные в базу? Допустим у нас 10 устройств. Использовать десять подключений? Плюс еще хочется агрегировать данные. В один интервал может попасть больше одной сессии и суммировать их в одну строку было бы очень хорошо. На помощь опять же приходит многопоточность и механизм очередей. Если вы обратите внимание, в самом начале я запустил поток в основной программе:
#init thread-writer to sql
my $ip4queue:shared = Thread::Queue->new;
my $ip4thread=threads->new(\&datatosql,$dbname,$dbhost,$dbusr,$dbpass);

а в теле процедуры v5thread мы используем метод
$ip4queue->enqueue(@values);

Смотрим, что происходит в потоке:
sub datatosql {
    my @data;
    my ($dbname,$dbhost,$dbusr,$dbpass)=@_;
    my ($db) = DBI->connect("DBI:mysql:$dbname;host=$dbhost", "$dbusr", "$dbpass") || die "Could not connect to database: $DBI::errstr";
    while (1) {
        if ($ip4queue->pending) {
            push (@data, $ip4queue->dequeue);
            if (scalar(@data) >= 512) {
                print "Achieved ".scalar(@data)." records. Writing into database.\n";
                $request = $db->prepare('SELECT COUNT(*) FROM `ip4temp`');
                $request->execute;
                if ($request->fetchrow >= 200000) {
                    $db->do("INSERT INTO `ip4graph` (`device_id`,`dtime`,`srcaddr`,`dstaddr`,`nexthop`,`input`,`output`,`dpkts`,`doctets`,`srcport`,`dstport`,`tcp_flags`,`prot`,`tos`,`src_as`,`dst_as`,`src_mask`,`dst_mask`)
                            SELECT `device_id`,`dtime`,`srcaddr`,`dstaddr`,`nexthop`,`input`,`output`,SUM(`dpkts`) AS `dpkts`,SUM(`doctets`) AS `doctets`,`srcport`,`dstport`,`tcp_flags`,`prot`,`tos`,`src_as`,`dst_as`,`src_mask`,`dst_mask
                            FROM `ip4temp`
                            GROUP BY `device_id`,`dtime`,`srcaddr`,`dstaddr`,`nexthop`,`input`,`output`,`srcport`,`dstport`,`tcp_flags`,`prot`,`tos`,`src_as`,`dst_as`");
                    $db->do("TRUNCATE `ip4temp`");
                }
                $request->finish;
                $db->do ("INSERT INTO `ip4temp` VALUES\n".join (",\n", @data)."\n");
                @data = ();
            }
        } else { sleep 5 }
    }
    $db->disconnect();
}

Здесь мы видим, что происходит обычная переработка очереди. Данные берутся по 512 строк и записываются одним INSERT-ом в таблицу. Отмечу, что таблица имеет тип MEMORY, а потому является очень быстрой. В 16 мегабайт (при стандартных настройках MySQL) умещается около 250 тысяч строк. Программа контролирует это и при достижении 200 тысяч строк записывает данные в InnoDB таблицу (MyISAM здесь будет не так хорош) с агрегацией и последующей очисткой временной базы.

Фактически это полноценный работоспособный код. Можно его скопировать в *.pl файлик и запустить, и он будет работать. Единственно вам понадобится создать базу для него. Прикладываю скрипт инициализации БД:
-- Delete old base and user
DROP DATABASE `flow`;
DROP USER 'netflow'@'localhost';

-- Create new database and use it
CREATE DATABASE IF NOT EXISTS `flow` CHARACTER SET utf16;
USE `flow`;
-- Create user and grant privileges
CREATE USER 'netflow'@'localhost' IDENTIFIED BY PASSWORD '*993AA45E0B64915AFBD1A5BE5713FD509A8E6C2C';
GRANT ALL PRIVILEGES ON `flow` . * TO 'netflow'@'localhost' WITH GRANT OPTION;
-- Create table for templates
CREATE TABLE IF NOT EXISTS `devices` (
`device_id` INT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
`device_header` VARCHAR(100),
`device_description` VARCHAR(100),
`device_data` VARCHAR(100)
) ENGINE=InnoDB;

CREATE TABLE IF NOT EXISTS `interfaces` (
`id` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
`device_id` INT UNSIGNED NOT NULL,
`interface_id` INT UNSIGNED NOT NULL,
`interface_direction` BOOLEAN,
`interface_description` VARCHAR(256)
) ENGINE=innodb;

CREATE TABLE IF NOT EXISTS `ip4temp` (
`id` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
`device_id` INT UNSIGNED NOT NULL,
`dtime` INT UNSIGNED,
`srcaddr` INT UNSIGNED,
`dstaddr` INT UNSIGNED,
`nexthop` INT UNSIGNED,
`input` SMALLINT UNSIGNED,
`output` SMALLINT UNSIGNED,
`dpkts` INT UNSIGNED,
`doctets` INT UNSIGNED,
`srcport` SMALLINT UNSIGNED,
`dstport` SMALLINT UNSIGNED,
`tcp_flags` TINYINT UNSIGNED,
`prot` TINYINT UNSIGNED,
`tos` TINYINT UNSIGNED,
`src_as` SMALLINT UNSIGNED,
`dst_as` SMALLINT UNSIGNED,
`src_mask` SMALLINT UNSIGNED,
`dst_mask` SMALLINT UNSIGNED
) ENGINE=memory;

CREATE TABLE IF NOT EXISTS `ip4graph` (
`id` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
`device_id` INT UNSIGNED NOT NULL,
`dtime` INT UNSIGNED,
`srcaddr` INT UNSIGNED,
`dstaddr` INT UNSIGNED,
`nexthop` INT UNSIGNED,
`input` SMALLINT UNSIGNED,
`output` SMALLINT UNSIGNED,
`dpkts` INT UNSIGNED,
`doctets` INT UNSIGNED,
`srcport` SMALLINT UNSIGNED,
`dstport` SMALLINT UNSIGNED,
`tcp_flags` TINYINT UNSIGNED,
`prot` TINYINT UNSIGNED,
`tos` TINYINT UNSIGNED,
`src_as` SMALLINT UNSIGNED,
`dst_as` SMALLINT UNSIGNED,
`src_mask` SMALLINT UNSIGNED,
`dst_mask` SMALLINT UNSIGNED
) ENGINE=InnoDB;


Вот так на коленке мы написали коллектор с анализатором.
Вероятно вы зададитесь вопросом. Как получить данные?.. Вот вам скрипт:
(SELECT
FROM_UNIXTIME( `dtime` ) AS `dtime`,
INET_NTOA( `srcaddr` ) AS `srcaddr` ,
INET_NTOA( `dstaddr` ) AS `dstaddr` ,
INET_NTOA( `nexthop` ) AS `nexthop` ,
`input` ,
`output` ,
`dpkts` ,
`doctets` ,
`srcport` ,
`dstport` ,
`tcp_flags` ,
`prot` ,
`tos` ,
`src_as` ,
`dst_as`
FROM `ip4graph`
WHERE `dstaddr` =172966137)
UNION
(SELECT
FROM_UNIXTIME( `dtime` ) AS `dtime` ,.
INET_NTOA( `srcaddr` ) AS `srcaddr` ,
INET_NTOA( `dstaddr` ) AS `dstaddr`,
INET_NTOA( `nexthop` ) AS `nexthop` ,
`input` ,
`output` ,
`dpkts` ,
`doctets` ,
`srcport` ,
`dstport` ,
`tcp_flags` ,
`prot` ,
`tos` ,
`src_as` ,
`dst_as`
FROM `ip4temp`
WHERE `dstaddr` =172966137)
ORDER BY `dtime`
LIMIT 0,30

Если вы имеете хоть немного навыков работы в MySQL — разберетесь. Если с этим проблемы — пишите — вышлю вам свою веб-морду (тоже сделанную на коленке):

Вебморду не выкладываю нигде потому что не доделано многое (формы, чарты и т.д).

Результат:
В результате я получил вполне рабочую схему коллектора и анализатора в одном флаконе.
Нагрузка либо не возросла либо возросла незначительно.
Впереди нарисовался некоторый план действий по оптимизации и расширению функционала. Базис заложен.

0 комментариев

Только зарегистрированные и авторизованные пользователи могут оставлять комментарии.