栏目分类:
子分类:
返回
终身学习网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
终身学习网 > IT > 软件开发 > 后端开发 > Java

ElasticSearch学习使用(含ELK)

Java 更新时间:发布时间: 百科书网 趣学号

ElasticSearch学习使用
  • 一、基础概念
  • 二、安装使用
  • 三、其他概念
  • 四、 分词(ik分词器)的使用
  • 五、Springboot项目使用
  • 六、
  • 十三、ES常用查询语句总结
  • 十四、ELK环境搭建
    • ①、logstash采集tomcat或springboot工程log
    • ②、metricbeat监控服务器,采集服务器数据
    • ③启动elasticsearch-head,访问http://localhost:8080/elasticsearch-head/
    • ④启动elasticsearch-curator,管理es索引,清理es过期数据
  • 十五、logstash采集数据延迟八小时

一、基础概念

ES的每个版本差别很大,在ES7之前,使用的是下面的架构:
跟mysql相比,索引=库名、类型=表名、文档=数据

ES7版本之后

二、安装使用
  1. 安装ElasticSearch和可视化界面kibana。
    Windows开箱即用,增删改查皆为restful接口

    如果需要用真实IP而不仅仅是localhost连接ES,则需要修改一下config下的elasticsearch.yml内容

  2. 安装成功后直接启动,访问http://localhost:9200和http://localhost:5601,分别出现以下界面表示成功

  3. 利用postman测试添加数据

  • 添加使用post和put(一定要带ID)请求都可以,es6版本格式为http://localhost:9200/索引/类型/(可选参数文档ID),参数为文档(跟mysql相比索引=库名、类型=表名、文档=数据)


返回结果

  • es7之后不用指定类型了http://localhost:9200/索引/(可选参数文档ID),可以使用映射指定存储的数据类型
  1. 测试查询
  • 根据ID查询某一条:格式为http://localhost:9200/索引/类型/ID
  • 根据条件检索 customer/_search?q=*&sort=age:asc,其他API参照https://www.elastic.co/guide/en/elasticsearch/reference/current/getting-started.html#qs-search-data
  1. 测试更新
  2. 删除(不可删除类型,就像MySQL只能删除库和数据,不能删除表一样)
  3. 批量插入,bulk批量API
    格式为http://localhost:9200/索引/类型/_bulk

    用kibanad的dev tools测试批量插入
三、其他概念
  1. 聚合(avg)映射(mapping)等
    参考官方文档:elastic search官方文档

    简单说明:(映射)mapping:我理解的就相当于mysql的表结构
  • 创建映射
  • 更新映射。要求更新映射后数据不变,比如修改某个映射从integer改成text
    (1)先新建新的索引

    (2)再查看原映射关系,Ctrl+CV粘贴映射关系,没有直接更新映射还保持数据的方法

    (3)迁移数据命令
四、 分词(ik分词器)的使用

(1)下载跟据es版本下载对应ik并解压到如下目录下

(2)下载并启动nginx,在nginx下建自定义分词文件,目录如下

访问地址http://localhost/ElasticSearch/fenci.txt能看到数据
(3)在分词器插件里修改配置


(4)重启es,再次请求

POST _analyze
{
  "analyzer": "ik_max_word",
  "text": "艾瑞克电商项目"
}

可以看到自定义分词已经成功

五、Springboot项目使用
  1. 引入依赖

    org.elasticsearch.client
    elasticsearch-rest-high-level-client
    7.13.4


  1. 新建config包下的ElasticSearchconfig,编写配置,给spring容器中注入一个RestHighLevelClient,具体代码内容如下
    更多解释参考官方文档
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ElasticSearchConfig {

    @Bean
    RestHighLevelClient esRestClient(){
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("localhost", 9200, "http"),
                        new HttpHost("localhost", 9201, "http")));
        return client;
    }
}

如果需要安全验证,在每次请求时都带上请求头,就在上面的配置类里加上
,更多内容参考官方文档

import org.apache.http.HttpHost;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class GulimallElasticSearchConfig {

    @Bean
    RestHighLevelClient esRestClient(){
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("localhost", 9200, "http"),
                        new HttpHost("localhost", 9201, "http")));
        return client;
    }

    public static final RequestOptions COMMON_OPTIONS;
    static {
        RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
//        builder.addHeader("Authorization", "Bearer " + TOKEN);
//        builder.setHttpAsyncResponseConsumerFactory(
//                new HttpAsyncResponseConsumerFactory
//                        .HeapBufferedResponseConsumerFactory(30 * 1024 * 1024 * 1024));
        COMMON_OPTIONS = builder.build();
    }
}
  1. 测试使用
    (1)测试添加。更多内容参考官方文档
