(1)继承理论
Logstash 是免费且开放的服务器端数据处理管道,能够从多个来源采集数据,转换数据,然后将数据发送到您最喜欢的“存储库”中。
Logstash 是一个功能强大的工具,可与各种部署集成。 它提供了大量插件,可帮助你解析,丰富,转换和缓冲来自各种来源的数据。 如果你的数据需要 Beats 中没有的其他处理,则需要将 Logstash 添加到部署中。
Logstash 是 Elastic 栈非常重要的一部分,但是它不仅仅为 Elasticsearch 所使用。它可以介绍广泛的各种数据源。Logstash 可以帮利用它自己的 Filter 帮我们对数据进行解析,丰富,转换等。
最后,它可以把自己的数据输出到各种需要的数据储存地,这其中包括 Elasticsearch。
(2)自动同步Mysql和ES
1、下载Logstash工具(版本必须一致)
logstash-8.4.0-windows-x86_64.zip
2、mysql数据库可用
3、在解压后的logstash目录下新建文件夹mysql
mysql驱动放入到上面新建的mysql目录下
4、创建脚本文件find.sql
在mysql目录下,新建文件find.sql,并写入数据备份所用到的查询sql,如:
-- student 为需要导入的表
select * from student
5、创建配置文件mysql.conf
在mysql目录下,新建文件mysql.conf,并写入如下配置:
input {
stdin {
}
jdbc {
# mysql 数据库链接,jdbc版本比较大的要加上?后面那串字符
jdbc_connection_string => "jdbc:mysql://localhost:3306/数据库名?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true"
# 用户名和密码
jdbc_user => "用户名"
jdbc_password => "密码"
# 驱动
jdbc_driver_library => "mysql-connector-java.jar包所在路径"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
# 执行的sql 就是上一步创建的sql文件的绝对路径+文件名字
statement_filepath => "find.sql文件所在路径"
# 也可以写一个要执行sql语句,替代statement_filepath的方式
#statement => "select * from tb_article"
# 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
schedule => "* * * * *"
# 索引类型
type => "student"
}
}
filter {
json {
source => "message"
remove_field => ["message"]
}
}
output {
elasticsearch {
# ES的IP地址及端口
hosts => ["localhost:9200"]
# 索引名称,elasticsearch叫做索引,相当于es的数据库
index => "自定义索引名"
# 自增ID id必须是待查询的数据表的序列字段
document_id => "%{表的主键字段名}"
}
stdout {
# JSON格式输出
codec => json_lines
}
}
参考配置例子如下:
input {
stdin {
}
jdbc {
# mysql 数据库链接,center为数据库名,jdbc版本比较大的要加上?后面那串字符
jdbc_connection_string => "jdbc:mysql://localhost:3306/mysql_test?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true"
# 用户名和密码
jdbc_user => "root"
jdbc_password => "123456"
# 驱动
jdbc_driver_library => "D:logstash-8.4.0mysqlmysql-connector-java-8.0.30.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
# 执行的sql 就是上一步创建的sql文件的绝对路径+文件名字
statement_filepath => "D:logstash-8.4.0mysqlfind.sql"
# 也可以写一个要执行sql语句,替代statement_filepath的方式
#statement => "select * from tb_article"
# 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
schedule => "* * * * *"
# 索引类型
type => "student"
}
}
filter {
json {
source => "message"
remove_field => ["message"]
}
}
output {
elasticsearch {
# ES的IP地址及端口
hosts => ["localhost:9200"]
# 索引名称,elasticsearch叫做索引,相当于es的数据库
index => "mysql"
# 自增ID id必须是待查询的数据表的序列字段
document_id => "%{id}"
}
stdout {
# JSON格式输出
codec => json_lines
}
}
6、启动Logstash开始mysql表数据备份
打开命令行窗口,进入到zip包解压后logstash的目录bin下,输入:
//logstash -f mysql.conf文件路径,如:
logstash -f D:logstash-8.4.0mysqlmysql.conf
GET test-student/_search
{
"query":{
"match_all": {}
}
}