이제 Splunk에서 ELK로 쉽게 넘어 오세요. 엘프(ELP)가 도와드립니다.

Splunk를 사용하면서 좋은 점이 참 많은데, 딱 하나 가격이 너무 비쌉니다.

그래서 많은 사이트에서 ELK도입을 고려하지만 쉽지 않습니다. (ELK로 넘어갈때 여러가지 문제점)

이러한 문제점들도 ELK의 개발속도가 너무 빨라서 최근 버전에는 많은 부분이 개선되어서 실무에서 큰 무리없이 이관 작업들을 하고 있습니다.

하지만 Elastic에서 가장 크게 간과하는 부분이 있습니다. 바로 SPL입니다. Splunk를 사용하는 이유가 SPL 때문인데, 이 부분을 버리고 ELK로 가는 것은 쉽지 않은 결정입니다.

또한 Kibana 의 discover 메뉴에서 많은 정보를 얻을 수 있으나, 통계나 필터 기능정도까지 가능합니다. Splunk와 비교하면 stats, where 2개 명령어밖에 없습니다. 그래서 조금더 세부적인 분석을 하려면 별도 개발을 해야합니다.

그래서 Elastic Layer for Phantom 제품을 개발했습니다.

기 구축된 ELK 스텍의 접속 정보만 엘프(ELP)에 추가하면 바로 SPL을 사용할 수 있습니다. 예를 들면 eval, timechart 같은 명령어들을 사용할 수 있습니다.

아래는 엘프(ELP)에서 정보유출 탐지를 위해서 SPL을 사용해서 ELK에 검색하는 화면입니다.

엘프에서 사용 가능한 명령어는 2020.07.07일 기준으로 아래와 같습니다. 이 명령어들은 앞으로 지속적으로 추가 및 개선될 예정입니다.

엘프(ELP)에서 사용 가능한 SPL Command List

index

정확히 index라는 Command는 없고 index를 지정해서 검색하는 하는 기능이다. 아래처럼 인덱스를 정하고 필드 검색을 할 수 있다.
필드 검색 시 log_message=”message”일 경우 log_message가 포함되어 있는 문서를 검색한다.
그리고 log_message=”Rejected message”은 2개의 키워드가 모두 존재하는 문서를 검색한다. 즉 공백은 AND로 처리된다.
예) index=”mail_log” log_message=”Rejected message not authorized to relay to”

stats

Splunk의 stats 명령어와 유사합니다. SQLite 기능을 활용해서 개발한 것이라 SQLite에서 사용하는 함수들 대부분 지원합니다.
예1) index=”network_traffic” | stats round(avg(bytes)/1024), sum(bytes), count(*), min(bytes), max(bytes) by src
예2) index=”network_traffic” | dedup src, dpt | stats group_concat(dpt) by src
Support Function List

Type of function Supported functions and syntax
Aggregate functions sum(), avg(), min(), max(), count(), GROUP_CONCAT(expression, separator)

tstats

Support Function List

Type of function Supported functions and syntax
Aggregate functions count(), distinct_count(), dc(), mean(), avg(), median(), median_absolute_deviation(), stats(), extended_stats(), min(), max(), sum(), values(), terms()

stats() : min(), max(), sum(), count(), avg()
extended_stats() : min(), max(), sum(), count(), avg(), sum_of_squares(), variance(), std_deviation(), std_deviation_bounds_upper(), std_deviation_bounds_lower()

streamstats

Splunk의 streamstats 명령어와 유사합니다.
예) streamstats sum(bytes) by src_ip

Type of function Supported functions and syntax
Aggregate functions sum(), avg(), mean(), median(), min(), max()

timechart

예) timechart c from network_traffic by src span=”1d”

where

예) index=”network_traffic” | where src=”192.168.0.202″ | table src, dst, bytes

table

예) index=”network_traffic” | where src=”192.168.0.202″ | table src, dst, bytes

head

예) index=”network_traffic” | head 100

rename

예) tstats avg(bytes) from network_traffic by src, dst | rename avg(bytes) as avg_of_bytes

dedup

중복제거 예) index=”network_traffic” | table src, dst, bytes | dedup
예) index=”network_traffic” | table src, dst, bytes | dedup src, dst, bytes

eval

예) tstats avg(bytes) as avg_bytes from network_traffic by src | eval avg_kb=round(avg_bytes/1024)
Support Function List

Type of function Supported functions and syntax
Mathematical functions abs(X), ceiling(X), exp(X), floor(X), ln(X), log(X,Y), pi(), pow(X,Y), round(X,Y), sqrt(X)
Trigonometry and Hyperbolic functions sin(X), cos(X), tan(X), abs(X), fabs(X), sqrt(X), square(X), modf(X), sign(X)
Text functions 개발중

Operators

Type Operators
Arithmetic + – * / %
Boolean 테스트중
Concatenation 테스트중

rex

Use this command to either extract fields using regular expression named groups, or replace or substitute characters in a field using sed expressions.
The rex command matches the value of the specified field against the unanchored regular expression and extracts the named groups into fields of the corresponding names.

예) index=”mail_log” | rex field=log_message “Rejected\smessage\s-\s*(?P<source_ip>[0-9].[0-9].[0-9].[0-9])”

sort

예) tstats c from network_traffic by src | sort -c

join

Splunk에서 사용하는 join과 동일하고 현재는 LEFT OUTER JOIN만 지원합니다. INNER JOIN을 원하신다면 아래처럼 IN 을 사용한 서브쿼리를 사용해주세요. tstats max(@timestamp) as event_time, c from safe_db where size>5000 and user_id not in (## tstats c from exception_user by user_id ##) by user_id, dept_code

예) tstats max(@timestamp) as event_time, c from safe_db by user_id, dept_code | join user_id, dept_code [ index=insa_db | table user_id, user_name, dept_code, dept_name ]