input { jdbc { jdbc_driver_library => "/rdbc/mysql-connector-java-8.0.19.jar" jdbc_driver_class => "com.mysql.cj.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://mysql:3306/blog" jdbc_user => "blog" jdbc_password => "blog" jdbc_paging_enabled => true tracking_column => "unix_ts_in_secs" use_column_value => true tracking_column_type => "numeric" schedule => "*/5 * * * * *" statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM articles WHERE (UNIX_TIMESTAMP(modification_time) > :sql_last_value AND modification_time < NOW()) ORDER BY modification_time ASC" } } filter { mutate { copy => { "id" => "[@metadata][_id]"} remove_field => ["id", "@version", "unix_ts_in_secs"] } } output { # stdout { codec => "rubydebug"} elasticsearch { hosts => ["es:9200"] index => "articles" document_id => "%{[@metadata][_id]}" } }