自动同步Mysql和ES:Logstash

(1)继承理论

Logstash 是免费且开放的服务器端数据处理管道,能够从多个来源采集数据,转换数据,然后将数据发送到您最喜欢的“存储库”中。

Logstash 是一个功能强大的工具,可与各种部署集成。 它提供了大量插件,可帮助你解析,丰富,转换和缓冲来自各种来源的数据。 如果你的数据需要 Beats 中没有的其他处理,则需要将 Logstash 添加到部署中。

Logstash 是 Elastic 栈非常重要的一部分,但是它不仅仅为 Elasticsearch 所使用。它可以介绍广泛的各种数据源。Logstash 可以帮利用它自己的 Filter 帮我们对数据进行解析,丰富,转换等。

最后,它可以把自己的数据输出到各种需要的数据储存地,这其中包括 Elasticsearch。

(2)自动同步Mysql和ES

1、下载Logstash工具(版本必须一致)

    logstash-8.4.0-windows-x86_64.zip

2、mysql数据库可用

3、在解压后的logstash目录下新建文件夹mysql

    mysql驱动放入到上面新建的mysql目录下

4、创建脚本文件find.sql

    在mysql目录下,新建文件find.sql,并写入数据备份所用到的查询sql,如:

-- student 为需要导入的表

select * from student

5、创建配置文件mysql.conf

        在mysql目录下,新建文件mysql.conf,并写入如下配置:

input {

    stdin {

    }

    jdbc {

      # mysql 数据库链接,jdbc版本比较大的要加上?后面那串字符

      jdbc_connection_string => "jdbc:mysql://localhost:3306/数据库名?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true"

      # 用户名和密码

      jdbc_user => "用户名"

      jdbc_password => "密码"

      # 驱动

      jdbc_driver_library => "mysql-connector-java.jar包所在路径"

      jdbc_driver_class => "com.mysql.jdbc.Driver"

      jdbc_paging_enabled => "true"

      jdbc_page_size => "50000"

      # 执行的sql 就是上一步创建的sql文件的绝对路径+文件名字

      statement_filepath => "find.sql文件所在路径"

  # 也可以写一个要执行sql语句,替代statement_filepath的方式

  #statement => "select * from tb_article"

      # 设置监听间隔  各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新

      schedule => "* * * * *"

      # 索引类型

      type => "student"

    }

}

filter {

    json {

        source => "message"

        remove_field => ["message"]

    }

}

output {

    elasticsearch {

        # ES的IP地址及端口

        hosts => ["localhost:9200"]

        # 索引名称,elasticsearch叫做索引,相当于es的数据库

        index => "自定义索引名"

        # 自增ID id必须是待查询的数据表的序列字段

        document_id => "%{表的主键字段名}"

    }

    stdout {

      # JSON格式输出

        codec => json_lines

    }

}

参考配置例子如下:

input {

    stdin {

    }

    jdbc {

      # mysql 数据库链接,center为数据库名,jdbc版本比较大的要加上?后面那串字符

      jdbc_connection_string => "jdbc:mysql://localhost:3306/mysql_test?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true"

      # 用户名和密码

      jdbc_user => "root"

      jdbc_password => "123456"

      # 驱动

      jdbc_driver_library => "D:logstash-8.4.0mysqlmysql-connector-java-8.0.30.jar"

      jdbc_driver_class => "com.mysql.jdbc.Driver"

      jdbc_paging_enabled => "true"

      jdbc_page_size => "50000"

      # 执行的sql 就是上一步创建的sql文件的绝对路径+文件名字

      statement_filepath => "D:logstash-8.4.0mysqlfind.sql"

  # 也可以写一个要执行sql语句,替代statement_filepath的方式

  #statement => "select * from tb_article"

      # 设置监听间隔  各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新

      schedule => "* * * * *"

      # 索引类型

      type => "student"

    }

}

filter {

    json {

        source => "message"

        remove_field => ["message"]

    }

}

output {

    elasticsearch {

        # ES的IP地址及端口

        hosts => ["localhost:9200"]

        # 索引名称,elasticsearch叫做索引,相当于es的数据库

        index => "mysql"

        # 自增ID id必须是待查询的数据表的序列字段

        document_id => "%{id}"

    }

    stdout {

      # JSON格式输出

        codec => json_lines

    }

}

6、启动Logstash开始mysql表数据备份

        打开命令行窗口,进入到zip包解压后logstash的目录bin下,输入:

//logstash -f mysql.conf文件路径,如:

logstash -f D:logstash-8.4.0mysqlmysql.conf

GET test-student/_search

{

  "query":{

    "match_all": {}

  }

}