@Test
    void test() throws IOException {

        IndexRequest indexRequest=new IndexRequest("user");
        indexRequest.id("1");
        String jsonStr="{"name":"Eric FRQ","age":23}";
        indexRequest.source(jsonStr, XContentType.JSON);
        IndexResponse index = client.index(indexRequest, GulimallElasticSearchConfig.COMMON_OPTIONS);
        System.out.println(index);
    }

(2)测试批量添加。更多内容参考官方文档

    @Test
    void testBulkAdd() throws IOException {
        BulkRequest request = new BulkRequest();
        request.add(new IndexRequest("user").id("2")
                .source("{"name":"Stefan Zhou","age":53}",XContentType.JSON));
        request.add(new IndexRequest("user").id("3")
                .source("{"name":"Bulus Li","age":63}",XContentType.JSON));
        request.add(new IndexRequest("user").id("4")
                .source("{"name":"Jack Chen","age":55}",XContentType.JSON));
        client.bulk(request, GulimallElasticSearchConfig.COMMON_OPTIONS);
    }

(3)测试检索。更多内容参考官方文档

 @Test
    void testSearch() throws IOException {
        SearchRequest searchRequest = new SearchRequest();
        //索引
        searchRequest.indices("user");
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        //searchSourceBuilder.query(QueryBuilders.matchQuery("name","Eric"));
        //按照年龄聚合
        TermsAggregationBuilder size = AggregationBuilders.terms("aggAgg").field("age").size(3);
        searchSourceBuilder.aggregation(size);
        //按照年龄平均值聚合
        AvgAggregationBuilder field = AggregationBuilders.avg("balanceAvg").field("age");
        searchSourceBuilder.aggregation(field);
        
        searchRequest.source(searchSourceBuilder);
        SearchResponse search = client.search(searchRequest, GulimallElasticSearchConfig.COMMON_OPTIONS);
        System.out.println(search.toString());
    }
六、 十三、ES常用查询语句总结
  1. 查看所有索引:GET /_cat/indices
  2. 创建索引tomcat-logstash:put tomcat-logstash
  3. 查询某索引元数据、数据类型:get tomcat-logstash
  4. 删除某索引:DELETe springboot-logstash-2021.09.22
  5. 查询某索引存入的数据:get /tomcat-logstash/_search
  6. es开启可动态创建索引
PUT /_cluster/settings
{
    "persistent" : {
        "action": {
          "auto_create_index": "true"
        }
    }
}
  1. 复杂查询案例:
# 按时间范围查询
post tomcat-logstash/_search
{
  "query": {
    "bool": {
      "must": [
        {
          "range": {
            "@timestamp": {
              "gte": "2021-02-01 00:40:39",
              "lte": "2021-12-21 23:42:59",
              "format": "yyyy-MM-dd HH:mm:ss",
              "time_zone": "+08:00"
            }
          }
        }
      ]
    }
  },
  "size": 0,
  "aggs": {
    "groups": {
      "terms": {
        "field": "@timestamp",
        "size":3,
        "order" : {  "_count" : "desc" }
      }
    }
  }
}
# 按时间范围查询并将时间数据格式化yyyy-MM-dd HH:mm:ss
post /tomcat-logstash/_search
{
    "query": {
        "bool": {
            "must": [
                {
                    "range": {
                        "accessTime": {
                            "format": "yyyy-MM-dd HH:mm:ss"
                        }
                    }
                }
            ]
        }
    },
    "script_fields": {
        "@timestamp": {
            "script": {
                "lang": "painless",
                "inline": "doc['@timestamp'].value.toString('yyyy-MM-dd HH:mm:ss')"
            }
        }
    }
}
# 将存在metricset.name.actual.free的数据按时间倒序查询出来
post /metricbeat-*/_search
{
    "query": {
        "exists": {
            "field": "metricset.name.actual.free"
        }
    },
    "from": 1,
    "size": 2,
    "sort": [
        {
            "timestamp": {
                "order": "desc"
            }
        }
    ]
}
# 将"metricset.name"等于 "memory"的数据按时间倒序查出来
post /metricbeat-*/_search
{
    "query": {
        "match_phrase": {
            "metricset.name": "memory"
        }
    },
    "from": 1,
    "size": 1,
    "sort": [
        {
            "timestamp": {
                "order": "desc"
            }
        }
    ]
}
# 将"system.filesystem.mount_point"等于"C"
# 并且"system.filesystem.mount_point"等于"D"
# 并且"metricset.name"等于"filesystem"的数据查出五条来
post /metricbeat-*/_search 
{
    "query": {
        "bool": {
            "should": [
                {
                    "match_phrase": {
                        "system.filesystem.mount_point": "C"
                    }
                },
                {
                    "match_phrase": {
                        "metricset.name": "filesystem"
                    }
                },
                {
                    "match_phrase": {
                        "system.filesystem.mount_point": "D"
                    }
                }
            ]
        }
    },
    "from": 1,
    "size": 5
}
# 在10条数据内,以system.filesystem.device_name.keyword分组
# 查询system.filesystem.total的数据
post /metricbeat-*/_search     
{
    "size": 0,
    "query": {
        "match_phrase": {
            "metricset.name": "filesystem"
        }
    },
    "aggs": {
        "system.filesystem.device_name.keyword": {
            "terms": {
                "field": "system.filesystem.total",
                "size": 10,
                "order": {
                    "_count": "asc"
                }
            }
        }
    }
}
十四、ELK环境搭建 ①、logstash采集tomcat或springboot工程log

