Netflow-искания Часть 4.2 (Анализ данных)

Блог им. dreamhunter
Налепил я скрипт, который что-то где-то собирает и вот встает вопрос: что делать дальше? Ну собрал ты данные — молодец. Однако необходимо их как то представлять?
На самом деле тут должен явиться мега-супер спец по SQL. Но пока такового нет, я вам кое что покажу…

Задача: Отобразить данные, собраные с шайтан-машина Cisco router, шайтан-скриптом dreamhunter-a.

Предварительный обзор

Вернемся к структуре БД. Данные по NetFlow v5 хранятся в одной таблице. Поле device_id дает нам разграничение по устройствам. IP устройств хранятся в таблице devices в шеснадцатеричном виде. Для NetFlow v9 все намного сложней.
Во-первых данные хранятся в упакованном (бинарном) виде Это связано с тем, что коллектор не может разбивать данные по типам (на самом деле может, но функция отключена т.к. решение не принято), а просто использует что-то вроде «штампа» в преобразовании.
Во-вторых, как я говорил, все зависит от устройства, передающего данные. Например роутеры в шаблоне 0x100 передают практически ту же структуру данных по протоколу IPv4, что и в 5 версии с незначительными изменениями. Попробум их извлечь. Даже я — далеко не сведующий специалист в SQL смог нарыть многое.

Попробуем получить данные

По 5 версии вообще все просто:
SELECT
FROM_UNIXTIME(`datetime`-CEIL((`sysuptime`-`first`)/1000)) AS first,
1000 - MOD(`sysuptime`-`first`, 1000) AS firstms,
FROM_UNIXTIME(`datetime`-CEIL((`sysuptime`-`last`)/1000)) AS last,
1000 - MOD(`sysuptime`-`last`, 1000) AS lastms,
INET_NTOA(`srcaddr`) as `srcaddr`,
INET_NTOA(`dstaddr`) as `dstaddr`,
INET_NTOA(`nexthop`) as `nexthop`,
`input`,
`output`,
`dpkts`,
`doctets`,
`srcport`,
`dstport`,
`tcp_flags`,
`prot`,
`tos`,
`src_as`,
`dst_as`,
`src_mask`,
`dst_mask`,
FROM `v5` WHERE `device_id`=1
LIMIT 0, 30

Вот мы и получили данные в удобном для нас виде. Проще некуда. Перейдем к девятой версии. Опять же вооружаемся таблицей соответствия и делаем аналогичный скрипт, но с распаковкой данных:
SELECT
 FROM_UNIXTIME(`datetime` - CEIL(`uptime` - CONV(HEX(`22`),16,10))/1000) as FIRST_SWITCHED,
 FROM_UNIXTIME(`datetime` - CEIL(`uptime` - CONV(HEX(`21`),16,10))/1000) as LAST_SWITCHED,
 1000 - MOD(`uptime` - CONV(HEX(`22`),16,10),1000) as FIRST_MS,
 1000 - MOD(`uptime` - CONV(HEX(`21`),16,10),1000) as LAST_MS,
 CONV(HEX(`1`),16,10) as BYTES,
 CONV(HEX(`2`),16,10) as PKTS,
 CONV(HEX(`10`),16,10) as IN_IF,
 CONV(HEX(`14`),16,10) as OUT_IF,
 INET_NTOA(CONV(HEX(`8`),16,10)) as SRC_ADDR,
 INET_NTOA(CONV(HEX(`12`),16,10)) as DST_ADDR,
 CONV(HEX(`4`),16,10) as PROT,
 CONV(HEX(`7`),16,10) as SRC_PORT,
 CONV(HEX(`11`),16,10) as DST_PORT,
 INET_NTOA(CONV(HEX(`15`),16,10)) as NEXT_HOP,
 CONV(HEX(`5`),16,10) as TOS,
 CONV(HEX(`6`),16,10) as TCP_FLAGS
FROM `3232244225256`
WHERE (CONV(HEX(`61`),16,10) = '1')
 AND FROM_UNIXTIME(`datetime` - CEIL(`uptime` - CONV(HEX(`22`),16,10))/1000) >= DATE_SUB(NOW(), INTERVAL 1 MINUTE)
ORDER BY FIRST_SWITCHED, FIRST_MS

Имя таблицы 3232244225256 состоит из 2 частей. 3232244225 — IP адрес устройства в десятичном виде; 256 — номер шаблона (0x100). У Cisco router в этом шаблоне обычно передаются данные по IPv4. Этот скрипт вам отобразит данные из таблицы по исходящему трафику (CONV(HEX(`61`),16,10) = '1') за последнюю минуту FROM_UNIXTIME(`datetime` — CEIL(`uptime` — CONV(HEX(`22`),16,10))/1000) >= DATE_SUB(NOW(), INTERVAL 1 MINUTE).

