Logstash 설명 및 간단 사용법
Logstash란?
- 다양한 형태의 데이터를 동시에 수집하여 가공처리 한 이후 원하는 저장소로 전달하는 프로그램
Logstash의 3요소
- input: 데이터를 수집하는 소스가 어디있는지(데이터가 유입되는 근원지)
- filter: 데이터를 어떻게 가공할 것인지(데이터에 변형을 가함)
- output: 데이터를 어디로 보낼 것인지(데이터를 전송할 목적지)
Logstash setting files
yml 파일에 직접 들어가면 관련 정보들이 주석으로 잘 설명 되어 있다.
- logstash.yml
- pipeline.batch.size: 동시에 들어오는 데이터들을 몇개씩 묶어서 필터에 보낼 것인지.
- pipeline.batch.delay: 만든 배치들을 목적지에 전송한 다음, 다음 이벤트를 기다리는 딜레이(ms단위)
- pipeline.workers: 하나의 작업에 쓰일 CPU core의 숫자.
- path.config: pipeline에 관한 config가 있는 위치 지정.
- config.reload.automatic: 기본값음 false로 되어 있지만 true로 바꾸면 pipeline에 관한 yml파일이 수정될때마다 자동으로 감지해서 적용시켜준다. 이게 false면 설정값을 바꿀때마다 logstash를 재시작해야된다.
- config.reload.interval: 몇초 간격으로 pipeline 설정값이 바뀌었는지 check하는 시간.
- log4j2.properties: logstash의 출력을 담당
- pipelines.yml: 기본적으로 logstash.yml파일로 config설정이 되지만 여러개의 pipelines이 존재할 때에는 여기에서 id에 따라 다르게 config를 설정할 수 있다.
Hello Logstash
./bin/logstash -d “input{ stdin { } } output{ stdout { } }” 실행 후 아무 글자나 타이핑 한 후 엔터치면 화면에 그대로 출력되는 것을 볼 수 있다.
Logstash conf file
실행: ./bin/logstash -f
다음의 예제로 설명하도록 하겠다. 각 플러그인 및 자세한 내용들은 공식 홈페이지 에서 확인하면 된다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
input {
file {
path => "/var/log/secui-*.log"
mode => "read"
}
}
filter {
if [message] =~ /\[device_allow\] \[(.*?)\](.*?),/ {
grok {
match => {"message" => '\[device_allow\] \[(%{DATA:device_ip})\](%{DATA:log_create_time}),(%{DATA:log_end_time}),(?:.*?),(?:.*?),(%{DATA:policy_id}),(?:.*?),(%{DATA:source_ip}),(?:.*?),(%{DATA:source_port}),(%{DATA:destination_ip}),(%{DATA:destination_port}),(%{DATA:service}),'}
}
mutate {
add_field => { "action" => "Allow" }
add_field => { "log_type" => "traffic" }
}
}else if [message] =~ /\[device_deny\] \[(.*?)\](.*?),/ {
grok {
match => {"message" => '\[device_deny\] \[(%{DATA:device_ip})\](%{DATA:log_create_time}),(%{DATA:log_end_time}),(?:.*?),(?:.*?),(%{DATA:policy_id}),(?:.*?),(%{DATA:source_ip}),(?:.*?),(%{DATA:source_port}),(%{DATA:destination_ip}),(%{DATA:destination_port}),(%{DATA:service}),'}
}
mutate {
add_field => { "action" => "Deny" }
add_field => { "log_type" => "traffic" }
}
}else if [message] =~ /\[audit\] \[(?<device_ip>.*?)\](?<log_create_time>.*?),(?<device_name>.*?),(?<modify_user>.*?),(?<modefy_user_ip>.*?),(?:.*?),(?:.*?),firewall apply/ {
grok {
match => {"message" => '\[audit\] \[(?<device_ip>.*?)\](?<log_create_time>.*?),(?<device_name>.*?),(?<modify_user>.*?),(?<modefy_user_ip>.*?),(?:.*?),(?:.*?),firewall apply'}
}
mutate {
add_field => { "log_type" => "audit" }
}
}else{
drop {}
}
mutate { remove_field => [ "@version", "host", "message"] }
date {
match => ["timestamp", "YYYY-MM-dd HH:mm:ss"]
remove_field => "timestamp"
}
}
output {
stdout {
codec => rubydebug
}
elasticsearch{
hosts => ["192.168.0.151:9200"]
index => "%{+YYYY.MM.dd}-%{device_ip}-%{log_type}-log"
}
}
input
- input으로 file plugin을 사용해서 file을 불러들일 것이다.
- path는 리눅스 기준으로 설정을 하고 파일 이름들은 secui-에 해당하는 모든 로그 파일들을 읽어들인다.
- mode는 file plugin에서 tail과 read가 있는데 그 중에서 read를 선택할 것이다.
filter
- if문으로 각 파일로 읽어들인 message를 선별한다.
- grok plugin을 사용하여 DATA:지어지고 싶은 필드명 으로 정규식구조에 해당하는 것들에 각자의 field명을 붙여준다.
- grok 구조가 없는 log data를 파싱하여서 구조화를 만들고 쿼리 가능하도록 만들 때 사용하는 플러그인 syslog logs, apache, other webserver logs, mysql logs, 사람이 만든 log등을 가공할때 적합하다.
- dissect grok plugin과 비슷할 수 있지만 이거는 delimiter를 사용해서 data를 구조화 시킨다. 반복적이면서 delimiter로 구분이 잘 되어 있는 형태의 input에는 잘 작동하지만 다양한 가능성이 있는 input들은 grok가 더 빠르고 좋다.
- mutate를 통해 output으로 보낼 data에 field를 추가하거나 삭제하는 등의 가공을 한다.
output
- logstash의 화면으로 직접 보기 위해 stdout으로 출력한다.
- 이 때 codec을 써서 루비 디버그 형식의 아웃풋으로 본다.
- logstash는 동시에 input과 output 출력이 가능하므로 stdout뿐만 아니라 elastic search에도 같은 자료를 보낸다.
- elastic search에 보낼때에는 bulk api를 사용해서 보내진다.
- http로 통신을 하기 때문에 성공을 하면 200이 오고 실패하면 다른 코드로 반환된다.
- 맵핑이 실패되어서 데이터가 loss되면 404에러를 보낸다.
logstash가 데이터를 수집하는 그 시점의 시간을 timestamp로 나오는데 @timestamp대신해서 찍어주는게 date 필터이다.
curl localhost:9600/_node?pretty로 하면 로그스태쉬의 상태 정보를 볼 수 있다.
Logstash에서 elastic search로 올릴 때 mapping 문제
Logstash에서 mapping을 해서 elastic search에 실을수는 없고 kibana dev tool혹은 curl을 통해서 elastic search에다가 미리 template을 만들어 놓고 앞으로 올라갈 index들이 자동으로 그 template에 의해 mapping이 되도록 설정하는 방법이다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
PUT /_template/secui
{
"index_patterns" : [
"2019.*"
],
"settings" : {
"number_of_shards" : 1,
"number_of_replicas" : 0
},
"mappings": {
"@timestamp" : {
"type" : "date"
},
"actions" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"destination_ip" : {
"type" : "ip"
},
"destination_port" : {
"type" : "integer"
},
"device_ip" : {
"type" : "ip"
},
"log_create_time" : {
"type" : "date",
"format" : "yyyy-MM-dd HH:mm:ss"
}
}
}
index_patterns에 우리가 앞으로 올릴 index 패턴을 정해주면 거기에 해당하는 index들은 자동으로 template의 설정대로 mapping혹은 setting이 된다.
여기에서 index는 elastic search에 넣을 index 이름을 의미하고 template_name은 어떤 template를 가져다 쓸 것인가를 의미하고 manage_template는 기본 true로 되어 있지만 우리가 쓰고자 하는 template을 쓸 때에는 false로 바꿔줘야한다.