程式的道路上.....我跌跌撞撞的過程

文章為我平時發生錯誤的解決方法

因為擔心下次碰到時會忘記

所以就將他筆記下來了

如果有大大發現我的敘述錯誤

或者有哪些更有效率的方法

也請大大們不吝嗇的提供指教

謝謝~


說明

該篇文章介紹在Docker快速安裝Logstash,Elasticsearch,Kibana,並將三個容器串上關聯,最終目的是監控指定的資料夾的Log,然後讓Log在頁面上呈現.


安裝環境

Red Hat 7.3安裝Docker教學

  • 作業系統:Red Hat Enterprise Linux Server release 7.3
    (基本上實體平台不影響任何問題,只要能在實體平台上安裝好docker,並確保能使用就好)
  • Docker版本:17.04.0-ce
  • Docker Compose版本:1.10.0

安裝套件


開始安裝E.L.K

 

echo 262144 > /proc/sys/vm/max_map_count 
  • 增加記憶體大小 (至少要大於262144,否則建立好的elasticsearch容器會無法啟動)
git clone https://github.com/deviantony/docker-elk.git 
  • Permissive mode 設定 (需再clone的路徑下執行這一行指令)
chcon -R system_u:object_r:admin_home_t:s0 docker-elk/
  • 進入專案
cd docker-elk 
  • 建立E.L.K容器

docker-compose會讀取docker-compose.yml檔案,所以才需要先進入docker-elk專案底下

docker-compose up -d 

  • 接著檢查 elasticsearch service 是否啟動,於瀏覽器輸入↓
http://127.0.0.1:9200 


  • 接著檢查 Kibana service 是否啟動,於瀏覽器輸入↓(可能需要等個1-2分鐘才能看見該頁面)
http://127.0.0.1:5601 

以上完成E.L.K建置


E.L.K 介紹

E.L.K其實是由三個工具所組成,分別是Elasticsearch,Logstash,Kibana,就自己理解的架構圖如下方的圖表,將「各台機器」或者「機器群組」底下的Log,透過Logstash蒐集起來,然後存放置Elasticsearch底下(可以將Elasticsearch想像成資料庫就比較好理解),最後當使用者需要查看Log日誌時,在透過Kibana將Log日誌呈現在網頁上提供瀏覽.



  • 在開始學習怎麼操作Logstash之前,需要先編輯一下docker-composer.yml的內容
vi docker-elk/docker-compose.yml 
  • docker-compose.yml 新內容 :

 

version: '2' 

services: 

    elasticsearch: 
        image: elasticsearch:latest 
        container_name: docker_elasticsearch 
        restart: always 
        ports: 
            - "9200:9200" - "9300:9300" 
        environment: 
            ES_JAVA_OPTS: "-Xmx256m -Xms256m" 
            xpack.security.enabled: "false" 
            xpack.monitoring.enabled: "false" 
            xpack.graph.enabled: "false" 
            xpack.watcher.enabled: "false" 

    logstash: 
        image: logstash:latest 
        container_name: docker_logstash 
        command: logstash -f /usr/share/logstash/config/logstash.conf 
        restart: always 
        volumes: 
            - /home/newcasino/docker-elk/logstash/config:/usr/share/logstash/config 
            - ./logstash/pipeline:/usr/share/logstash/pipeline 
            - /opt/splunk/ELK/API:/home/API 
            - /opt/splunk/ELK/DAO:/home/DAO 
        ports: 
            - "5000:5000" 
        environment: 
            LS_JAVA_OPTS: "-Xmx256m -Xms256m" 
        depends_on: 
            - elasticsearch 

    kibana: 
        image: kibana:latest 
        container_name: docker_kibana 
        restart: always 
        volumes: 
            - ./kibana/config/:/usr/share/kibana/config 
        ports: 
            - "5601:5601" 
        depends_on: 
            - elasticsearch 

請仔細看docker-compose.yml的內容,第36,38,39行的部份,這邊我分別掛載了「/config」,「/API」,「/DAO」這三個資料夾,掛載「/config」的資料夾是因為要執行該資料夾底下的「logstash.conf」檔案,透過這隻檔案可以去蒐集Log,並且切割Log欄位還有指定特定索引。


Key名稱 說明 舉例值
container_name 自訂Container名稱 dev_phpfpm, dev_nginx, dev_mysql
restart 是否自動重新啟動 always
image 指定安裝的容器
(若容器的印象檔不存在,會從Docker Hub自動取得)
nginx:1.9.6, myphp:5.6-fpm
build 讀取Dockerfile檔案中指定的印象檔 FROM Ubuntu:16.04
(以Ubuntu:16.04版本為基底安裝)
ports 使用的連接port (可多個) - “80:80” (本機port:容器port)
links 連接其他容器 (可多個) - dev_mysql:db 
(本機容器名稱:在容器中的名稱)
volumes 於容器中掛載本機檔案或資料夾 (可多個) /tmp/laravel_project:/var/project 
(本機檔案:容器中的檔案)
working_dir 工作目錄 (預設執行指令的目錄) /var/project
depends_on 指定先開啟的容器順序+連接其他容器 (可多個) - elasticsearch
command 容器建立好後,會執行的指令 service apache start

