An Experiment with Filebeat and ELK Stack

The ELK Stack (Elasticsearch, Logstash, and Kibana) is one of the best distributed systems for centralizing logs from many servers. Filebeat is a log shipper that tails the given log files and pushes new lines to Logstash; Logstash then outputs the events to Elasticsearch. I am not going to explain how to install the ELK Stack here. Instead, this is an experiment in sending multiple log types (document_type) from the Filebeat log shipper to a Logstash server.

There is one server (the agent) with Filebeat and Apache HTTP Server installed on it. The agent sends syslogs, HTTP access logs, and HTTP error logs to Logstash.

Here are the contents of the /etc/filebeat/filebeat.yml config file:
filebeat:
  prospectors:
    -
      paths:
        - /var/log/messages
        - /var/log/secure
      input_type: log
      document_type: syslog
    -
      paths:
        - /var/log/httpd/access_log
      input_type: log
      document_type: httpd_access_log
    -
      paths:
        - /var/log/httpd/error_log
      input_type: log
      document_type: httpd_error_log
  registry_file: /var/lib/filebeat/registry
output:
  logstash:
    hosts: ["10.1.0.11:5044"]
    bulk_max_size: 1024
    tls:
      certificate_authorities: ["/etc/pki/tls/certs/logstash-forwarder.crt"]
shipper:
logging:
  to_syslog: false
  to_files: true
  files:
    path: /var/log/mybeat
    name: mybeat
    rotateeverybytes: 10485760 # = 10MB
    keepfiles: 2
  level: info
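A quick sanity check on the logging settings above: rotateeverybytes is just 10 MiB written out in bytes.

```python
# rotateeverybytes in the config above is 10 MiB expressed in bytes.
print(10 * 1024 * 1024)  # 10485760
```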

There is one server (the backend) with Logstash installed on it.
Here are the contents of the files located in the /etc/logstash directory:

conf.d/02-beats-input.conf
input {
  beats {
    port => 5044
    ssl => true
    ssl_certificate => "/etc/pki/tls/certs/logstash-forwarder.crt"
    ssl_key => "/etc/pki/tls/private/logstash-forwarder.key"
  }
}

conf.d/10-syslog-filter.conf
filter { 
  if [type] == "syslog" { 
    grok { 
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" } 
      add_field => [ "received_at", "%{@timestamp}" ]
      add_field => [ "received_from", "%{host}" ] 
    } 
    syslog_pri { } 
    date { 
      match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ] 
    } 
  } else if [type] == "httpd_access_log" { 
      grok { 
        match => { "message" => "%{COMBINEDAPACHELOG}" } 
      } 
      date { 
        match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ] 
      } 
    } else if [type] == "httpd_error_log" { 
        grok { 
          match => { "message" => "%{GREEDYDATA}" } 
        } 
      } 
}
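To illustrate what the syslog grok rule above pulls out of a log line, here is a rough Python approximation. The regex below is a simplified stand-in for the grok patterns (SYSLOGTIMESTAMP, SYSLOGHOST, and so on), not their exact library definitions, and the sample line is made up.

```python
import re

# Simplified stand-ins for the grok patterns used in 10-syslog-filter.conf;
# real grok definitions are more permissive than these.
pattern = re.compile(
    r"(?P<syslog_timestamp>\w{3}\s+\d{1,2} \d{2}:\d{2}:\d{2}) "
    r"(?P<syslog_hostname>\S+) "
    r"(?P<syslog_program>[\w./-]+)(?:\[(?P<syslog_pid>\d+)\])?: "
    r"(?P<syslog_message>.*)"
)

# A made-up example line in the usual syslog layout.
line = "Mar  7 14:03:11 web01 sshd[1234]: Accepted password for root from 10.1.0.5"
m = pattern.match(line)
print(m.groupdict())
```

The named groups map one-to-one to the fields the grok filter adds to each event (syslog_timestamp, syslog_hostname, syslog_program, syslog_pid, syslog_message).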

conf.d/30-elasticsearch-output.conf
output {
  elasticsearch {
    hosts => ["10.1.0.12:9200"]
    sniffing => true
    manage_template => false
    index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
    document_type => "%{[@metadata][type]}"
  }
}
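The index option above is expanded per event: [@metadata][beat] is "filebeat" for events shipped by Filebeat, and %{+YYYY.MM.dd} is the event's date, so documents land in daily indices. A small sketch of the resulting index name (the helper function is just for illustration):

```python
from datetime import datetime, timezone

# Illustrative helper: how "%{[@metadata][beat]}-%{+YYYY.MM.dd}" expands
# for a given event timestamp.
def index_name(beat, ts):
    return "%s-%s" % (beat, ts.strftime("%Y.%m.%d"))

print(index_name("filebeat", datetime(2016, 3, 7, tzinfo=timezone.utc)))  # filebeat-2016.03.07
```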
