Logstash

Logstash

Install

Config yaml

# RPM Install
$ vim /etc/logstash/logstash.yml
# TAR Install
$ vim ${LOGSTASH_HOME}/config/logstash.yml

Plugin Config

使用 config 文件的两种方式

  • ${LOGSTASH_HOME}/config.d下添加*.conf文件,系统会自动加载里面的文件
  • ${LOGSTASH_HOME}/bin/logstsh -f jdbc.conf启动时指定

Logstash input plugin - jdbc

input {
        jdbc {
                type => "db.table.service"
                # 数据库连接地址
                jdbc_connection_string => "jdbc:mysql://192.168.0.1:3306/db?useUnicode=true&characterEncoding=utf-8&useSSL=false&autoReconnect=true"
                # 数据库连接账号密码;
                jdbc_user => "username"
                jdbc_password => "password"
                # MySQL依赖包路径;
                jdbc_driver_library => "/usr/share/logstash/mysql/mysql-connector-java-5.1.48.jar"
                # the name of the driver class for mysql
                jdbc_driver_class => "com.mysql.jdbc.Driver"
                # 数据库重连尝试次数
                connection_retry_attempts => "3"
                # 判断数据库连接是否可用,默认false不开启
                jdbc_validate_connection => "true"
                # 数据库连接可用校验超时时间,默认3600s
                jdbc_validation_timeout => "3600"
                # 开启分页查询(默认false不开启)
                jdbc_paging_enabled => "true"
                # 单次分页查询条数(默认100000,若字段较多且更新频率较高,建议调低此值)
                jdbc_page_size => "500"
                # statement 为查询数据 sql,如果 sql 较复杂,建议配通过 statement_filepath 配置 sql 文件的存放路径
                # statement_filepath => "/usr/share/logstash/mysql/jdbc.sql"
                # sql_last_value 为内置的变量,存放上次查询结果中最后一条数据 tracking_column 的值,此处即为 meta_latest_update
                # 加 Limit 是为了在数据量过大时,首次同步安全过渡同步,但是要注意 Limit 的值必须大于一秒内更新的数据总量,因为 Logstash
                # 记录的 timestamp 类型的 record_last_run 默认只是精确到秒级别,很坑!
                statement => "SELECT * FROM `hdfs_dir_meta_test` WHERE meta_latest_update >= :sql_last_value order by meta_latest_update asc LIMIT 100000"
                # 是否将字段名转换为小写,默认true(如果有数据序列化、反序列化需求,建议改为 false                lowercase_column_names => false
                # Value can be any of: fatal,error,warn,info,debug,默认info
                sql_log_level => warn
                # 是否记录上次执行结果,true 表示会将上次执行结果的 tracking_column 字段的值保存到 last_run_metadata_path 指定的文件中
                record_last_run => true
                # 需要记录查询结果某字段的值时,此字段为 true,否则默认 tracking_column 为 timestamp 的值
                use_column_value => true
                # 需要记录的字段,用于增量同步,需是数据库字段
                tracking_column => "meta_latest_update"
                # Value can be any of: numeric,timestamp,Default value is "numeric"
                tracking_column_type => timestamp
                # record_last_run 上次数据存放位置
                last_run_metadata_path => "/usr/share/logstash/mysql/hdfs_meta_last_id"
                # 是否清除 last_run_metadata_path 的记录,需要增量同步时此字段必须为 false
                clean_run => false
                # 同步频率(分 时 天 月 年),默认每分钟同步一次
                schedule => "* * * * *"
        }
}

filter {
        # json {
        #       source => "message"
        #       remove_field => ["message"]
        # }
        # convert 字段类型转换,将字段TotalMoney数据类型改为float;
        # mutate {
        #       convert => {
        #               "TotalMoney" => "float"
        #       }
        # }
}

output {
        elasticsearch {
                # host => "192.168.1.1"
                # port => "9200"
                # 配置ES集群地址
                hosts => ["192.168.1.1:9200"]
                # 索引名字,必须小写
                index => "hdfs_dir_meta"
                # 数据唯一索引(建议使用数据库 Primary Key)
                document_id => "%{cluster}-${absolute_path}"
        }
        stdout {
                codec => json_lines
        }
}

Logstash input plugin - kafka

从 kafka topic 中读取 events,logstash 作为消费者进行 group 的管理,并使用 kafka 默认的 offset 管理策略。

Logstash 实例默认使用一个逻辑组(group_id => "logstash")来订阅 Kafka topic,使用者可以运行多个线程来提高读取吞吐量。或者,直接使用相同的group_id运行多个 Logstash 实例,以跨物理机分散负载。主题中的消息将分发到具有相同group_id的所有 Logstash 实例。