初學Logstash,可以先搞懂這三個配置就可以了,分別是「input」,「filter」,「output」

名稱 功能 範例
input 輸入資料 將資料放入到Logstash
filter 負責處理資料 欄位切割…
output 輸出資料 輸出到Elasticsearch

  1. 建立logstash.conf,並設置input & output的方式
vi docker-elk/logstash/config/logstash.conf 
  1. logstash.conf內容 :
input { 
    file{ 
        #放置Log的絕對路徑(監控的目錄) 
        path => ["/home/API/*.log"] 

        #Log型態(可自行定義,稍後會使用到) 
        type => "API" 

        #從檔案的第一筆開始讀取(說明該檔案的Log須從頭開始讀) 
        start_position => "beginning" 
    }
}

output { 
    #輸出至Elasticsearch的 9200 Port 
    elasticsearch { 
        hosts => "elasticsearch:9200" 
    }
}

  1. 重新建立新的容器(因為有更改過docker-compose.yml 內容 ,所以需重新建立新的容器)
docker-compose up -d 

以上三個步驟就完成Logstash對「/home/API/ 」這個路徑底下的所有檔案的log進行監控,將要監控的Log放置該資料夾底下(API)後,建Kibana建立好預設的索引後,就可以看到Log囉~



Logstash資料夾監控完成!!


Log filter介紹 :欄位切割(基本版)


若只考量讓Log顯在在Kibana的UI頁面上,其實只需要在logstash.conf輸入如下方的文件
「path=> ["/home/API/*.log"]」是指要監控「/home/API/ 」該路徑底下任何為.log格式的檔案


input { 
    file{ 
        path => ["/home/API/*.log"] 
        type => "API" 
        start_position => "beginning" 
    }
}

output { 
    elasticsearch { 
        hosts => "elasticsearch:9200" 
    }
}

但是我想會想要使用E.L.K做日誌監控,應該不會只想將Log放置到Kibana的UI頁面上,就這樣結束對吧,所以現在要開始介紹透過「filter」做欄位切割。


Filter在Logstash是不可少的重要配置之一,除了提供過濾資料的功能,他還支援了幾個插件進行複雜的邏輯處理,甚至可以無中生有的添加新的logstash 事件到後續的流程中,準備開始透過Filter底下的gork插件切割欄位,另外可以在grok 裡定義好正则表達式,在稍后可以引用它。


如下方的切割方式「?<request_time>」,將欄位命名為「request_time」

\s+(?<request_time>\d+(?:\.\d+)?)\s+ 

下方舉個例子,假設有一筆Log紀錄如下:

[2017-05-10 19:52:42.863] ,ClientIP:123.456.789.11 ,SeverIP:124.362.251.52 ,ExampleApi ,getGame ,message:['message':'Hello World', 'data': [2, 5, 7]] , Operator:Neil 

filter的寫法可以如下

filter { 
    grok { 
        match => ["message", "^\[(?<Date>[^\]]+)\]\s\W\w+\W(?<Client_IP>(\d+.\d+.\d+.\d+))\s,\w+\W(?<Sever_IP>(\d+.\d+.\d+.\d+))\s\W(?<Class>[^\,]+),(?<Method>[^\s]+)\s\W\w+\W(?<Message>[^\O]+)\w+\W(?<Operator>[^\s]+)"] 
    }
}

Log欄位切割(進階版)

如果需要監控超過兩個資料夾的Log,且兩種Log的切割欄位的正則式都不相同,那這個時候就適合下面的方式,舉例同時監控「/home/API/ 」+「/home/DAO/ 」這兩個資料夾


這個時候在「input」使用到的「type」就派上用場了,請看如下方的撰寫方式 :

​​​​​​input { 
    file{ 
        path => ["/home/API/*.log"] 
        type => "API" 
        start_position => "beginning" 
    }
    
    file{ 
        path => ["/home/DAO/*.log"] 
        type => "DAO" 
        start_position => "beginning" 
    }
}

output { 
    elasticsearch { 
        hosts => "elasticsearch:9200" 
    }
}

