Logstash
Install
Config yaml
# RPM Install
$ vim /etc/logstash/logstash.yml
# TAR Install
$ vim ${LOGSTASH_HOME}/config/logstash.yml
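A few logstash.yml settings that are commonly adjusted (a sketch; the values below are illustrative, not taken from this setup):
# logstash.yml (illustrative values)
node.name: logstash-node-1       # instance name shown in logs and monitoring
path.data: /var/lib/logstash     # must be unique per instance on the same host
pipeline.workers: 4              # defaults to the number of CPU cores
pipeline.batch.size: 125         # events per worker batch
config.reload.automatic: false   # set true to reload pipeline configs without a restart
path.logs: /var/log/logstash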
Plugin Config
There are two ways to load plugin config files:
- Add *.conf files under ${LOGSTASH_HOME}/config.d; Logstash loads every file in that directory automatically.
- Specify the file at startup: ${LOGSTASH_HOME}/bin/logstash -f jdbc.conf
Logstash input plugin - jdbc
input {
jdbc {
type => "db.table.service"
# Database connection URL
jdbc_connection_string => "jdbc:mysql://192.168.0.1:3306/db?useUnicode=true&characterEncoding=utf-8&useSSL=false&autoReconnect=true"
# Database credentials
jdbc_user => "username"
jdbc_password => "password"
# Path to the MySQL JDBC driver jar
jdbc_driver_library => "/usr/share/logstash/mysql/mysql-connector-java-5.1.48.jar"
# The driver class name for MySQL
jdbc_driver_class => "com.mysql.jdbc.Driver"
# Number of connection retry attempts
connection_retry_attempts => "3"
# Validate the database connection before use (default: false)
jdbc_validate_connection => "true"
# Connection validation timeout in seconds (default: 3600)
jdbc_validation_timeout => "3600"
# Enable paged queries (default: false)
jdbc_paging_enabled => "true"
# Rows per page (default: 100000; lower it when rows are wide or updated frequently)
jdbc_page_size => "500"
# statement holds the query SQL; for complex SQL, put it in a file and point statement_filepath at it
# statement_filepath => "/usr/share/logstash/mysql/jdbc.sql"
# sql_last_value is a built-in variable holding the tracking_column value (here meta_latest_update) of the last row from the previous run
# The LIMIT keeps the first full sync manageable on large tables, but it must be larger than the number of rows updated per second,
# because the timestamp-typed record_last_run value that Logstash stores is only precise to the second
statement => "SELECT * FROM `hdfs_dir_meta_test` WHERE meta_latest_update >= :sql_last_value order by meta_latest_update asc LIMIT 100000"
# Lowercase column names (default: true); set to false if the data needs to be (de)serialized with its original casing
lowercase_column_names => false
# Value can be any of: fatal, error, warn, info, debug (default: info)
sql_log_level => warn
# Whether to record the last run; if true, the tracking_column value of the last result is saved to the file given by last_run_metadata_path
record_last_run => true
# Set to true to track a column from the query result; otherwise the tracked value defaults to the run timestamp
use_column_value => true
# Column to track for incremental sync; must be a database column
tracking_column => "meta_latest_update"
# Value can be any of: numeric, timestamp (default: numeric)
tracking_column_type => timestamp
# Where the record_last_run value is stored
last_run_metadata_path => "/usr/share/logstash/mysql/hdfs_meta_last_id"
# Whether to clear the record in last_run_metadata_path; must be false for incremental sync
clean_run => false
# Cron-style schedule (minute hour day-of-month month day-of-week); "* * * * *" runs every minute
schedule => "* * * * *"
}
}
filter {
# json {
# source => "message"
# remove_field => ["message"]
# }
# convert changes a field's type; here TotalMoney would be converted to float
# mutate {
# convert => {
# "TotalMoney" => "float"
# }
# }
}
output {
elasticsearch {
# host => "192.168.1.1"
# port => "9200"
# ES cluster addresses
hosts => ["192.168.1.1:9200"]
# Index name; must be lowercase
index => "hdfs_dir_meta"
# Unique document id (using the database primary key is recommended)
document_id => "%{cluster}-%{absolute_path}"
}
stdout {
codec => json_lines
}
}
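If the SQL grows complex, it can live in the file referenced by the commented-out statement_filepath above instead of inline in statement (use one option or the other, not both). A minimal sketch of such a jdbc.sql, mirroring the inline query; the path is the one assumed in the config:
-- /usr/share/logstash/mysql/jdbc.sql
-- :sql_last_value is substituted by the jdbc input with the last tracked meta_latest_update
SELECT *
FROM `hdfs_dir_meta_test`
WHERE meta_latest_update >= :sql_last_value
ORDER BY meta_latest_update ASC
LIMIT 100000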
Logstash input plugin - kafka
Reads events from Kafka topics. Logstash acts as a Kafka consumer, handles group management, and uses Kafka's default offset-management strategy.
By default a Logstash instance subscribes to Kafka topics as a single logical group (group_id => "logstash"). You can run multiple consumer threads to increase read throughput, or run several Logstash instances with the same group_id to spread the load across physical machines; messages in a topic are then distributed across all Logstash instances sharing that group_id.
Ideally the number of threads should match the number of partitions for a perfect balance; more threads than partitions means some threads sit idle.
Kafka metadata (added under [@metadata][kafka] when decorate_events => true):
- [@metadata][kafka][topic]: Original Kafka topic from where the message was consumed.
- [@metadata][kafka][consumer_group]: Consumer group
- [@metadata][kafka][partition]: Partition info for this message.
- [@metadata][kafka][offset]: Original record offset for this message.
- [@metadata][kafka][key]: Record key, if any.
- [@metadata][kafka][timestamp]: Timestamp when this message was received by the Kafka broker.
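A usage sketch (the index naming is an assumption): decorate_events only puts these values under @metadata, which is never sent to outputs, but they can be referenced anywhere in the config, e.g. to route documents by topic:
output {
  elasticsearch {
    hosts => ["http://es_node:9200"]
    # one index per Kafka topic and day; @metadata itself is not indexed
    index => "kafka-%{[@metadata][kafka][topic]}-%{+YYYY.MM.dd}"
  }
}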
Kafka input configuration:
input {
kafka {
id => "kafka_inuput_logstash_person"
bootstrap_servers => "kafka:9999"
client_id => "test"
group_id => "test"
topics => ["logstash_person"]
auto_offset_reset => "latest"
consumer_threads => 5
decorate_events => true
type => "person"
}
kafka {
id => "kafka_inuput_logstash_cat"
bootstrap_servers => "kafka:9999"
client_id => "test"
group_id => "test"
topics => ["logstash_cat"]
auto_offset_reset => "latest"
consumer_threads => 5
decorate_events => true
type => "cat"
}
kafka {
id => "kafka_inuput_logstash_download_client"
bootstrap_servers => "10.107.1.43:9092,10.107.1.74:9092,10.107.1.108:9092"
client_id => "logstash_download_client_1"
group_id => "hdfs_client_download"
topics => ["dlink_download_task_error_log"]
auto_offset_reset => "earliest"
consumer_threads => 5
enable_auto_commit => true
security_protocol => "SASL_PLAINTEXT"
sasl_mechanism => "PLAIN"
sasl_jaas_config => 'org.apache.kafka.common.security.plain.PlainLoginModule required username="xxxx" password="xxxx";'
decorate_events => true
type => "log"
#codec => json
}
}
filter {
if [type] == "person" {
grok {
match => { "message" => "%{WORD:name} %{DATE:birthday} %{IP:ip}" }
remove_field => "message"
}
}
else if [type] == "cat" {
grok {
match => { "message" => "%{WORD:kind} %{WORD:master} %{NUMBER:age}" }
}
mutate {
add_field => { "read_timestamp" => "%{@timestamp}" }
}
}
else if [type] == "log" {
# Parse the message field as JSON
json {
source => "message"
target => "message"
}
}
}
output {
elasticsearch {
hosts => ["http://es_node:9200"]
index => "logstash-%{[type]}-%{+YYYY.MM.dd}"
manage_template => false
}
}
- id: unique identifier for this input plugin instance
- bootstrap_servers: Kafka broker addresses
- client_id: an id string passed to the broker with every request, so server-side request logging can tell where requests come from
- group_id: consumer group id
- topics: Kafka topics to subscribe to (array)
- auto_offset_reset: what to do when Kafka has no initial offset or the offset is out of range. earliest: consume from the beginning; latest: consume from the newest offset; none: throw an exception to the consumer if no previous offset is found for the group; anything else: throw an exception to the consumer.
- consumer_threads: number of consumer threads
- decorate_events: adds the current topic, offset, group, partition, etc. to the event's metadata (see the sketch below)
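If the Kafka metadata should end up in the stored document rather than only being available inside the pipeline, copy it into regular fields first; a minimal sketch, with hypothetical target field names:
filter {
  mutate {
    # copy Kafka metadata into normal event fields so they survive into the output
    add_field => {
      "kafka_topic"     => "%{[@metadata][kafka][topic]}"
      "kafka_partition" => "%{[@metadata][kafka][partition]}"
      "kafka_offset"    => "%{[@metadata][kafka][offset]}"
    }
  }
}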
Logstash filter plugin - grok
grok is a filter plugin that parses unstructured log data into structured, queryable data. See the grok filter plugin reference.
Syntax: %{SYNTAX:SEMANTIC}, where SYNTAX is the pattern the text must match and SEMANTIC is the identifier (think of it as the key) under which the matched value is stored.
For example:
# The log file http.log contains lines like this
55.3.244.1 GET /index.html 15824 0.043
filter {
grok {
match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
}
}
# After parsing, the event gains these additional fields:
client: 55.3.244.1
method: GET
request: /index.html
bytes: 15824
duration: 0.043
The patterns grok ships with are listed at https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns
grok is built on regular expressions, so any valid regular expression also works inside grok. The regex library used is Oniguruma; see the Oniguruma site for the supported syntax.
When no built-in pattern fits, you can write your own regular expression and name the captured field.
Syntax: (?<field_name>pattern); note that no surrounding %{} is needed.
For example:
filter {
grok {
patterns_dir => ["./patterns"]
match => { "message" => "%{SYSLOGBASE} (?<id>[0-9A-F]{10,11}): %{GREEDYDATA:syslog_message}" }
}
}
You can also define patterns in a separate pattern file.
vim ./patterns/my_pattern
ID [0-9A-F]{10,11}
# Then point patterns_dir at the directory holding the custom pattern files
filter {
grok {
patterns_dir => ["./patterns"]
match => { "message" => "%{SYSLOGBASE} %{ID:id}: %{GREEDYDATA:syslog_message}" }
}
}
grok can also cast a matched string to a number; currently only int and float are supported. Syntax: %{NUMBER:num:int}.
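For example, extending the http.log pattern from above so that bytes is stored as an integer and duration as a float (a sketch reusing the earlier field names):
filter {
  grok {
    # :int / :float cast the captured strings to numbers
    match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes:int} %{NUMBER:duration:float}" }
  }
}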
Logstash filter plugin - mutate
mutate renames, removes, replaces, and modifies fields in an event. See the mutate filter plugin reference.
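A minimal sketch of the most common mutate operations (field names are made up for illustration):
filter {
  mutate {
    rename       => { "hostname" => "host_name" }   # rename a field
    replace      => { "type" => "person" }          # overwrite a field's value
    convert      => { "age" => "integer" }          # change a field's type
    gsub         => [ "message", "\t", " " ]        # regex-replace inside a field value
    remove_field => [ "tmp_field" ]                 # drop fields
  }
}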
Accessing metadata in a Logstash config file
input {
beats {
port => 5044
}
}
output {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
}
}
Accessing fields in a Logstash config file
input {
beats {
port => "5044"
}
}
output {
stdout { codec => rubydebug }
kafka {
topic_id => "%{[fields][topic_id]}"
codec => plain {
format => "%{message}"
charset => "UTF-8"
}
bootstrap_servers => "kafka:9999"
}
}
Start
# Test the config file
bin/logstash -f pipeline1.conf --config.test_and_exit
# Start Logstash
# --config.reload.automatic reloads modified config files without a restart
nohup bin/logstash -f pipeline1.conf --config.reload.automatic 2>&1 &
# Tail the Logstash log; log path and format are controlled by config/log4j2.properties
tail -100f logs/logstash-plain.log
# Note: to run several Logstash instances at once, give each its own path.data in config/logstash.yml
Exceptions
OutOfMemoryError
java.lang.OutOfMemoryError: Java heap space
Dumping heap to java_pid26719.hprof ...
Heap dump file created [1385285026 bytes in 5.825 secs]
Exception in thread "defaultEventExecutorGroup-4-1" java.lang.OutOfMemoryError: Java heap space
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "kafka-producer-network-thread | producer-1"
Exception in thread "defaultEventExecutorGroup-4-2" java.lang.OutOfMemoryError: Java heap space
Exception in thread "LogStash::Runner" java.lang.OutOfMemoryError: Java heap space
Exception in thread "Ruby-0-Thread-1: /root/elk_zhangqiang/logstash-6.2.3-ship/lib/bootstrap/environment.rb:6" java.lang.OutOfMemoryError: Java heap space
Exception in thread "nioEventLoopGroup-3-3" java.lang.OutOfMemoryError: Java heap space
The JVM flag -XX:+HeapDumpOnOutOfMemoryError makes the JVM write a heap dump when an OutOfMemoryError occurs, as in the second line of the output above.
Analyze the dump with a tool such as JProfiler to find the cause of the overflow.
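The heap size (and the dump flag) are configured in config/jvm.options (or /etc/logstash/jvm.options for an RPM install); a sketch with example values that should be sized to the actual workload:
# jvm.options (example values)
-Xms2g
-Xmx2g
# write a heap dump on OOM so it can be analyzed offline
-XX:+HeapDumpOnOutOfMemoryError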
Result: start-limit
systemctl status logstash
● logstash.service - logstash
Loaded: loaded (/etc/systemd/system/logstash.service; disabled; vendor preset: disabled)
Active: failed (Result: start-limit) since Thu 2020-02-27 11:12:47 CST; 4s ago
Process: 12785 ExecStart=/usr/share/logstash/bin/logstash --path.settings /etc/logstash (code=exited, status=1/FAILURE)
Main PID: 12785 (code=exited, status=1/FAILURE)
The most important log file in Linux is /var/log/messages, which records a variety of events such as system error messages, system startups and shutdowns, changes in the network configuration, etc. This is usually the first place to look when something goes wrong. It is a plain text file, so you can examine it with any text tool such as less, or use tail to display its last lines:
# Look for logstash-related error messages
$ less /var/log/messages
Feb 27 10:40:06 dlink-test su: (to root) zhangqiang on pts/1
Feb 27 10:40:16 dlink-test -bash: HISTORY: PPID=10258 PID=10259 SID=1420 UID=0 User=root systemctl start logstash
Feb 27 10:40:16 dlink-test systemd: Started logstash.
Feb 27 10:40:16 dlink-test systemd: Starting logstash...
Feb 27 10:40:16 dlink-test logstash: could not find java; set JAVA_HOME or ensure java is in PATH
Feb 27 10:40:16 dlink-test systemd: logstash.service: main process exited, code=exited, status=1/FAILURE
Feb 27 10:40:16 dlink-test systemd: Unit logstash.service entered failed state.
Feb 27 10:40:16 dlink-test systemd: logstash.service failed.
Feb 27 10:40:16 dlink-test systemd: logstash.service holdoff time over, scheduling restart.
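The root cause is the line "could not find java; set JAVA_HOME or ensure java is in PATH". One way to fix it, sketched here with an example JDK path, is a systemd drop-in that sets JAVA_HOME for the service:
# open an override file for the unit
$ sudo systemctl edit logstash
# add the following (adjust the JDK path to your installation):
[Service]
Environment="JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk"
# then reload and restart
$ sudo systemctl daemon-reload
$ sudo systemctl restart logstash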