Logs with Filebeat, Logstash, Elasticsearch and Kibana

Complex systems require monitoring, and log files are an important part of that. The best way to access, search and view multiple log files is the combination of Filebeat, Logstash, Elasticsearch and Kibana.

However, configuring them can be difficult. Here I am demonstrating a possible setup.

First, some words about each of them, in case you don't know them:

Filebeat is a tool that watches for file system changes and uploads the file contents to a destination (output). Elasticsearch and Logstash are the most commonly used outputs; Kafka and many others are also supported.

Logstash is a tool for processing and enriching logs. It is based on the input-filter-output model: it can convert log lines into a different format, add and remove fields, and more.

Elasticsearch is a well-known search engine based on Lucene. It stores documents inside indices.

Kibana is basically the GUI for Elasticsearch. It provides a user interface for searching and displaying data from indices, as well as for managing them.


What to log

In this setup I demonstrate how to handle the system logs (syslog and the auth log). The same can be done for other logs, such as nginx, mysql and apache2, as well as for application logs.


Elasticsearch configuration

Note: all configurations and examples here are based on ELK 7.x. Some functionalities are not available in previous versions. If you are using an older version, consider upgrading.

The "index rollover" topic was covered in a previous post. I will skip the detailed explanation here.

We will upload the logs to the index alias /system.

Create the lifecycle policy

The lifecycle policy below will roll over the index after 1 day or when it grows bigger than 500 MB. Additionally, it will delete the index completely one day after the rollover. This way we keep only the last 2 indices.

PUT _ilm/policy/logs-lifecycle
{
  "policy": {
    "phases": {
      "hot": {
        "min_age": "0ms",
        "actions": {
          "rollover": {
            "max_size": "500mb",
            "max_age": "1d"
          },
          "set_priority": {
            "priority": 100
          }
        }
      },
      "delete": {
        "min_age": "1d",
        "actions": {
          "delete": {
            "delete_searchable_snapshot": true
          }
        }
      }
    }
  }
}
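To verify that the policy was created, you can fetch it back from the Kibana Dev Tools console:

GET _ilm/policy/logs-lifecycle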


Create the index template

PUT _index_template/logstash-logs-system
{
  "index_patterns": ["system-*"],
  "template": {
    "settings": {
      "number_of_shards": 1,
      "index": {
        "lifecycle": {
          "name": "logs-lifecycle",
          "rollover_alias": "system"
        }
      }
    }
  }
}
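This template ties every index matching system-* to the lifecycle policy and the rollover alias. To double-check it, retrieve it:

GET _index_template/logstash-logs-system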


Create the first index and alias

PUT /system-000001
{
  "aliases": {
    "system": {
      "is_write_index" : true
    }
  }
}
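You can confirm that the alias points to the new index and that the lifecycle policy is attached:

GET _alias/system
GET system-000001/_ilm/explain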


Filebeat configuration

Here we configure Filebeat to upload the system logs to Logstash whenever they change.

Under /etc/filebeat/modules.d/ you can find multiple module configurations. 

Rename "system.yml.disabled" to "system.yml" to enable it. Its contents should look like this:

- module: system
  # Syslog
  syslog:
    enabled: true
  # Authorization logs
  auth:
    enabled: true
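Instead of renaming the file manually, the module can also be enabled via the Filebeat CLI, which does the same thing:

sudo filebeat modules enable system
sudo filebeat modules list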


In order to upload the logs to Logstash, configure the Logstash output in /etc/filebeat/filebeat.yml and comment out the other outputs (e.g. the default Elasticsearch output):

output.logstash:
  hosts: ["logstash_host:5044"]


Logstash configuration

Logstash identifies the logs as system logs, adds some fields and uploads them to the index alias in Elasticsearch.

Note that Logstash processes the configuration files in alphabetical order of their filenames. To control the order, we prefix the filenames with numbers.

First, the input - under /etc/logstash/conf.d/01-beats-input.conf

input {
  beats {
    port => 5044
  }
}


Second, the filter - under /etc/logstash/conf.d/10-syslog-filter.conf

This will parse the log line into structured (JSON) fields and add a property "log_type" with the value "system".

filter {
  if [event][module] == "system" {
    mutate {
      add_field => { "log_type" => "system" }
    }
    if [fileset][name] == "auth" {
      grok {
        match => { "message" => ["%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: %{DATA:[system][auth][ssh][event]} %{DATA:[system][auth][ssh][method]} for (invalid user )?%{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]} port %{NUMBER:[system][auth][ssh][port]} ssh2(: %{GREEDYDATA:[system][auth][ssh][signature]})?",
                  "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: %{DATA:[system][auth][ssh][event]} user %{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]}",
                  "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: Did not receive identification string from %{IPORHOST:[system][auth][ssh][dropped_ip]}",
                  "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sudo(?:\[%{POSINT:[system][auth][pid]}\])?: \s*%{DATA:[system][auth][user]} :( %{DATA:[system][auth][sudo][error]} ;)? TTY=%{DATA:[system][auth][sudo][tty]} ; PWD=%{DATA:[system][auth][sudo][pwd]} ; USER=%{DATA:[system][auth][sudo][user]} ; COMMAND=%{GREEDYDATA:[system][auth][sudo][command]}",
                  "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} groupadd(?:\[%{POSINT:[system][auth][pid]}\])?: new group: name=%{DATA:system.auth.groupadd.name}, GID=%{NUMBER:system.auth.groupadd.gid}",
                  "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} useradd(?:\[%{POSINT:[system][auth][pid]}\])?: new user: name=%{DATA:[system][auth][useradd][name]}, UID=%{NUMBER:[system][auth][useradd][uid]}, GID=%{NUMBER:[system][auth][useradd][gid]}, home=%{DATA:[system][auth][useradd][home]}, shell=%{DATA:[system][auth][useradd][shell]}$",
                  "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} %{DATA:[system][auth][program]}(?:\[%{POSINT:[system][auth][pid]}\])?: %{GREEDYMULTILINE:[system][auth][message]}"] }
        pattern_definitions => {
          "GREEDYMULTILINE"=> "(.|\n)*"
        }
        remove_field => "message"
      }
      date {
        match => [ "[system][auth][timestamp]", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
      }
      geoip {
        source => "[system][auth][ssh][ip]"
        target => "[system][auth][ssh][geoip]"
      }
    }
    else if [fileset][name] == "syslog" {
      grok {
        match => { "message" => ["%{SYSLOGTIMESTAMP:[system][syslog][timestamp]} %{SYSLOGHOST:[system][syslog][hostname]} %{DATA:[system][syslog][program]}(?:\[%{POSINT:[system][syslog][pid]}\])?: %{GREEDYMULTILINE:[system][syslog][message]}"] }
        pattern_definitions => { "GREEDYMULTILINE" => "(.|\n)*" }
        remove_field => "message"
      }
      date {
        match => [ "[system][syslog][timestamp]", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
      }
    }
  }
}
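As an illustration, a made-up auth log line such as

Feb 14 10:15:01 myhost sshd[1234]: Accepted publickey for alice from 203.0.113.5 port 51234 ssh2: RSA SHA256:abc123

would be matched by the first pattern and produce fields like [system][auth][user] = "alice", [system][auth][ssh][ip] = "203.0.113.5" and [system][auth][ssh][port] = "51234".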


Finally, the output - under /etc/logstash/conf.d/20-elasticsearch-output.conf

The following will send the JSON document to Elasticsearch. The index/alias name is taken from the value of the field "log_type", e.g. /system.

output {
  elasticsearch {
    hosts => ["elastic_host:9200"]
    sniffing => true
    manage_template => false
    index => "%{[log_type]}"
  }
}
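With all three files in place, you can test the pipeline configuration and restart Logstash (paths assume a standard package installation):

sudo -u logstash /usr/share/logstash/bin/logstash --path.settings /etc/logstash --config.test_and_exit
sudo systemctl restart logstash

Once logs start arriving, documents should show up under the alias, which you can check from the Dev Tools console:

GET system/_count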


Kibana

Index pattern: system-*

When viewing the index, you can filter by "fileset.name" (e.g. "auth" for the auth logs).
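In the KQL search bar, that filter would look like this:

fileset.name : "auth"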

I also recommend adding the following columns to the view: host.name, system.auth.message

Finally, you can save the search to access it more easily later.

It should look like this: