
ES的每个版本差别很大,在ES7之前,使用的是下面的架构:
跟mysql相比,索引=库名、类型=表名、文档=数据
ES7版本之后
安装ElasticSearch和可视化界面kibana。
Windows开箱即用,增删改查皆为restful接口
如果需要用真实IP而不仅仅是localhost连接ES,则需要修改一下config下的elasticsearch.yml内容
安装成功后直接启动,访问http://localhost:9200和http://localhost:5601,分别出现以下界面表示成功
利用postman测试添加数据
返回结果
(1)根据es版本下载对应的ik分词器并解压到如下目录下
(2)下载并启动nginx,在nginx下建自定义分词文件,目录如下
访问地址http://localhost/ElasticSearch/fenci.txt能看到数据
(3)在分词器插件里修改配置
(4)重启es,再次请求
POST _analyze
{
"analyzer": "ik_max_word",
"text": "艾瑞克电商项目"
}
可以看到自定义分词已经成功
Maven依赖坐标:org.elasticsearch.client:elasticsearch-rest-high-level-client:7.13.4
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that exposes a {@link RestHighLevelClient} bean
 * for talking to the Elasticsearch nodes on localhost:9200 / localhost:9201.
 */
@Configuration
public class ElasticSearchConfig {

    /**
     * Builds the high-level REST client over both cluster nodes.
     *
     * @return a client connected to localhost:9200 and localhost:9201 over HTTP
     */
    @Bean
    RestHighLevelClient esRestClient() {
        HttpHost primaryNode = new HttpHost("localhost", 9200, "http");
        HttpHost secondaryNode = new HttpHost("localhost", 9201, "http");
        return new RestHighLevelClient(RestClient.builder(primaryNode, secondaryNode));
    }
}
如果需要安全验证,在每次请求时都带上请求头,就在上面的配置类里加上
,更多内容参考官方文档
import org.apache.http.HttpHost;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration for the Elasticsearch high-level REST client, plus the
 * shared {@link RequestOptions} instance applied to every request (extend the
 * builder below to add auth headers, response buffer limits, etc.).
 */
@Configuration
public class GulimallElasticSearchConfig {

    /** Options passed with every ES request; built once at class-load time. */
    public static final RequestOptions COMMON_OPTIONS;

    static {
        RequestOptions.Builder optionsBuilder = RequestOptions.DEFAULT.toBuilder();
        // Example: attach an auth header to every request.
        // optionsBuilder.addHeader("Authorization", "Bearer " + TOKEN);
        // Example: raise the response buffer limit.
        // optionsBuilder.setHttpAsyncResponseConsumerFactory(
        //     new HttpAsyncResponseConsumerFactory
        //         .HeapBufferedResponseConsumerFactory(30 * 1024 * 1024 * 1024));
        COMMON_OPTIONS = optionsBuilder.build();
    }

    /**
     * Builds the high-level REST client over both cluster nodes.
     *
     * @return a client connected to localhost:9200 and localhost:9201 over HTTP
     */
    @Bean
    RestHighLevelClient esRestClient() {
        HttpHost primaryNode = new HttpHost("localhost", 9200, "http");
        HttpHost secondaryNode = new HttpHost("localhost", 9201, "http");
        return new RestHighLevelClient(RestClient.builder(primaryNode, secondaryNode));
    }
}
@Test
void test() throws IOException {
IndexRequest indexRequest=new IndexRequest("user");
indexRequest.id("1");
String jsonStr="{"name":"Eric FRQ","age":23}";
indexRequest.source(jsonStr, XContentType.JSON);
IndexResponse index = client.index(indexRequest, GulimallElasticSearchConfig.COMMON_OPTIONS);
System.out.println(index);
}
(2)测试批量添加。更多内容参考官方文档
@Test
void testBulkAdd() throws IOException {
BulkRequest request = new BulkRequest();
request.add(new IndexRequest("user").id("2")
.source("{"name":"Stefan Zhou","age":53}",XContentType.JSON));
request.add(new IndexRequest("user").id("3")
.source("{"name":"Bulus Li","age":63}",XContentType.JSON));
request.add(new IndexRequest("user").id("4")
.source("{"name":"Jack Chen","age":55}",XContentType.JSON));
client.bulk(request, GulimallElasticSearchConfig.COMMON_OPTIONS);
}
(3)测试检索。更多内容参考官方文档
/**
 * Searches the "user" index with two aggregations — a terms bucket on "age"
 * (top 3 buckets, named "aggAgg") and an average of "age" (named
 * "balanceAvg") — and prints the raw response JSON.
 */
