Logstash 설명 및 간단 사용법

 

Logstash란?

  • 다양한 형태의 데이터를 동시에 수집하여 가공처리 한 이후 원하는 저장소로 전달하는 프로그램


Logstash의 3요소

  • input: 데이터를 수집하는 소스가 어디있는지(데이터가 유입되는 근원지)
  • filter: 데이터를 어떻게 가공할 것인지(데이터에 변형을 가함)
  • output: 데이터를 어디로 보낼 것인지(데이터를 전송할 목적지)


Logstash setting files

yml 파일에 직접 들어가면 관련 정보들이 주석으로 잘 설명 되어 있다.

  1. logstash.yml
    • pipeline.batch.size: 동시에 들어오는 데이터들을 몇개씩 묶어서 필터에 보낼 것인지.
    • pipeline.batch.delay: 만든 배치들을 목적지에 전송한 다음, 다음 이벤트를 기다리는 딜레이(ms단위)
    • pipeline.workers: 하나의 작업에 쓰일 CPU core의 숫자.
    • path.config: pipeline에 관한 config가 있는 위치 지정.
    • config.reload.automatic: 기본값음 false로 되어 있지만 true로 바꾸면 pipeline에 관한 yml파일이 수정될때마다 자동으로 감지해서 적용시켜준다. 이게 false면 설정값을 바꿀때마다 logstash를 재시작해야된다.
    • config.reload.interval: 몇초 간격으로 pipeline 설정값이 바뀌었는지 check하는 시간.
  2. log4j2.properties: logstash의 출력을 담당
  3. pipelines.yml: 기본적으로 logstash.yml파일로 config설정이 되지만 여러개의 pipelines이 존재할 때에는 여기에서 id에 따라 다르게 config를 설정할 수 있다.


Hello Logstash

./bin/logstash -d “input{ stdin { } } output{ stdout { } }” 실행 후 아무 글자나 타이핑 한 후 엔터치면 화면에 그대로 출력되는 것을 볼 수 있다.


Logstash conf file

실행: ./bin/logstash -f ex) ./bin/logstash -f ./conf/logstash-sample.conf 설정값을 다루는 yml파일과는 다르게 conf파일은 logstash의 메인 함수격이다. config 파일을 수정하지 않고는 logstash가 제대로 작동하지 않는다.

다음의 예제로 설명하도록 하겠다. 각 플러그인 및 자세한 내용들은 공식 홈페이지 에서 확인하면 된다.

input {
  file {
    path => "/var/log/secui-*.log"
    mode => "read"
  }
}

filter {
  if [message] =~ /\[device_allow\] \[(.*?)\](.*?),/ {
    grok {
      match => {"message" => '\[device_allow\] \[(%{DATA:device_ip})\](%{DATA:log_create_time}),(%{DATA:log_end_time}),(?:.*?),(?:.*?),(%{DATA:policy_id}),(?:.*?),(%{DATA:source_ip}),(?:.*?),(%{DATA:source_port}),(%{DATA:destination_ip}),(%{DATA:destination_port}),(%{DATA:service}),'}
    }
    mutate {
      add_field => { "action" => "Allow" }
      add_field => { "log_type" => "traffic" }
    }
  }else if [message] =~ /\[device_deny\] \[(.*?)\](.*?),/ {
    grok {
      match => {"message" => '\[device_deny\] \[(%{DATA:device_ip})\](%{DATA:log_create_time}),(%{DATA:log_end_time}),(?:.*?),(?:.*?),(%{DATA:policy_id}),(?:.*?),(%{DATA:source_ip}),(?:.*?),(%{DATA:source_port}),(%{DATA:destination_ip}),(%{DATA:destination_port}),(%{DATA:service}),'}
    }
    mutate {
      add_field => { "action" => "Deny" }
      add_field => { "log_type" => "traffic" }
    }
  }else if [message] =~ /\[audit\] \[(?<device_ip>.*?)\](?<log_create_time>.*?),(?<device_name>.*?),(?<modify_user>.*?),(?<modefy_user_ip>.*?),(?:.*?),(?:.*?),firewall apply/ {
    grok {
      match => {"message" => '\[audit\] \[(?<device_ip>.*?)\](?<log_create_time>.*?),(?<device_name>.*?),(?<modify_user>.*?),(?<modefy_user_ip>.*?),(?:.*?),(?:.*?),firewall apply'}
    }
    mutate {
      add_field => { "log_type" => "audit" }
    }
  }else{
    drop {}
  }
  mutate { remove_field => [ "@version", "host", "message"] }

  date {
    match => ["timestamp", "YYYY-MM-dd HH:mm:ss"]
    remove_field => "timestamp"
  }
}