理想情况下,线程数应该与分区数量一样多以实现完美平衡 - 线程多于分区意味着某些线程将处于空闲状态

kafka 的 metadata:

  • [@metadata][kafka][topic]: Original Kafka topic from where the message was consumed.
  • [@metadata][kafka][consumer_group]: Consumer group
  • [@metadata][kafka][partition]: Partition info for this message.
  • [@metadata][kafka][offset]: Original record offset for this message.
  • [@metadata][kafka][key]: Record key, if any.
  • [@metadata][kafka][timestamp]: Timestamp when this message was received by the Kafka broker.

配置 kafka input:

input {
  kafka {
    id => "kafka_inuput_logstash_person"
    bootstrap_servers => "kafka:9999"
    client_id => "test"
    group_id => "test"
    topics => ["logstash_person"]
    auto_offset_reset => "latest"
    consumer_threads => 5
    decorate_events => true
    type => "person"
  }
  kafka {
    id => "kafka_inuput_logstash_cat"
    bootstrap_servers => "kafka:9999"
    client_id => "test"
    group_id => "test"
    topics => ["logstash_cat"]
    auto_offset_reset => "latest"
    consumer_threads => 5
    decorate_events => true
    type => "cat"
  }
  kafka {
    id => "kafka_inuput_logstash_download_client"
    bootstrap_servers => "10.107.1.43:9092,10.107.1.74:9092,10.107.1.108:9092"
    client_id => "logstash_download_client_1"
    group_id => "hdfs_client_download"
    topics => ["dlink_download_task_error_log"]
    auto_offset_reset => "earliest"
    consumer_threads => 5
    enable_auto_commit => true
    security_protocol => "SASL_PLAINTEXT"
    sasl_mechanism => "PLAIN"
    sasl_jaas_config => 'org.apache.kafka.common.security.plain.PlainLoginModule required username="xxxx" password="xxxx";'
    decorate_events => true
    type => "log"
    #codec => json
  }

}

filter {
  if [type] == "person" {
    grok {
      match => { "message" => "%{WORD:name} %{DATE:birthday} %{IP:ip}" }
      remove_field => "message"
    }
  }
  else if [type] == "cat" {
    grok {
      match => { "message" => "%{WORD:kind} %{WORD:master} %{NUMBER:age}" }
    }
    mutate {
      add_field => { "read_timestamp" => "%{@timestamp}" }
    }
  }
  else if [type] == "log" {
      # 将 message 转为 json 格式
      json {
          source => "message"
          target => "message"
      }
    }
}

output {
  elasticsearch {
    hosts => ["http://es_node:9200"]
    index => "logstash-%{[type]}-%{+YYYY.MM.dd}"
    manage_template => false
  }
} 
  • id: input plugin 的唯一标识符
  • bootstrap_server: kafka 节点地址
  • client_id: 发出请求时传递给服务器的id字符串,这样包含逻辑的应用程序可以跟踪请求的来源。
  • group_id: 消费者 groupID
  • topics: kafka topic,数组类型
  • auto_offset_reset: 当 Kafka 中没有初始偏移量或偏移量超出范围时的策略。earliest: 从头开始消费;latest: 从最新的offset开始消费;none: 如果没有找到消费者组的先前偏移量,则向消费者抛出异常;anything else: 直接向消费者抛出异常。
  • consumer_threads: 消费者线程数
  • decorate_events: 此属性会将当前 topic、offset、group、partition 等信息也带到 message 中

Logstash filter plugin - grok

grok 是一个可以将非结构化的日志解析成为结构化和可查询数据的 filter plugin。grok filter plugin reference

语法%{SYNTAX:SEMANTIC},其中SYNTAX是匹配内容的格式,SEMANTIC是唯一标识符,可以理解为key

例如:

# 日志文件 http.log 格式如下
55.3.244.1 GET /index.html 15824 0.043

filter {
  grok {
      match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
  }
}

# 解析后,event 将会添加一些额外的字段:
client: 55.3.244.1
method: GET
request: /index.html
bytes: 15824
duration: 0.043

grok 已经支持的 pattern 可以参考https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns

grok 是基于正则表达式的,因此任何正则表达式在 grok 当中也是有效的。使用的正则表达式库是 Oniguruma。可以去 Oniguruma 官网查看所支持的正则表达式语法。

有时候 logstash 没有满足要求的 pattern,可以自定自己的正则表达式,然后为匹配项定义一个字段。

语法:(?<field_name> pattern),注意不需要再加%{}

例如:

filter {
  grok {
    patterns_dir => ["./patterns"]
    match => { "message" => "%{SYSLOGBASE} (?<id>[0-9A-F]{10,11}): %{GREEDYDATA:syslog_message}" }
  }
}

除此之外还可以定义一个 pattern 文件。

