程式的道路上.....我跌跌撞撞的過程
文章為我平時發生錯誤的解決方法
因為擔心下次碰到時會忘記
所以就將他筆記下來了
如果有大大發現我的敘述錯誤
或者有哪些更有效率的方法
也請大大們不吝嗇的提供指教
謝謝~
說明
該篇文章介紹在Docker快速安裝Logstash,Elasticsearch,Kibana,並將三個容器串上關聯,最終目的是監控指定的資料夾的Log,然後讓Log在頁面上呈現.
安裝環境
- 作業系統:Red Hat Enterprise Linux Server release 7.3
(基本上實體平台不影響任何問題,只要能在實體平台上安裝好docker,並確保能使用就好) - Docker版本:17.04.0-ce
- Docker Compose版本:1.10.0
安裝套件
開始安裝E.L.K
- 複製 github 專案 (https://github.com/deviantony/docker-elk.git)
echo 262144 > /proc/sys/vm/max_map_count
- 增加記憶體大小 (至少要大於262144,否則建立好的elasticsearch容器會無法啟動)
git clone https://github.com/deviantony/docker-elk.git
- Permissive mode 設定 (需再clone的路徑下執行這一行指令)
chcon -R system_u:object_r:admin_home_t:s0 docker-elk/
- 進入專案
cd docker-elk
- 建立E.L.K容器
docker-compose會讀取docker-compose.yml檔案,所以才需要先進入docker-elk專案底下
docker-compose up -d
- 接著檢查 elasticsearch service 是否啟動,於瀏覽器輸入↓
http://127.0.0.1:9200
- 接著檢查 Kibana service 是否啟動,於瀏覽器輸入↓(可能需要等個1-2分鐘才能看見該頁面)
http://127.0.0.1:5601
以上完成E.L.K建置
E.L.K 介紹
E.L.K其實是由三個工具所組成,分別是Elasticsearch,Logstash,Kibana,就自己理解的架構圖如下方的圖表,將「各台機器」或者「機器群組」底下的Log,透過Logstash蒐集起來,然後存放置Elasticsearch底下(可以將Elasticsearch想像成資料庫就比較好理解),最後當使用者需要查看Log日誌時,在透過Kibana將Log日誌呈現在網頁上提供瀏覽.
- 在開始學習怎麼操作Logstash之前,需要先編輯一下docker-composer.yml的內容
vi docker-elk/docker-compose.yml
- docker-compose.yml 新內容 :
version: '2'
services:
elasticsearch:
image: elasticsearch:latest
container_name: docker_elasticsearch
restart: always
ports:
- "9200:9200" - "9300:9300"
environment:
ES_JAVA_OPTS: "-Xmx256m -Xms256m"
xpack.security.enabled: "false"
xpack.monitoring.enabled: "false"
xpack.graph.enabled: "false"
xpack.watcher.enabled: "false"
logstash:
image: logstash:latest
container_name: docker_logstash
command: logstash -f /usr/share/logstash/config/logstash.conf
restart: always
volumes:
- /home/newcasino/docker-elk/logstash/config:/usr/share/logstash/config
- ./logstash/pipeline:/usr/share/logstash/pipeline
- /opt/splunk/ELK/API:/home/API
- /opt/splunk/ELK/DAO:/home/DAO
ports:
- "5000:5000"
environment:
LS_JAVA_OPTS: "-Xmx256m -Xms256m"
depends_on:
- elasticsearch
kibana:
image: kibana:latest
container_name: docker_kibana
restart: always
volumes:
- ./kibana/config/:/usr/share/kibana/config
ports:
- "5601:5601"
depends_on:
- elasticsearch
請仔細看docker-compose.yml的內容,第36,38,39行的部份,這邊我分別掛載了「/config」,「/API」,「/DAO」這三個資料夾,掛載「/config」的資料夾是因為要執行該資料夾底下的「logstash.conf」檔案,透過這隻檔案可以去蒐集Log,並且切割Log欄位還有指定特定索引。
Key名稱 | 說明 | 舉例值 |
---|---|---|
container_name | 自訂Container名稱 | dev_phpfpm, dev_nginx, dev_mysql |
restart | 是否自動重新啟動 | always |
image | 指定安裝的容器 (若容器的印象檔不存在,會從Docker Hub自動取得) |
nginx:1.9.6, myphp:5.6-fpm |
build | 讀取Dockerfile檔案中指定的印象檔 | FROM Ubuntu:16.04 (以Ubuntu:16.04版本為基底安裝) |
ports | 使用的連接port (可多個) | - “80:80” (本機port:容器port) |
links | 連接其他容器 (可多個) | - dev_mysql:db (本機容器名稱:在容器中的名稱) |
volumes | 於容器中掛載本機檔案或資料夾 (可多個) | /tmp/laravel_project:/var/project (本機檔案:容器中的檔案) |
working_dir | 工作目錄 (預設執行指令的目錄) | /var/project |
depends_on | 指定先開啟的容器順序+連接其他容器 (可多個) | - elasticsearch |
command | 容器建立好後,會執行的指令 | service apache start |
初學Logstash,可以先搞懂這三個配置就可以了,分別是「input」,「filter」,「output」
名稱 | 功能 | 範例 |
---|---|---|
input | 輸入資料 | 將資料放入到Logstash |
filter | 負責處理資料 | 欄位切割… |
output | 輸出資料 | 輸出到Elasticsearch |
- 建立logstash.conf,並設置input & output的方式
vi docker-elk/logstash/config/logstash.conf
- logstash.conf內容 :
input {
file{
#放置Log的絕對路徑(監控的目錄)
path => ["/home/API/*.log"]
#Log型態(可自行定義,稍後會使用到)
type => "API"
#從檔案的第一筆開始讀取(說明該檔案的Log須從頭開始讀)
start_position => "beginning"
}
}
output {
#輸出至Elasticsearch的 9200 Port
elasticsearch {
hosts => "elasticsearch:9200"
}
}
- 重新建立新的容器(因為有更改過docker-compose.yml 內容 ,所以需重新建立新的容器)
docker-compose up -d
以上三個步驟就完成Logstash對「/home/API/ 」這個路徑底下的所有檔案的log進行監控,將要監控的Log放置該資料夾底下(API)後,建Kibana建立好預設的索引後,就可以看到Log囉~
Logstash資料夾監控完成!!
Log filter介紹 :欄位切割(基本版)
若只考量讓Log顯在在Kibana的UI頁面上,其實只需要在logstash.conf輸入如下方的文件
「path=> ["/home/API/*.log"]」是指要監控「/home/API/ 」該路徑底下任何為.log格式的檔案
input {
file{
path => ["/home/API/*.log"]
type => "API"
start_position => "beginning"
}
}
output {
elasticsearch {
hosts => "elasticsearch:9200"
}
}
但是我想會想要使用E.L.K做日誌監控,應該不會只想將Log放置到Kibana的UI頁面上,就這樣結束對吧,所以現在要開始介紹透過「filter」做欄位切割。
Filter在Logstash是不可少的重要配置之一,除了提供過濾資料的功能,他還支援了幾個插件進行複雜的邏輯處理,甚至可以無中生有的添加新的logstash 事件到後續的流程中,準備開始透過Filter底下的gork插件切割欄位,另外可以在grok 裡定義好正则表達式,在稍后可以引用它。
如下方的切割方式「?<request_time>」,將欄位命名為「request_time」
\s+(?<request_time>\d+(?:\.\d+)?)\s+
下方舉個例子,假設有一筆Log紀錄如下:
[2017-05-10 19:52:42.863] ,ClientIP:123.456.789.11 ,SeverIP:124.362.251.52 ,ExampleApi ,getGame ,message:['message':'Hello World', 'data': [2, 5, 7]] , Operator:Neil
filter的寫法可以如下
filter {
grok {
match => ["message", "^\[(?<Date>[^\]]+)\]\s\W\w+\W(?<Client_IP>(\d+.\d+.\d+.\d+))\s,\w+\W(?<Sever_IP>(\d+.\d+.\d+.\d+))\s\W(?<Class>[^\,]+),(?<Method>[^\s]+)\s\W\w+\W(?<Message>[^\O]+)\w+\W(?<Operator>[^\s]+)"]
}
}
Log欄位切割(進階版)
如果需要監控超過兩個資料夾的Log,且兩種Log的切割欄位的正則式都不相同,那這個時候就適合下面的方式,舉例同時監控「/home/API/ 」+「/home/DAO/ 」這兩個資料夾
這個時候在「input」使用到的「type」就派上用場了,請看如下方的撰寫方式 :
input {
file{
path => ["/home/API/*.log"]
type => "API"
start_position => "beginning"
}
file{
path => ["/home/DAO/*.log"]
type => "DAO"
start_position => "beginning"
}
}
output {
elasticsearch {
hosts => "elasticsearch:9200"
}
}
上方的撰寫方式,有兩個「file」分別對home底下的「API」與「DAO」的資料夾進行監控,但是到目前為止都還沒使用到的「type」這是後就派上用場了,在這邊先指定type分別為「API」,「DAO」,接著在「filter」將會呼叫這兩個type,因為我們即將切割兩種不同撰寫方式的正則式
- 「filter」撰寫方式如下
filter {
if [type] == "API" {
grok {
match => ["message", "^\[(?<Date>[^\]]+)\]\s(?<Level>[^\s]+)\s,\w+\W(?<Client_IP>[^\s]+)\s,\w+\W(?<Server_IP>[^\s]+)\s,(\w|\s)+\W(?<Process_ID>[^\,]+),(?<Class>[^\,]+),(?<Method>[^\,]+),(\w|\s)+\W\s(?<Interface>[^\,]+),(\w|\s)+\W\s(?<Host_IP>[^\,]+),(\w|\s)+\W(?<Execution_Time>[^m]+)\w+,\w+(\W|\s).(?<Param>[^\R]+),\w+\W(?<result>.+),(\s|Domain)+\W(?<Hall_ID>\d+),(\s|\w)+\W(?<Operator>[^\,]+),(\s|\w)+\W(?<Operator_ID>(\d+))"]
}
}
}
- 完整版如下:
input {
file{
path => ["/home/API/*.log"]
type => "API"
start_position => "beginning"
}
file{
path => ["/home/DAO/*.log"]
type => "DAO"
start_position => "beginning"
}
}
filter {
if [type] == "API" {
grok {
match => ["message", "^\[(?<Date>[^\]]+)\]\s(?<Level>[^\s]+)\s,\w+\W(?<Client_IP>[^\s]+)\s,\w+\W(?<Server_IP>[^\s]+)\s,(\w|\s)+\W(?<Process_ID>[^\,]+),(?<Class>[^\,]+),(?<Method>[^\,]+),(\w|\s)+\W\s(?<Interface>[^\,]+),(\w|\s)+\W\s(?<Host_IP>[^\,]+),(\w|\s)+\W(?<Execution_Time>[^m]+)\w+,\w+(\W|\s).(?<Param>[^\R]+),\w+\W(?<result>.+),(\s|Domain)+\W(?<Hall_ID>\d+),(\s|\w)+\W(?<Operator>[^\,]+),(\s|\w)+\W(?<Operator_ID>(\d+))"]
}
}
}
output {
elasticsearch {
hosts => "elasticsearch:9200"
}
}
多項選擇(不同格式的日志)
有時候我們會碰上一個日誌有多種可能格式的情況。這時候要寫成單一正則就比較困難,或者全用|隔開又比較醜陋。這時候,logstash的語法提供給我們一個有趣的解決方式。(多維陣列切割方式)
下方舉兩個日誌作範例
日誌一:
[2017-05-10 19:52:42.863] ,ClientIP:123.456.789.11 ,SeverIP:124.362.251.52 ,ExampleApi ,getGame ,message:['message':'Hello World', 'data': [2, 5, 7]] , Operator:Neil
日誌二:
[2017-05-10 19:52:42.863] ,ClientIP:123.456.789.11 ,SeverIP:124.362.251.52 ,ExampleApi ,getGame ,message:['message':'Hello World', 'data': [2, 5, 7]] , Operator:Neil ,Operator_ID:3
上方的「日誌一」與「日誌二」被定義成相同的日誌,但是在撰寫日誌時可能沒注意到導致格式不相同或有遺漏,可以很明顯地看到「日誌二」多了[Operator_ID:3],但「日誌一」沒有,這時要寫成一個正規式又很困難,就會使用到Grok提供的「多維陣列切割方式」,撰寫格式如下:↓
match => [ "message", "自己撰寫的正規式", "message", "自己撰寫的正規式", "message", "自己撰寫的正規式" ]
接著就實際切割「日誌一」與「日誌二」啦,撰寫格式如下:↓
filter {
grok {
match => [
"message", "^\[(?<Date>[^\]]+)\]\s\W\w+\W(?<Client_IP>(\d+.\d+.\d+.\d+))\s,\w+\W(?<Sever_IP>(\d+.\d+.\d+.\d+))\s\W(?<Class>[^\,]+),(?<Method>[^\s]+)\s\W\w+\W(?<Message>[^\O]+)\w+\W(?<Operator>[^\s]+)",
"message", "^\[(?<Date>[^\]]+)\]\s\W\w+\W(?<Client_IP>(\d+.\d+.\d+.\d+))\s,\w+\W(?<Sever_IP>(\d+.\d+.\d+.\d+))\s\W(?<Class>[^\,]+),(?<Method>[^\s]+)\s\W\w+\W(?<Message>[^\O]+)\w+\W(?<Operator>[^\s]+)\s\W\w+\W(?<Operator_ID>.+)"
]
}
}
※※ **要特別注意正規式會按照這個定義次序依次嘗試匹配,到匹配成功為止,所以順訓很重要只要符合其中一筆正規式就不再對其他的了**
※ 補充
Lohstash.conf 修改完畢後,須重新啟動Logstash容器,因為我們在docker-compose.yml 第23行有增加一行指令是「logstash -f /usr/share/logstash/config/logstash.conf 」,這行指令會直執行「logstash.conf 」檔案,所以需重新跑過「logstash.conf 」檔案,才可進行監控+欄位切割,但是舊的已經在kibana上面的Log並不會切割,必須移除Log然後再重新匯入,才可以切割欄位。
start_position差異(若有編輯過檔案)=>若原本存在.
需先刪除「.sincedb」(參考文件)
有增加start_position | 無增加start_position |
---|---|
若監控的資料夾一開始就有檔案,會將檔案的Log匯入 | 若監控的資料夾一開始就有檔案,不會將檔案的Log匯入 |
匯入該監控資料夾內,所有檔案的全部資料(除非監控指定檔案 ) | 匯入該監控資料夾內,所有檔案的最後一筆資料(除非監控指定檔案) |
另外「input」提供了「ignore_older」插件,經測試後整理一下該功能:
ignore_older設定 | 功能說明 |
---|---|
ignore_older => 60(讀取檔案建立的時間(含編輯),若超過設定時間則不在監控) | 表示60秒後,若新增新的檔案內容,另一支沒編輯的檔案不會受影響 |
尚未確認:
官方網站有說到監控的Log,會額外寫入到Logstash 底下的 .sincedb,不知道是不是我自己會錯意,我將該檔案該檔案刪除後,Log仍存在,還需要在找時間看一下文章
Logstash filter 介紹完畢
Logstash output 介紹 : 建立不同索引
建立索引可以區分不同的Log 日誌,例如現在有「API」、「DAO」兩種Log,這樣就可以建立兩個不同的索引,存放這兩種Log,假設API Log就建立一個API的索引,DAO Log就建立一個DAO的索引,這樣當要查詢API Log就可以透過API的索引找到只有API 的Log
進入主題,建立索引時,可以透過output的方式指定索引,這時又需要透過input的tpye做分別,如下方撰寫方式:
output {
if [type] == "API" {
elasticsearch {
hosts => "elasticsearch:9200"
index => "api-%{+YYYY-MM-dd}"
}
}
if [type] == "DAO" {
elasticsearch {
hosts => "elasticsearch:9200"
index => "dao-%{+YYYY-MM-dd}"
}
}
}
上方的「index」可以看到除了定義「索引」名稱之外,還可以指定「索引日期」,做這樣的命名方式有個好處,假設未來想刪除2017-05-22以前的Log,可以透過索引來做刪除,在這邊Elasticsearch有提供刪除指令
- 完整版如下:
input {
file{
path => ["/home/API/Api_*.log"]
type => "API"
start_position => "beginning"
}
file{
path => ["/home/DAO/*.log"]
type => "DAO"
start_position => "beginning"
}
}
filter {
if [type] == "API" {
grok {
match =>
["message", "^\[(?<Date>[^\]]+)\]\s(?<Level>[^\s]+)\s,\w+\W(?<Client_IP>[^\s]+)\s,\w+\W(?<Server_IP>[^\s]+)\s,(\w|\s)+\W(?<Process_ID>[^\,]+),(?<Class>[^\,]+),(?<Method>[^\,]+),(\w|\s)+\W\s(?<Interface>[^\,]+),(\w|\s)+\W\s(?<Host_IP>[^\,]+),(\w|\s)+\W(?<Execution_Time>[^m]+)\w+,\w+(\W|\s).(?<Param>[^\R]+),\w+\W(?<result>.+),(\s|Domain)+\W(?<Hall_ID>\d+),(\s|\w)+\W(?<Operator>[^\,]+),(\s|\w)+\W(?<Operator_ID>(\d+))"]
}
}
}
output {
if [type] == "API" {
elasticsearch {
hosts => "elasticsearch:9200"
index => "api-%{+YYYY-MM-dd}"
}
}
if [type] == "DAO" {
elasticsearch {
hosts => "elasticsearch:9200"
index => "dao-%{+YYYY-MM-dd}"
}
}
}
Logstash output 完畢