1、官网下载、解压、使用。我这里使用的都是7.13.0版本。

1、elasticsearch(存放数据)、
2、metricbeat(监控服务器cpu、内存等)、
3、kibana(界面化工具,对es的操作等)、
4、Logstash(日志采集)、
5、elasticsearch-head(数据展示,比kibana更直观)
6、elasticsearch-curator(es数据索引管理工具,用于定期清理es索引数据等)

2、按序启动

1、es,在ELKelasticsearch-7.13.0bin下双击elasticsearch.bat

2、kibana,在ELKkibana-7.13.0-windows-x86_64bin下双击kibana.bat

3、logstash,在ELKLogstashbin下新建logstash.conf,粘贴如下内容。
在Logstashbin下cmd输入命令logstash -f logstash.conf
说明:input.file.path=项目的日志文件路径,比如tomcat日志或者springboot输出到文件的日志

logstash.conf内容如下
如果生成固定es的索引,参考下面配置

# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.

input {
    file{
        path =>"E:/Work/2021/target/logs/access_log.*.log"
        type => "tomcat_access_log"
        start_position=>"beginning"
    }
}

input {
 beats {
        port => "5044"
        type => "metricbeat"
    }
}

filter{
    grok{
        match=>{
            "message"=>"%{data:ip} - - [%{HTTPDATE:accessTime}] "%{data:method} %{data:access} %{data:httpversion}" %{data:retcode} %{data:flow} %{data:retTime} "%{data:fromHtml}" "%{data:useragent}""
        }

        remove_field=>"message"
        remove_field=> "path"
    }
    date{
        match=>["accessTime","yyyy-MM-dd-HH:mm:ss"]
    }

 ruby { 
    code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)" 
  }
  mutate {
    remove_field => ["@timestamp","ecs"]
  }
}

output {
if "tomcat_access_log" in [type] {
 elasticsearch {
    hosts => ["http://127.0.0.1:9200"]
    index => "tomcat-logstash"
  }
}

if "metricbeat" in [type] {
 elasticsearch {
    hosts => ["http://127.0.0.1:9200"]
    index => "metricbeat-logstash"
  }
}
 
}

如果要生成es带时间戳的索引,参考下面配置:(如果要索引后面加时间戳,一定要有@timestamp字段,所以注释掉了删除此字段的配置)

# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.

input {
    file{
        path =>"E:/Work/2021/target/logs/access_log.*.log"
        type => "tomcat_access_log"
        start_position=>"beginning"
    }
}

input {
 beats {
        port => "5044"
        type => "metricbeat"
    }
}

filter{
    grok{
        match=>{
            "message"=>"%{data:ip} - - [%{HTTPDATE:accessTime}] "%{data:method} %{data:access} %{data:httpversion}" %{data:retcode} %{data:flow} %{data:retTime} "%{data:fromHtml}" "%{data:useragent}""
        }

        remove_field=>"message"
        remove_field=> "path"
    }
    date{
        match=>["accessTime","yyyy-MM-dd-HH:mm:ss"]
    }

 ruby { 
    code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)" 
  }
  #mutate {
    #remove_field => ["@timestamp","ecs"]
  #}
}

output {
if "tomcat_access_log" in [type] {
 elasticsearch {
    hosts => ["http://127.0.0.1:9200"]
    index => "tomcat-logstash-%{+YYYY.MM.dd}"
  }
}

if "metricbeat" in [type] {
 elasticsearch {
    hosts => ["http://127.0.0.1:9200"]
    index => "metricbeat-logstash-%{+YYYY.MM.dd}"
  }
}
 
}

②、metricbeat监控服务器,采集服务器数据

1、metricbeat,在上面三个都启动好了之后,编辑metricbeat.yml,搜索关键字Output,关闭直接输出到es的配置,如下图