vim ./patterns/my_pattern
ID [0-9A-F]{10,11}

# 然后通过 patterns_dir 说明自定义的 pattern 文件所在文件夹路径
filter {
  grok {
    patterns_dir => ["./patterns"]
    match => { "message" => "%{SYSLOGBASE} %{ID:id}: %{GREEDYDATA:syslog_message}" }
  }
}

测试 grok 语法正确性

grok pattern 可以将 string 转换成数字类型,但是目前只支持 int 和 float,语法:%{NUMBER:num:int}

Logstash filter plugin - mutate

mutate 用于重命名,删除,替换和修改事件中的字段。mutate filter plugin reference

在 Logstash 的 config 文件中访问 metadata

input {
  beats {
    port => 5044
  }
}

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}" 
  }
}

在 Logstash 的 config 文件中访问 fields

input {
    beats {
        port => "5044"
    }
}

output {
    stdout { codec => rubydebug }
    kafka {
        topic_id => "%{[fields][topic_id]}"
        codec => plain {
            format => "%{message}"
            charset => "UTF-8"
        }
        bootstrap_server    s => "kafka:9999"
    }
}

Start

# 测试配置文件
bin/logstash -f pipeline1.conf --config.test_and_exit

# 启动 logstash 
# --config.reload.automatic 不需要重启就可以加载被修改的配置文件
nohup bin/logstash -f pipeline1.conf --config.reload.automatic 2>&1 &

# 查看 logstash 运行日志,日志路径和格式可以通过 config/log4j2.properties 控制
tail -100f logs/logstash-plain.log

# 注意如果想同时启动多个 logstash 实例,需要修改 config/logstash.yml 中的 path.data

Exceptions

OutOfMemoryError

java.lang.OutOfMemoryError: Java heap space
Dumping heap to java_pid26719.hprof ...
Heap dump file created [1385285026 bytes in 5.825 secs]
Exception in thread "defaultEventExecutorGroup-4-1" java.lang.OutOfMemoryError: Java heap space
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "kafka-producer-network-thread | producer-1"

Exception in thread "defaultEventExecutorGroup-4-2" java.lang.OutOfMemoryError: Java heap space
Exception in thread "LogStash::Runner" java.lang.OutOfMemoryError: Java heap space
Exception in thread "Ruby-0-Thread-1: /root/elk_zhangqiang/logstash-6.2.3-ship/lib/bootstrap/environment.rb:6" java.lang.OutOfMemoryError: Java heap space
Exception in thread "nioEventLoopGroup-3-3" java.lang.OutOfMemoryError: Java heap space

通过 JVM 参数--XX:-HeapDumpOnOutOfMemoryError可以让 JVM 在出现内存溢出时 dump 出当前的内存转储快照,如上面的异常(第二行)所示。

通过JProfiler等工具分析内存溢出的原因

Result: start-limit

systemctl status logstash
● logstash.service - logstash
   Loaded: loaded (/etc/systemd/system/logstash.service; disabled; vendor preset: disabled)
   Active: failed (Result: start-limit) since Thu 2020-02-27 11:12:47 CST; 4s ago
  Process: 12785 ExecStart=/usr/share/logstash/bin/logstash --path.settings /etc/logstash (code=exited, status=1/FAILURE)
 Main PID: 12785 (code=exited, status=1/FAILURE)

The most important log file in Linux is the /var/log/messages file, which records a variety of events, such as the system error messages, system startups and shutdowns, change in the network configuration, etc. This is usually the first place to look at in case of problems. This file is a plain text file, so you can check it using any tool that can examine text files, such as less. You can also use the tail command to display the last 10 lines of this file:

# 查找 logstash 相关错误信息
$ less /var/log/messages
Feb 27 10:40:06 dlink-test su: (to root) zhangqiang on pts/1
Feb 27 10:40:16 dlink-test -bash: HISTORY: PPID=10258 PID=10259 SID=1420 UID=0 User=root systemctl start logstash
Feb 27 10:40:16 dlink-test systemd: Started logstash.
Feb 27 10:40:16 dlink-test systemd: Starting logstash...
Feb 27 10:40:16 dlink-test logstash: could not find java; set JAVA_HOME or ensure java is in PATH
Feb 27 10:40:16 dlink-test systemd: logstash.service: main process exited, code=exited, status=1/FAILURE
Feb 27 10:40:16 dlink-test systemd: Unit logstash.service entered failed state.
Feb 27 10:40:16 dlink-test systemd: logstash.service failed.
Feb 27 10:40:16 dlink-test systemd: logstash.service holdoff time over, scheduling restart.

qin

取消

感谢您的支持,我会继续努力的!

扫码支持
扫码支持
扫码打赏

打开支付宝扫一扫,即可进行扫码打赏哦