Logstash es una herramienta muy potente, que permite visualizar y analizar gran cantidad de información de una forma ágil y cómoda, y que está teniendo una gran aceptación en muchos ámbitos diferentes. En mi anterior entrada, que he actualizado con detalles de funcionamiento de la última versión de LogStash publicada, os contaba cómo iniciarse en el uso de esta aplicación, y empezar a probar su gran potencial.
Hoy vamos a ver otro posible uso para esta magnífica aplicación. Se trata de realizar un pequeño análisis sobre las alertas que se han generado en una sonda Snort durante un periodo determinado de tiempo.
Para ello, lo primero es disponer de las alertas en un fichero, para que la aplicación las pueda procesar. Una consulta como la siguiente nos servirá para extraer la información al fichero /tmp/saw_dump.csv:
SELECT sig_name AS sig_name, dec2ip(ip_src) AS ip_src, layer4_sport AS port_src, dec2ip(ip_dst) AS ip_dst, layer4_dport AS port_dst, ip_proto AS ip_proto, timestamp AS timestamp FROM acid_event WHERE timestamp >= 20140301 and timestamp < 20140401 INTO OUTFILE'/tmp/saw_dump_march.csv' FIELDS TERMINATED BY ';' LINES TERMINATED BY '\n' ;
El fichero creado tendrá un contenido similar al siguiente:
ET P2P BitTorrent Announce ;AA.AA.AA.AA;18301;XX.XX.XX.XX;6969;6;2014-03-01 00:00:00 ET MALWARE User-Agent (Mozilla/4.0 (compatible ICS)) ;AA.AA.BB.BB;3289; XX.XX.XX.XX;80;6;2014-03-01 00:00:00 ET MALWARE Suspicious Mozilla User-Agent - Likely Fake (Mozilla/4.0) ; AA.AA.CC.CC;62460;XX.XX.XX.XX;80;6;2014-03-01 00:00:00 ET P2P BitTorrent Announce ;AA.AA.DD.DD;18301;XX.XX.XX.XX;6969;6;2014-03-01 00:00:00 ET WEB_SERVER Outbound PHP User-Agent;AA.AA.EE.EE;53376;XX.XX.XX.XX;80;6; 2014-03-01 00:00:00
Si echamos un vistazo a los patrones que incluye por defecto vemos que, como era de esperar, no hay ninguno que coincida con la salida de nuestra consulta, por lo que tendremos que generar un patrón nuevo para poder procesar toda la información necesaria. Para ello, nos ayudaremos de la aplicación web Grok Debugger, que nos permite, a partir de un ejemplo de nuestro log, generar un patrón de forma sencilla.
La estructura de los patrones de Grok es la siguiente: %{SINTAXIS:SEMÁNTICA}. En SINTAXIS se define el nombre del patrón que se buscará (puede ser una cadena, una IP, un entero, una fecha, etcétera. En definitiva, cualquier patrón definido anteriormente), y en SEMÁNTICA qué nombre le vamos a dar en la salida a dicho parámetro. Con esto en mente, creamos los siguientes patrones para poder procesar correctamente el fichero de log que hemos generado:
SNORT_SIMPLE %{DATA:signature};%{IP:ip_src};%{POSINT:port_src}; %{IP:ip_dst};%{POSINT:port_dst};%{DATA:proto};%{TIMESTAMP:timestamp} DATE_ENG %{YEAR}[./-]%{MONTHNUM}[./-]%{MONTHDAY} TIMESTAMP %{DATE_ENG} %{TIME}
Estos patrones personalizados los guardamos en un fichero de texto, que luego indicaremos a Logstash para que los pueda interpretar.
Antes de continuar, apuntar un pequeño detalle; hemos generado el patrón a partir de un ejemplo, así que es posible que el patrón generado no cubra todos los posibles valores de entrada con los que nos podemos encontrar, por lo que será necesario, al menos al principio, revisar la salida por consola de LogStash o Kibana en busca de elementos con el tag “_grokparsefailure”.
Es el caso del siguiente mensaje:
Aquí vemos que, al tratarse de una alerta ICMP, los valores de los puertos son nulos por lo que aparece la cadena “\N”, que no coincide con el valor de entero positivo que habíamos asignado en nuestra definición de patrón, dando lugar al fallo. En este caso, debemos modificar dicho patrón para que tenga en cuenta que es posible que los valores de los puertos sean nulos, quedando el listado como sigue:
SNORT_SIMPLE %{DATA:signature};%{IP:ip_src};(?:%{POSINT:port_src}|\N); %{IP:ip_dst};(?:%{POSINT:port_dst}|\N); %{DATA:proto};%{TIMESTAMP:timestamp} DATE_ENG %{YEAR}[./-]%{MONTHNUM}[./-]%{MONTHDAY} TIMESTAMP %{DATE_ENG} %{TIME}
Ahora sólo nos falta configurar Logstash con el fichero a procesar, e indicarle dónde están los patrones que vamos a aplicar en este caso. Para ello, generamos un nuevo fichero de configuración (que hemos llamado logstash-snort.conf) con el siguiente contenido:
input { file { start_position => "beginning" path => [ "/home/jvila/saw_dump_march.csv" ] } } filter { grok { patterns_dir => "./patterns" match => { "message" => "%{SNORT_SIMPLE}" } } date { match => [ "timestamp", "yyyy-MM-dd HH:mm:ss" ] locale => "en" } } output { elasticsearch { embedded => true } }
Los cambios principales respecto al fichero mostrado en el artículo anterior los encontramos en la cadena que se busca en el filtro, y el formato de la fecha proporcionado por la base de datos.
Para finalizar, tan sólo debemos lanzar Logstash y esperar a que cargue toda la información necesaria, ejecutando por una parte el proceso que recopilará toda la información:
jvila@jvila:~/logstash-1.4.0$ bin/logstash -f logstash-snort.conf
Y por otra el que nos cargará la web:
jvila@jvila:~/logstash-1.4.0$ bin/logstash web
Mañana, si el editor me deja, os enseño el resultado y os explico algunas vistas que nos han parecido interesantes.
Estad atentos.
Aunque no tengo mucha idea de esto, te agradezco la molestia que te has tomado en explicarlo bien :)
Muchas gracias Alejandro, me alegra que te haya gustado el artículo.
Excelente explicacion… me queda la duda de como se podra identificar un campo si cambia de posicion, mi firewall no siempre envia los campos en el mismo orden… aun no se que voy a hacer si alguien sabe que nos comente para el proximo que pase por aqui.. gracias
La Solucion a cualquiera que utilice Logs de la forma:
“NAME1=VALOR NAME2=VALOR2…”
“NAME1=VALOR,NAME2=VALOR2,…”
Es utilizar KV en vez de GROK, KV Automaticamente sacara los campos sin importar el orden o la posicion de estos.
Se puede remplazar ‘=’ y ‘SPACE’ por cualquier caracter que queramos:
http://logstash.net/docs/1.4.2/filters/kv
A quienes se quieran evitar el trabajito de configurar campo por campo el GROK:
1. Comentar las lineas de grok
2. Adicionar en el filtro “kv { }” sin comillas, como se muestra a continuacion:
filter {
if “syslog” in [tags] {
#COMENTAMOS GROK
# grok {
# match => { “message” => “%…
# add_field => [ “received_at”, “%{@timestamp}” ]
# add_field => [ “received_from”, “%{host}” ]
# }
#FIN DE GROK
syslog_pri { }
#USAMOS KV
kv { }
#FIN KV
date {
match => [ “syslog_timest….
timezone => “America/Bogota”
}
}
Muchas gracias louis3x0 por tus comentarios.
Me alegra que hayas podido solucionar tu problema.