Traffic monitoring with Kibana, ElasticSearch and Python

Motivation

Dashboard image

ElasticSearch Install

$ brew install elasticsearch
$ elasticsearch -v
Version: 1.4.4, Build: c88f77f/2015-02-19T13:05:36Z, JVM: 1.7.0_72

Download Kibana

$ wget https://download.elasticsearch.org/kibana/kibana/kibana-4.0.1-darwin-x64.tar.gz
$ tar zxvf kibana-4.0.1-darwin-x64.tar.gz

Python Library

$ pip install pyshark elasticsearch requests

Execute

$ python packet_cap_es.py <interface>
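
While the capture is running, one way to check that documents are arriving is to ask Elasticsearch for the most frequent source addresses. The sketch below only builds the request body for a terms aggregation. The function name `top_talkers_query` and the aggregation name `top_talkers` are made up for illustration; the `srcip` field and the `packets` index are the ones the script creates.

```python
import json


def top_talkers_query(field="srcip", size=10):
    """Build a search body that counts documents per value of `field`.

    Hypothetical helper, not part of the original script.
    """
    return {
        "size": 0,  # return only the aggregation, not the matching documents
        "aggs": {
            "top_talkers": {
                "terms": {"field": field, "size": size}
            }
        },
    }


if __name__ == "__main__":
    # Print the body so it can be inspected or pasted into another tool.
    print(json.dumps(top_talkers_query(), indent=2, sort_keys=True))
```

Assuming the client version installed above accepts a `body` argument, the result could be passed to `Elasticsearch().search(index="packets", body=top_talkers_query())`. Because `srcip` and `dstip` are mapped as `not_analyzed`, each address is counted as one whole term.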

Python Script

"""
This app captures packets and extracts the five-tuple from each one.
The extracted data is stored in Elasticsearch.
Elasticsearch and Kibana then provide a real-time packet monitoring
dashboard.
"""
import json
import sys
import datetime
import time

import pyshark
import requests
from elasticsearch import Elasticsearch
from elasticsearch import helpers

URL = "http://localhost:9200"
INDEX_URL = URL + "/packets"
TYPE_URL = INDEX_URL + "/packet"
ACTION = {"_index" : "packets",
          "_type" : "packet",
          "_source": {}
         }


def delete_index():
    """Delete an index in elastic search."""
    requests.delete(INDEX_URL)


def create_index():
    """Create an index in elastic search with timestamp enabled."""
    requests.put(INDEX_URL)
    setting = {"packet" : {
                "_timestamp" : {
                    "enabled" : True,
                    "path" : "capture_timestamp",
                },
                "numeric_detection" : False,
                "properties" : {
                    "dstip" : { "type":"string",
                                "index" : "not_analyzed",
                                "store" : True},
                    "srcip" : { "type":"string",
                                "index" : "not_analyzed",
                                "store" : True}
                }
            }}
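    # Elasticsearch may still be starting up, so retry once a second
    # until the mapping request goes through.
    for _ in range(1, 100):
        try:
            requests.put(TYPE_URL + "/_mapping", data=json.dumps(setting))
            break
        except requests.exceptions.RequestException:
            time.sleep(1)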

def main():
    """Extract packets and store them to ES"""
    capture = pyshark.LiveCapture(interface=sys.argv[1])
    packet_que = list()
    es = Elasticsearch()

    end_time = None
    for packet in capture.sniff_continuously():
        if packet.transport_layer in ("UDP", "TCP"):
            try:
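                # Elasticsearch treats timestamps as UTC, and Kibana by default
                # converts them to the browser's time zone for display. Format
                # the capture time as UTC instead of shifting it by hand.
                epoch = float(packet.sniff_timestamp)
                row_timestamp = datetime.datetime.utcfromtimestamp(epoch)
                timestamp = row_timestamp.strftime("%Y-%m-%dT%H:%M:%SZ")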
                version = int(packet[1].version)
                # IPv6 has no protocol field; it uses the Next Header field instead.
                if version == 4:
                    protocol = int(packet[1].proto)
                elif version == 6:
                    protocol = int(packet[1].nxt)
                else:
                    protocol = None

                dstip = packet[1].dst
                srcip = packet[1].src
                dstport = int(packet[2].dstport)
                srcport = int(packet[2].srcport)
                parsed_packet = dict(version=version, protocol=protocol,
                                     dstip=dstip, srcip=srcip,
                                     dstport=dstport, srcport=srcport,
                                     capture_timestamp=timestamp)
                # For historical graph
                parsed_packet["@timestamp"] = timestamp
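                # Build a fresh action per packet. ACTION.copy() is shallow, so
                # updating action["_source"] in place would make every queued
                # action share (and overwrite) the same dict.
                action = dict(ACTION, _source=parsed_packet)
                packet_que.append(action)
                # Flush the queue to Elasticsearch at most once every 3 seconds.
                current = time.time()
                if end_time is None or current - end_time >= 3:
                    helpers.bulk(es, packet_que)
                    del packet_que[:]
                    end_time = time.time()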

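            except Exception as e:
                # Report the failure, then pause briefly before continuing.
                sys.stderr.write("failed to process packet: %s\n" % e)
                time.sleep(1)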


if __name__ == "__main__":
    if len(sys.argv) != 2:
        sys.stderr.write("Usage: python packet_cap_es.py <interface>\n")
        sys.exit(1)
    delete_index()
    create_index()
    main()
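
Two details of the script are easy to get wrong: formatting the capture time as UTC, and giving each bulk action its own `_source` dict. The following is a minimal sketch of both. The helper names `to_es_timestamp` and `make_action` are hypothetical and do not appear in the script; the code uses only the standard library, so it can be tried without Elasticsearch running.

```python
import time

# Same index and type names as the script above.
ACTION = {"_index": "packets", "_type": "packet", "_source": {}}


def to_es_timestamp(epoch_seconds):
    """Format a Unix epoch value as a UTC string, dropping fractional seconds."""
    return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(float(epoch_seconds)))


def make_action(parsed_packet):
    """Return a new bulk action whose _source is a copy of parsed_packet.

    dict(ACTION, _source=...) replaces the _source key in the new dict,
    so the shared template's empty _source dict is never modified.
    """
    return dict(ACTION, _source=dict(parsed_packet))


if __name__ == "__main__":
    first = make_action({"srcip": "192.0.2.1",
                         "capture_timestamp": to_es_timestamp(0)})
    second = make_action({"srcip": "192.0.2.2",
                          "capture_timestamp": to_es_timestamp(1.5)})
    # Each action keeps its own source address.
    print(first["_source"]["srcip"])
    print(second["_source"]["srcip"])
```

Going through `time.gmtime` keeps the result independent of the machine's local time zone, so no manual hour offset is needed.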