Конечно этот скрипт весьма прожорливый. И выборка делается через операцию преобразования. Я общался с одним грамотным человеком и мы сошлись на мысли о том, что правильно будет держать вторую таблицу с распакованными данными за какой то меньший период, а основные данные могли бы храниться в упакованном виде.

По упакованным данным: Число, в упакованном виде занимает столько же данных сколько занимало бы в аналогичном целочисленном (даже на байт больше). Varchar(4)=INT. Однако есть опасность получения данных, даже больше чем DECIMAL.

Дополнительные примеры

Топ 50 ресурсов за последние 10 минут:
SELECT
 INET_NTOA(CONV(HEX(`8`),16,10)) as SRC_ADDR,
 SUM(CONV(HEX(`1`),16,10)) as BYTES,
 SUM(CONV(HEX(`2`),16,10)) as PKTS
FROM `3232244225256`
WHERE
 FROM_UNIXTIME(`datetime` - CEIL(`uptime` - CONV(HEX(`22`),16,10))/1000) >= DATE_SUB(NOW(), INTERVAL 10 MINUTE)
GROUP BY SRC_ADDR
ORDER BY BYTES DESC
LIMIT 0, 50

Топ 50 качальщиков за последние 10 минут:
SELECT
 INET_NTOA(CONV(HEX(`12`),16,10)) as DST_ADDR,
 SUM(CONV(HEX(`1`),16,10)) as BYTES,
 SUM(CONV(HEX(`2`),16,10)) as PKTS
FROM `3232244225256`
WHERE
 FROM_UNIXTIME(`datetime` - CEIL(`uptime` - CONV(HEX(`22`),16,10))/1000) >= DATE_SUB(NOW(), INTERVAL 10 MINUTE)
GROUP BY DST_ADDR
ORDER BY BYTES DESC
LIMIT 0, 50


Топ приложений по входящему трафику:
SELECT
 INET_NTOA(CONV(HEX(`12`),16,10)) as DST_ADDR,
 CONV(HEX(`11`),16,10) as DST_PORT,
 SUM(CONV(HEX(`1`),16,10)) as BYTES,
 SUM(CONV(HEX(`2`),16,10)) as PKTS
FROM `3232244225256`
WHERE
 FROM_UNIXTIME(`datetime` - CEIL(`uptime` - CONV(HEX(`22`),16,10))/1000) >= DATE_SUB(NOW(), INTERVAL 10 MINUTE)
 AND CONV(HEX(`4`),16,10) = 6
GROUP BY DST_PORT, DST_ADDR
ORDER BY BYTES DESC
LIMIT 0, 50

Топ приложений по исходящему трафику:
SELECT
 INET_NTOA(CONV(HEX(`12`),16,10)) as SRC_ADDR,
 CONV(HEX(`11`),16,10) as SRC_PORT,
 SUM(CONV(HEX(`1`),16,10)) as BYTES,
 SUM(CONV(HEX(`2`),16,10)) as PKTS
FROM `3232244225256`
WHERE
 FROM_UNIXTIME(`datetime` - CEIL(`uptime` - CONV(HEX(`22`),16,10))/1000) >= DATE_SUB(NOW(), INTERVAL 10 MINUTE)
 AND CONV(HEX(`4`),16,10) = 6
GROUP BY SRC_PORT, SRC_ADDR
ORDER BY BYTES DESC
LIMIT 0, 50


Ну вот примерно и все. Жду вопросов в комментариях. Для себя я еще наметил некоторые направления. Возможно в дальнейшем все будет реализовано.

24 комментария

avatar
хмм? сайт раскрутился чтоль? смотрю реклама пошла.
avatar
Это моя контора, свои же услуги и рекламирую :)
avatar
в моих статьях… (
avatar
Если по переходу с вашей статьи будет заказ, я готов поделиться 5% прибыли.
avatar
Конечно оставить в базе сырые данные удобнее для будущего верчения во все стороны. Но место съест много. Дальнейшая разработка в какую сторону двинется?

Я пишу проект по анализу трафика пользователей по данным netflow. Я решил сразу эти данные анализировать и писать в базу. Да не так гибко. У меня пока только одна проблема — коллектор. Мне приходится использовать flow-tools: из формата flow-tools, конвертировать данные в csv, а уже потом анализировать. Тут два пути: а) анализировать flow файлы flow-tools своими силами; б) писать свой коллектор.