上方的撰寫方式,有兩個「file」分別對home底下的「API」與「DAO」的資料夾進行監控,但是到目前為止都還沒使用到的「type」這是後就派上用場了,在這邊先指定type分別為「API」,「DAO」,接著在「filter」將會呼叫這兩個type,因為我們即將切割兩種不同撰寫方式的正則式


  • 「filter」撰寫方式如下
filter { 
    if [type] == "API" { 
        grok { 
            match => ["message", "^\[(?<Date>[^\]]+)\]\s(?<Level>[^\s]+)\s,\w+\W(?<Client_IP>[^\s]+)\s,\w+\W(?<Server_IP>[^\s]+)\s,(\w|\s)+\W(?<Process_ID>[^\,]+),(?<Class>[^\,]+),(?<Method>[^\,]+),(\w|\s)+\W\s(?<Interface>[^\,]+),(\w|\s)+\W\s(?<Host_IP>[^\,]+),(\w|\s)+\W(?<Execution_Time>[^m]+)\w+,\w+(\W|\s).(?<Param>[^\R]+),\w+\W(?<result>.+),(\s|Domain)+\W(?<Hall_ID>\d+),(\s|\w)+\W(?<Operator>[^\,]+),(\s|\w)+\W(?<Operator_ID>(\d+))"] 
        }
    }
}

  • 完整版如下:
input { 
    file{ 
        path => ["/home/API/*.log"] 
        type => "API" 
        start_position => "beginning" 
    }

    file{ 
        path => ["/home/DAO/*.log"] 
        type => "DAO" 
        start_position => "beginning" 
    }
}

filter { 
    if [type] == "API" { 
        grok { 
            match => ["message", "^\[(?<Date>[^\]]+)\]\s(?<Level>[^\s]+)\s,\w+\W(?<Client_IP>[^\s]+)\s,\w+\W(?<Server_IP>[^\s]+)\s,(\w|\s)+\W(?<Process_ID>[^\,]+),(?<Class>[^\,]+),(?<Method>[^\,]+),(\w|\s)+\W\s(?<Interface>[^\,]+),(\w|\s)+\W\s(?<Host_IP>[^\,]+),(\w|\s)+\W(?<Execution_Time>[^m]+)\w+,\w+(\W|\s).(?<Param>[^\R]+),\w+\W(?<result>.+),(\s|Domain)+\W(?<Hall_ID>\d+),(\s|\w)+\W(?<Operator>[^\,]+),(\s|\w)+\W(?<Operator_ID>(\d+))"] 
        }
    }
}

output { 
    elasticsearch { 
        hosts => "elasticsearch:9200" 
    }
}

 

多項選擇(不同格式的日志)

有時候我們會碰上一個日誌有多種可能格式的情況。這時候要寫成單一正則就比較困難,或者全用|隔開又比較醜陋。這時候,logstash的語法提供給我們一個有趣的解決方式。(多維陣列切割方式)


下方舉兩個日誌作範例

日誌一:

[2017-05-10 19:52:42.863] ,ClientIP:123.456.789.11 ,SeverIP:124.362.251.52 ,ExampleApi ,getGame ,message:['message':'Hello World', 'data': [2, 5, 7]] , Operator:Neil 

日誌二:

[2017-05-10 19:52:42.863] ,ClientIP:123.456.789.11 ,SeverIP:124.362.251.52 ,ExampleApi ,getGame ,message:['message':'Hello World', 'data': [2, 5, 7]] , Operator:Neil ,Operator_ID:3 

上方的「日誌一」與「日誌二」被定義成相同的日誌,但是在撰寫日誌時可能沒注意到導致格式不相同或有遺漏,可以很明顯地看到「日誌二」多了[Operator_ID:3],但「日誌一」沒有,這時要寫成一個正規式又很困難,就會使用到Grok提供的「多維陣列切割方式」,撰寫格式如下:↓

match => [ "message", "自己撰寫的正規式", "message", "自己撰寫的正規式", "message", "自己撰寫的正規式" ] 

接著就實際切割「日誌一」與「日誌二」啦,撰寫格式如下:↓

filter { 
    grok { 
        match => [ 
            "message", "^\[(?<Date>[^\]]+)\]\s\W\w+\W(?<Client_IP>(\d+.\d+.\d+.\d+))\s,\w+\W(?<Sever_IP>(\d+.\d+.\d+.\d+))\s\W(?<Class>[^\,]+),(?<Method>[^\s]+)\s\W\w+\W(?<Message>[^\O]+)\w+\W(?<Operator>[^\s]+)", 
            "message", "^\[(?<Date>[^\]]+)\]\s\W\w+\W(?<Client_IP>(\d+.\d+.\d+.\d+))\s,\w+\W(?<Sever_IP>(\d+.\d+.\d+.\d+))\s\W(?<Class>[^\,]+),(?<Method>[^\s]+)\s\W\w+\W(?<Message>[^\O]+)\w+\W(?<Operator>[^\s]+)\s\W\w+\W(?<Operator_ID>.+)" 
        ] 
    } 
} 

 