2、搜索关键字Logstash Output,打开输出到logstash,使用端口5044,和上面的logstash.conf5044对应上,如下图

3、在ELKmetricbeat-7.13.0-windows-x86_64下cmd输入 metricbeat.exe -e

③启动elasticsearch-head,访问http://localhost:8080/elasticsearch-head/

1、将elasticsearch-head丢到tomcat的webapp下,直接启动tomcat即可
2、开启es动态创建索引,这样logstash就能跟据配置动态创建索引了

PUT /_cluster/settings
{
    "persistent" : {
        "action": {
          "auto_create_index": "true"
        }
    }
}

访问路径后页面如下:

④启动elasticsearch-curator,管理es索引,清理es过期数据

1、下载elasticsearch-curator
官网地址:https://packages.elastic.co/curator/5/windows/elasticsearch-curator-5.8.4-amd64.msi

2、安装。安装目录自选(说明:此安装其实就相当于一个解压过程,安装后的文件夹随意拷贝到其他服务器就能直接用),安装成功后是个文件夹elasticsearch-curator,然后手动创建下面两个文件config.yml、action.yml

3、配置
官网config,yml配置如下,无特殊需求,直接粘贴即可使用

client:
  hosts:
    - 127.0.0.1
  port: 9200
  url_prefix:
  use_ssl: False
  certificate:
  client_cert:
  client_key:
  ssl_no_validate: False
  http_auth:
  timeout: 30
  master_only: False

logging:
  loglevel: INFO
  logfile:
  logformat: default
  blacklist: ['elasticsearch', 'urllib3']

配置action.yml,内容如下; 其中:
actions下的1、2代表多个事件动作,delete_indices代表删除索引事件,其他事件比如关闭索引、合并索引等事件参考官网 ==>action配置说明
description:此事件的描述
options:continue_if_exception遇到异常是否继续
filters:配置删除什么样子的索引,- filtertype:pattern的索引名称模型,kind: prefix索引的前缀,
value: tomcat-logstash-前缀值是什么, - filtertype: age过期时间设置,
timestring: '%Y.%m.%d'索引前缀后面的日期格式,unit: days过期时间的单位,unit_count: 1过期时间的值

actions:
  1:
    action: delete_indices
    description: >-
      Close indices older than 1days (based on index name), forlogstash-
      prefixed indices.
    options:
      continue_if_exception: False
      ignore_empty_list: True
      disable_action: False
    filters:
    - filtertype: pattern
      kind: prefix
      value: metricbeat-logstash-
    - filtertype: age
      source: name
      direction: older
      timestring: '%Y.%m.%d'
      unit: days
      unit_count: 1
  2:
    action: delete_indices
    description: >-
      Close indices older than 7days (based on index name), forlogstash-
      prefixed indices.
    options:
      continue_if_exception: False
      ignore_empty_list: True
      disable_action: False
    filters:
    - filtertype: pattern
      kind: prefix
      value: tomcat-logstash-
    - filtertype: age
      source: name
      direction: older
      timestring: '%Y.%m.%d'
      unit: days
      unit_count: 7

4、启动命令:curator --config config.yml action.yml
运行结果:两个事件动作都完成!

5、结果验证:可以看到设置删除时间为删除一天前的,我的索引metricbeat-logstash-2021.09.27已经被删除了
设置为七天前的,tomcat-logstash-2021.09.27、tomcat-logstash-2021.09.28两个都还在

十五、logstash采集数据延迟八小时

场景:
通过metricbeat收集服务器系统日志,metricbeat中的日志发送到kafka中
Logstash中的时间为格林威治时间,因此通过logstash采集到的数据和我们的时间会有8小时的时差
如果在后续代码中处理很有可能会处理遗漏掉,造成数据的时间错误。
版本
logstash 7.6.0
解决方案如下

input {
  tcp {
    mode => "server"
    host => "0.0.0.0"
    port => 4560
    codec => json_lines
  }
}
filter {
        ruby {
                code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*3600)"
        }
        ruby {
                code => "event.set('@timestamp',event.get('timestamp'))"
        }
        mutate {
                remove_field => ["timestamp"]
        }
}
output {
  elasticsearch {
    hosts => "localhost:9200"
    index => "springboot-logstash-%{+YYYY.MM.dd}"
  }
}

logstash生成文件名中的日期是从@timestamp字段的值中获取,通过设置filter将timestamp中的时间转换成系统时间,问题解决

转载请注明:文章转载自 www.051e.com
本文地址:http://www.051e.com/it/275004.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 ©2023-2025 051e.com

ICP备案号:京ICP备12030808号