output {
  stdout {
    codec => rubydebug
  }
  elasticsearch{
    hosts => ["192.168.0.151:9200"]	
    index => "%{+YYYY.MM.dd}-%{device_ip}-%{log_type}-log"
  }
}

input

  • input으로 file plugin을 사용해서 file을 불러들일 것이다.
  • path는 리눅스 기준으로 설정을 하고 파일 이름들은 secui-에 해당하는 모든 로그 파일들을 읽어들인다.
  • mode는 file plugin에서 tail과 read가 있는데 그 중에서 read를 선택할 것이다.

filter

  • if문으로 각 파일로 읽어들인 message를 선별한다.
  • grok plugin을 사용하여 DATA:지어지고 싶은 필드명 으로 정규식구조에 해당하는 것들에 각자의 field명을 붙여준다.
    • grok 구조가 없는 log data를 파싱하여서 구조화를 만들고 쿼리 가능하도록 만들 때 사용하는 플러그인 syslog logs, apache, other webserver logs, mysql logs, 사람이 만든 log등을 가공할때 적합하다.
    • dissect grok plugin과 비슷할 수 있지만 이거는 delimiter를 사용해서 data를 구조화 시킨다. 반복적이면서 delimiter로 구분이 잘 되어 있는 형태의 input에는 잘 작동하지만 다양한 가능성이 있는 input들은 grok가 더 빠르고 좋다.
  • mutate를 통해 output으로 보낼 data에 field를 추가하거나 삭제하는 등의 가공을 한다.

output

  • logstash의 화면으로 직접 보기 위해 stdout으로 출력한다.
  • 이 때 codec을 써서 루비 디버그 형식의 아웃풋으로 본다.
  • logstash는 동시에 input과 output 출력이 가능하므로 stdout뿐만 아니라 elastic search에도 같은 자료를 보낸다.
  • elastic search에 보낼때에는 bulk api를 사용해서 보내진다.
    • http로 통신을 하기 때문에 성공을 하면 200이 오고 실패하면 다른 코드로 반환된다.
    • 맵핑이 실패되어서 데이터가 loss되면 404에러를 보낸다.

logstash가 데이터를 수집하는 그 시점의 시간을 timestamp로 나오는데 @timestamp대신해서 찍어주는게 date 필터이다.

curl localhost:9600/_node?pretty로 하면 로그스태쉬의 상태 정보를 볼 수 있다.

Logstash에서 elastic search로 올릴 때 mapping 문제

Logstash에서 mapping을 해서 elastic search에 실을수는 없고 kibana dev tool혹은 curl을 통해서 elastic search에다가 미리 template을 만들어 놓고 앞으로 올라갈 index들이 자동으로 그 template에 의해 mapping이 되도록 설정하는 방법이다.

PUT /_template/secui
{
  "index_patterns" : [
    "2019.*"
  ],
  "settings" : {
    "number_of_shards" : 1,
    "number_of_replicas" : 0
  },
  "mappings": {
    "@timestamp" : {
      "type" : "date"
    },
    "actions" : {
      "type" : "text",
      "fields" : {
        "keyword" : {
          "type" : "keyword",
          "ignore_above" : 256
        }
      }
    },
    "destination_ip" : {
      "type" : "ip"
    },
    "destination_port" : {
      "type" : "integer"
    },
    "device_ip" : {
      "type" : "ip"
    },
    "log_create_time" : {
      "type" : "date",
      "format" : "yyyy-MM-dd HH:mm:ss"
    }
  }
}

index_patterns에 우리가 앞으로 올릴 index 패턴을 정해주면 거기에 해당하는 index들은 자동으로 template의 설정대로 mapping혹은 setting이 된다.

여기에서 index는 elastic search에 넣을 index 이름을 의미하고 template_name은 어떤 template를 가져다 쓸 것인가를 의미하고 manage_template는 기본 true로 되어 있지만 우리가 쓰고자 하는 template을 쓸 때에는 false로 바꿔줘야한다.