@Test
void testSearch() throws IOException {
    SearchRequest searchRequest = new SearchRequest();
    // Restrict the search to the "user" index.
    searchRequest.indices("user");

    SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
    //sourceBuilder.query(QueryBuilders.matchQuery("name","Eric"));
    // Bucket documents by age, keeping the 3 largest buckets.
    TermsAggregationBuilder ageBuckets =
            AggregationBuilders.terms("aggAgg").field("age").size(3);
    sourceBuilder.aggregation(ageBuckets);
    // Average age across all matching documents.
    AvgAggregationBuilder ageAverage = AggregationBuilders.avg("balanceAvg").field("age");
    sourceBuilder.aggregation(ageAverage);
    searchRequest.source(sourceBuilder);

    SearchResponse search = client.search(searchRequest, GulimallElasticSearchConfig.COMMON_OPTIONS);
    System.out.println(search.toString());
}
六、
十三、ES常用查询语句总结
PUT /_cluster/settings
{
"persistent" : {
"action": {
"auto_create_index": "true"
}
}
}
# 按时间范围查询
post tomcat-logstash/_search
{
"query": {
"bool": {
"must": [
{
"range": {
"@timestamp": {
"gte": "2021-02-01 00:40:39",
"lte": "2021-12-21 23:42:59",
"format": "yyyy-MM-dd HH:mm:ss",
"time_zone": "+08:00"
}
}
}
]
}
},
"size": 0,
"aggs": {
"groups": {
"terms": {
"field": "@timestamp",
"size":3,
"order" : { "_count" : "desc" }
}
}
}
}
# 按时间范围查询并将时间数据格式化yyyy-MM-dd HH:mm:ss
post /tomcat-logstash/_search
{
"query": {
"bool": {
"must": [
{
"range": {
"accessTime": {
"format": "yyyy-MM-dd HH:mm:ss"
}
}
}
]
}
},
"script_fields": {
"@timestamp": {
"script": {
"lang": "painless",
"source": "doc['@timestamp'].value.toString('yyyy-MM-dd HH:mm:ss')"
}
}
}
}
# 将存在metricset.name.actual.free的数据按时间倒序查询出来
post /metricbeat-*/_search
{
"query": {
"exists": {
"field": "metricset.name.actual.free"
}
},
"from": 1,
"size": 2,
"sort": [
{
"timestamp": {
"order": "desc"
}
}
]
}
# 将"metricset.name"等于 "memory"的数据按时间倒序查出来
post /metricbeat-*/_search
{
"query": {
"match_phrase": {
"metricset.name": "memory"
}
},
"from": 1,
"size": 1,
"sort": [
{
"timestamp": {
"order": "desc"
}
}
]
}
# 将"system.filesystem.mount_point"等于"C"
# 或者"system.filesystem.mount_point"等于"D"
# 或者"metricset.name"等于"filesystem"的数据查出五条来(should子句之间是“或”的关系)
post /metricbeat-*/_search
{
"query": {
"bool": {
"should": [
{
"match_phrase": {
"system.filesystem.mount_point": "C"
}
},
{
"match_phrase": {
"metricset.name": "filesystem"
}
},
{
"match_phrase": {
"system.filesystem.mount_point": "D"
}
}
]
}
},
"from": 1,
"size": 5
}
# 在10条数据内,以system.filesystem.device_name.keyword分组
# 查询system.filesystem.total的数据
post /metricbeat-*/_search
{
"size": 0,
"query": {
"match_phrase": {
"metricset.name": "filesystem"
}
},
"aggs": {
"system.filesystem.device_name.keyword": {
"terms": {
"field": "system.filesystem.total",
"size": 10,
"order": {
"_count": "asc"
}
}
}
}
}
十四、ELK环境搭建
①、logstash采集tomcat或springboot工程log
1、官网下载、解压、使用。我这里使用的都是7.13.0版本。
1、elasticsearch(存放数据)、
2、metricbeat(监控服务器cpu、内存等)、
3、kibana(界面化工具,对es的操作等)、
4、Logstash(日志采集)、
5、elasticsearch-head(数据展示,比kibana更直观)
6、elasticsearch-curator(es数据索引管理工具,用于定期清理es索引数据等)
2、按序启动
1、es,在ELK\elasticsearch-7.13.0\bin下双击elasticsearch.bat
2、kibana,在ELK\kibana-7.13.0-windows-x86_64\bin下双击kibana.bat
3、logstash,在ELK\Logstash\bin下新建logstash.conf,粘贴如下内容。
在ELK\Logstash\bin下cmd输入命令logstash -f logstash.conf
说明:input.file.path=项目的日志文件路径,比如tomcat日志或者springboot输出到文件的日志
logstash.conf内容如下
如果生成固定es的索引,参考下面配置
# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.
# Events carry a "type" field so the output section can route
# tomcat logs and metricbeat data to different indices.
input {
  file {
    path => "E:/Work/2021/target/logs/access_log.*.log"
    type => "tomcat_access_log"
    start_position => "beginning"
  }
}
# Metricbeat ships here (see metricbeat.yml "Logstash Output", port 5044).
input {
  beats {
    port => "5044"
    type => "metricbeat"
  }
}
filter {
  # Only tomcat access-log lines match this pattern; without the guard,
  # every metricbeat event would be tagged _grokparsefailure/_dateparsefailure.
  if [type] == "tomcat_access_log" {
    grok {
      # Grok pattern names are upper-case (%{DATA:...}); literal '[', ']'
      # and '"' in the log line must be escaped with a backslash.
      match => {
        "message" => "%{DATA:ip} - - \[%{HTTPDATE:accessTime}\] \"%{DATA:method} %{DATA:access} %{DATA:httpversion}\" %{DATA:retcode} %{DATA:flow} %{DATA:retTime} \"%{DATA:fromHtml}\" \"%{DATA:useragent}\""
      }
      remove_field => "message"
      remove_field => "path"
    }
    date {
      # HTTPDATE looks like 10/Oct/2021:13:55:36 +0800, so parse it with
      # the matching pattern (not yyyy-MM-dd-HH:mm:ss).
      match => ["accessTime", "dd/MMM/yyyy:HH:mm:ss Z"]
    }
  }
  # Work around the 8-hour gap between UTC @timestamp and local time (UTC+8)
  # by materializing a shifted copy in "timestamp".
  ruby {
    code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
  }
  mutate {
    remove_field => ["@timestamp","ecs"]
  }
}
output {
  if "tomcat_access_log" in [type] {
    elasticsearch {
      hosts => ["http://127.0.0.1:9200"]
      index => "tomcat-logstash"
    }
  }
  if "metricbeat" in [type] {
    elasticsearch {
      hosts => ["http://127.0.0.1:9200"]
      index => "metricbeat-logstash"
    }
  }
}
如果要生成es带时间戳的索引,参考下面配置:(如果要索引后面加时间戳,一定要有@timestamp字段,所以注释掉了删除此字段的配置)
# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.
# Variant producing date-stamped indices: @timestamp must be KEPT, because
# the %{+YYYY.MM.dd} suffix in the index name is rendered from it — hence
# the mutate/remove_field block stays commented out.
input {
  file {
    path => "E:/Work/2021/target/logs/access_log.*.log"
    type => "tomcat_access_log"
    start_position => "beginning"
  }
}
# Metricbeat ships here (see metricbeat.yml "Logstash Output", port 5044).
input {
  beats {
    port => "5044"
    type => "metricbeat"
  }
}
filter {
  # Only tomcat access-log lines match this pattern; without the guard,
  # every metricbeat event would be tagged _grokparsefailure/_dateparsefailure.
  if [type] == "tomcat_access_log" {
    grok {
      # Grok pattern names are upper-case (%{DATA:...}); literal '[', ']'
      # and '"' in the log line must be escaped with a backslash.
      match => {
        "message" => "%{DATA:ip} - - \[%{HTTPDATE:accessTime}\] \"%{DATA:method} %{DATA:access} %{DATA:httpversion}\" %{DATA:retcode} %{DATA:flow} %{DATA:retTime} \"%{DATA:fromHtml}\" \"%{DATA:useragent}\""
      }
      remove_field => "message"
      remove_field => "path"
    }
    date {
      # HTTPDATE looks like 10/Oct/2021:13:55:36 +0800, so parse it with
      # the matching pattern (not yyyy-MM-dd-HH:mm:ss).
      match => ["accessTime", "dd/MMM/yyyy:HH:mm:ss Z"]
    }
  }
  # Shifted local-time copy of @timestamp (UTC+8) for querying/sorting.
  ruby {
    code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
  }
  # Kept commented: removing @timestamp would break the dated index names below.
  #mutate {
  #  remove_field => ["@timestamp","ecs"]
  #}
}
output {
  if "tomcat_access_log" in [type] {
    elasticsearch {
      hosts => ["http://127.0.0.1:9200"]
      index => "tomcat-logstash-%{+YYYY.MM.dd}"
    }
  }
  if "metricbeat" in [type] {
    elasticsearch {
      hosts => ["http://127.0.0.1:9200"]
      index => "metricbeat-logstash-%{+YYYY.MM.dd}"
    }
  }
}
②、metricbeat监控服务器,采集服务器数据
1、metricbeat,在上面三个都启动好了之后,编辑metricbeat.yml,搜索关键字Output,关闭直接输出到es的配置,如下图
2、搜索关键字Logstash Output,打开输出到logstash,使用端口5044,和上面logstash.conf中配置的5044端口对应上,如下图
3、在ELK\metricbeat-7.13.0-windows-x86_64下cmd输入 metricbeat.exe -e
③、启动elasticsearch-head,访问http://localhost:8080/elasticsearch-head/
1、将elasticsearch-head丢到tomcat的webapp下,直接启动tomcat即可
2、开启es动态创建索引,这样logstash就能根据配置动态创建索引了
PUT /_cluster/settings
{
"persistent" : {
"action": {
"auto_create_index": "true"
}
}
}
访问路径后页面如下:
1、下载elasticsearch-curator
官网地址:https://packages.elastic.co/curator/5/windows/elasticsearch-curator-5.8.4-amd64.msi
2、安装。安装目录自选(说明:此安装其实就相当于一个解压过程,安装后的文件夹随意拷贝到其他服务器就能直接用),安装成功后是个文件夹elasticsearch-curator,然后手动创建下面两个文件config.yml、action.yml
3、配置
官网config.yml配置如下,无特殊需求,直接粘贴即可使用
client:
hosts:
- 127.0.0.1
port: 9200
url_prefix:
use_ssl: False
certificate:
client_cert:
client_key:
ssl_no_validate: False
http_auth:
timeout: 30
master_only: False
logging:
loglevel: INFO
logfile:
logformat: default
blacklist: ['elasticsearch', 'urllib3']
配置action.yml,内容如下; 其中:
actions下的1、2代表多个事件动作,delete_indices代表删除索引事件,其他事件比如关闭索引、合并索引等事件参考官网 ==>action配置说明
description:此事件的描述
options:continue_if_exception遇到异常是否继续
filters:配置删除什么样子的索引,- filtertype:pattern的索引名称模型,kind: prefix索引的前缀,
value: tomcat-logstash-前缀值是什么, - filtertype: age过期时间设置,
timestring: '%Y.%m.%d'索引前缀后面的日期格式,unit: days过期时间的单位,unit_count: 1过期时间的值
actions:
1:
action: delete_indices
description: >-
Delete indices older than 1 day (based on index name), for metricbeat-logstash-
prefixed indices.
options:
continue_if_exception: False
ignore_empty_list: True
disable_action: False
filters:
- filtertype: pattern
kind: prefix
value: metricbeat-logstash-
- filtertype: age
source: name
direction: older
timestring: '%Y.%m.%d'
unit: days
unit_count: 1
2:
action: delete_indices
description: >-
Delete indices older than 7 days (based on index name), for tomcat-logstash-
prefixed indices.
options:
continue_if_exception: False
ignore_empty_list: True
disable_action: False
filters:
- filtertype: pattern
kind: prefix
value: tomcat-logstash-
- filtertype: age
source: name
direction: older
timestring: '%Y.%m.%d'
unit: days
unit_count: 7
4、启动命令:curator --config config.yml action.yml
运行结果:两个事件动作都完成!
5、结果验证:可以看到设置删除时间为删除一天前的,我的索引metricbeat-logstash-2021.09.27已经被删除了
十五、logstash采集数据延迟八小时
设置为七天前的,tomcat-logstash-2021.09.27、tomcat-logstash-2021.09.28两个都还在
场景:
通过metricbeat收集服务器系统日志,metricbeat中的日志发送到kafka中
Logstash中的时间为格林威治时间,因此通过logstash采集到的数据和我们的时间会有8小时的时差
如果在后续代码中处理很有可能会处理遗漏掉,造成数据的时间错误。
版本
logstash 7.6.0
解决方案如下
# TCP input receiving JSON-lines events (e.g. from an application log appender).
input {
tcp {
mode => "server"
host => "0.0.0.0"
port => 4560
codec => json_lines
}
}
# Shift @timestamp from UTC to local time (UTC+8) so the daily index suffix
# below is rendered with the local date. Filter order matters: copy the
# shifted time into a temp field, overwrite @timestamp, then drop the temp.
filter {
ruby {
# Step 1: temp field 'timestamp' = @timestamp + 8 hours.
code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*3600)"
}
ruby {
# Step 2: replace @timestamp with the shifted value.
code => "event.set('@timestamp',event.get('timestamp'))"
}
mutate {
# Step 3: remove the helper field so it doesn't pollute the documents.
remove_field => ["timestamp"]
}
}
output {
elasticsearch {
hosts => "localhost:9200"
# Daily index; %{+YYYY.MM.dd} is rendered from the (now shifted) @timestamp.
index => "springboot-logstash-%{+YYYY.MM.dd}"
}
}
logstash生成文件名中的日期是从@timestamp字段的值中获取,通过设置filter将timestamp中的时间转换成系统时间,问题解决