Кстати. flow-tools пишет netflow данные с своем формате — не встречал ли в сети спецификацию?? В рассылке по flow-tools автор указывает, что использовал API библиотеки ftlib. И все. Но разобрать как она работает у меня не получилось, C не знаю. Нашел статью aleph-nought.blogspot.ru/2009/01/format-of-flow-capture-flow-file.html, но разбор не детальный, автор пишет явно для тех кто сечет в С.
avatar
ты предыдущую часть читал? зачем тебе flow-tools?
avatar
Я не хочу хранить необработанные данные в базе. flow-tools нужен пока только как вынужденное звено. Как только будет сформирован рабочий вариант анализатора примусь за написание коллектора. Распылять силы не хочется.
avatar
Хех, а во flow-tools они обработанные? =)
Ну и почему ты говоришь что у меня они не обработанные, если они извлекаются обычным SQL скриптом? тебе какой еще вид то надо? =)
avatar
Я говорил про то, что мне не хочется хранить сырые данные. Я их обрабатываю и кладу в базу. Так занимает меньше места. Может в этом непонятка вышла. Например для локального IP 192.168.42.22 ведется подсчет какой трафик был, какие протоколы использовались, какие порты были задействованы, сколько пакетов принято и отправлено, с какими удаленными хостами была связь и о каждом хосте, опять же, есть статистика. Все это суммируется по каждому пункту и пишется в базу. Но хранить все данные я не хочу, потому как через какое-то время этих данных будет очень много и на каждый запрос будет уходить все больше времени. Но признаю, что в сыром виде эти данные можно крутить как угодно в любой момент.
avatar
Плюс я тебе расскажу такую тему:
Данные NetFlow могут прийти на коллектор в любой момент. Обработка данных с одного коллектора занимает N времени. Если ты используешь свой домашний компьютер как сенсор — этого как бы будет хватать. Но если ты работаешь в серьезной организации, то у тебя начнутся следующие проблемы:
Данные с одного из сенсоров могут прийти в тот момент, когда твой коллектор обрабатывает данные другого. В этом случае ты будешь просто терять данные. Поверь это реальность в сетях с количеством пользователей больше 500 и сенсорами Cisco ASA.

Вот тебе и выбор: делать заточенный под определенные нужды коллектор либо попытаться сделать что-нибудь по-производительней, переложив функции анализа данных на анализаторы.
avatar
Для подобных вещей есть очередь.

Но в каком виде это будет я еще не решил. Может быть проще использовать тот же flow-tools и только разобрать его формат файлов. Че изобретать велосипед. Время подумать еще есть )
avatar
вообще то не очередь, а многопоточность. UDP протокол, по которому передаются данные, ждать не будет.
по FlowTools — этот проект не развивается. Посмотри, когда была сделана последняя версия.
avatar
Ну что делать, но flow-tools работает стабильно. Но мне уже хочется написать свой коллектор ))
avatar
Дерзай-дерзай…
avatar
Спешл для Костяна:
CREATE TABLE IF NOT EXISTS `ip4report` (
`id` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
`device_id` INT UNSIGNED NOT NULL,
`first` DATETIME NOT NULL,
`firstms` SMALLINT UNSIGNED NOT NULL,
`last` DATETIME NOT NULL,
`lastms` SMALLINT UNSIGNED NOT NULL,
`srcaddr` INT UNSIGNED,
`dstaddr` INT UNSIGNED,
`nexthop` INT UNSIGNED,
`input` SMALLINT UNSIGNED,
`output` SMALLINT UNSIGNED,
`dpkts` INT UNSIGNED,
`doctets` INT UNSIGNED,
`srcport` SMALLINT UNSIGNED,
`dstport` SMALLINT UNSIGNED,
`tcp_flags` TINYINT UNSIGNED,
`prot` TINYINT UNSIGNED,
`tos` TINYINT UNSIGNED,
`src_as` SMALLINT UNSIGNED,
`dst_as` SMALLINT UNSIGNED
) ENGINE=InnoDB;

delimiter |

CREATE TRIGGER `v5toip4report` AFTER INSERT ON `v5`
FOR EACH ROW BEGIN
INSERT INTO `ip4report` VALUES (
0,
NEW.`device_id`,
FROM_UNIXTIME(NEW.`datetime`-CEIL((NEW.`sysuptime`-NEW.`first`)/1000)),
1000 — MOD(NEW.`sysuptime`-NEW.`first`, 1000),
FROM_UNIXTIME(NEW.`datetime`-CEIL((NEW.`sysuptime`-NEW.`last`)/1000)),
1000 — MOD(NEW.`sysuptime`-NEW.`last`, 1000),
NEW.`srcaddr`,
NEW.`dstaddr`,
NEW.`nexthop`,
NEW.`input`,
NEW.`output`,
NEW.`dpkts`,
NEW.`doctets`,
NEW.`srcport`,
NEW.`dstport`,
NEW.`tcp_flags`,
NEW.`prot`,
NEW.`tos`,
NEW.`src_as`,
NEW.`dst_as`);
END;
|
Если не сможешь этим воспользоваться — бросай программирование!!!
avatar
Искал инфу и заметил сообщение. Тема конечно интересная и пригодится для автоматизации. Вполне реально рутину переложить на базу. Вижу что в работе с базой у тебя больше опыта…
avatar
Триггирами я бы не советал пользоваться. Если только на небольших объемах трафика, и этот объем в перспективе не вырастет.
avatar
проверено, все ок. Как говорит мой коллега: трафик NetFlow, хоть и довольно большой, но зато бежит тонкой струйкой.
Однако конечно, где можно обойтись без триггеров — лучше обойтись без них.