※※ **要特別注意正規式會按照這個定義次序依次嘗試匹配,到匹配成功為止,所以順訓很重要只要符合其中一筆正規式就不再對其他的了**


※ 補充

Lohstash.conf 修改完畢後,須重新啟動Logstash容器,因為我們在docker-compose.yml 第23行有增加一行指令是「logstash -f /usr/share/logstash/config/logstash.conf 」,這行指令會直執行「logstash.conf 」檔案,所以需重新跑過「logstash.conf 」檔案,才可進行監控+欄位切割,但是舊的已經在kibana上面的Log並不會切割,必須移除Log然後再重新匯入,才可以切割欄位。


start_position差異(若有編輯過檔案)=>若原本存在.
需先刪除「.sincedb」(參考文件)

有增加start_position 無增加start_position
若監控的資料夾一開始就有檔案,會將檔案的Log匯入 若監控的資料夾一開始就有檔案,不會將檔案的Log匯入
匯入該監控資料夾內,所有檔案的全部資料(除非監控指定檔案 ) 匯入該監控資料夾內,所有檔案的最後一筆資料(除非監控指定檔案)

另外「input」提供了「ignore_older」插件,經測試後整理一下該功能:

ignore_older設定 功能說明
ignore_older => 60(讀取檔案建立的時間(含編輯),若超過設定時間則不在監控) 表示60秒後,若新增新的檔案內容,另一支沒編輯的檔案不會受影響

尚未確認:
官方網站有說到監控的Log,會額外寫入到Logstash 底下的 .sincedb,不知道是不是我自己會錯意,我將該檔案該檔案刪除後,Log仍存在,還需要在找時間看一下文章


Logstash filter 介紹完畢


Logstash output 介紹 : 建立不同索引


建立索引可以區分不同的Log 日誌,例如現在有「API」、「DAO」兩種Log,這樣就可以建立兩個不同的索引,存放這兩種Log,假設API Log就建立一個API的索引,DAO Log就建立一個DAO的索引,這樣當要查詢API Log就可以透過API的索引找到只有API 的Log


進入主題,建立索引時,可以透過output的方式指定索引,這時又需要透過input的tpye做分別,如下方撰寫方式:


output { 
    if [type] == "API" { 
        elasticsearch { 
            hosts => "elasticsearch:9200" 
            index => "api-%{+YYYY-MM-dd}" 
        }
    }
    if [type] == "DAO" { 
        elasticsearch { 
            hosts => "elasticsearch:9200" 
            index => "dao-%{+YYYY-MM-dd}" 
        }
    }
}

上方的「index」可以看到除了定義「索引」名稱之外,還可以指定「索引日期」,做這樣的命名方式有個好處,假設未來想刪除2017-05-22以前的Log,可以透過索引來做刪除,在這邊Elasticsearch有提供刪除指令


  • 完整版如下:
input { 
    file{ 
        path => ["/home/API/Api_*.log"] 
        type => "API" 
        start_position => "beginning" 
    }
    file{ 
        path => ["/home/DAO/*.log"] 
        type => "DAO" 
        start_position => "beginning" 
    }
}

filter { 
    if [type] == "API" { 
        grok { 
            match =>  ["message", "^\[(?<Date>[^\]]+)\]\s(?<Level>[^\s]+)\s,\w+\W(?<Client_IP>[^\s]+)\s,\w+\W(?<Server_IP>[^\s]+)\s,(\w|\s)+\W(?<Process_ID>[^\,]+),(?<Class>[^\,]+),(?<Method>[^\,]+),(\w|\s)+\W\s(?<Interface>[^\,]+),(\w|\s)+\W\s(?<Host_IP>[^\,]+),(\w|\s)+\W(?<Execution_Time>[^m]+)\w+,\w+(\W|\s).(?<Param>[^\R]+),\w+\W(?<result>.+),(\s|Domain)+\W(?<Hall_ID>\d+),(\s|\w)+\W(?<Operator>[^\,]+),(\s|\w)+\W(?<Operator_ID>(\d+))"]  
        }
    }
}

output { 
    if [type] == "API" { 
        elasticsearch { 
            hosts => "elasticsearch:9200" 
            index => "api-%{+YYYY-MM-dd}" 
        }
    }
 
    if [type] == "DAO" { 
        elasticsearch { 
            hosts => "elasticsearch:9200" 
            index => "dao-%{+YYYY-MM-dd}" 
        }
    }
}

Logstash output 完畢

 

arrow
arrow

    Neil 發表在 痞客邦 留言(0) 人氣()