
Start elasticsearch and kibana (kibana is optional, but having it up lets you inspect the data in ES, which makes debugging much easier).
First, write the vector config in TOML. Recent versions of vector support the Vector Remap Language (VRL), which lets you add and remove fields and ships with built-in functions, variables, and so on; it is quite handy.
[sources.airflow_log]
type = "file"
ignore_older_secs = 86400
include = [ "/home/greetlist/airflow/logs/**/*.log" ]
read_from = "beginning"
data_dir = "."
[transforms.transform_get_unique_id]
type = "remap"
inputs = [ "airflow_log" ]
source = """
. |= parse_regex!(.file, r'/home/greetlist/airflow/logs/(?P<dag_id>.*)/(?P<task_id>.*)/(?P<run_id>.*)/(?P<try_number>.*)\.log$')
"""
[transforms.transform_remove_file_field]
type = "remap"
inputs = [ "transform_get_unique_id" ]
source = """
del(.file)
del(.host)
"""
[transforms.transform_add_log_id_field]
type = "remap"
inputs = [ "transform_remove_file_field" ]
source = """
.log_id = join!([.dag_id, .task_id, .run_id, .try_number], "-")
.offset = 1
"""
[sinks.airflow_log_sink]
type = "console"
inputs = [ "transform_add_log_id_field" ]
target = "stdout"
encoding.codec = "json"
[sinks.to_elasticsearch]
type = "elasticsearch"
inputs = [ "transform_add_log_id_field" ]
endpoint = "http://127.0.0.1:9200"
mode = "data_stream"
# note: the `index` option only applies in bulk mode; in data_stream mode
# events land in the logs-generic-default data stream by default
#pipeline = "pipeline-name"
compression = "none"
A few points to note about the config above: Airflow's log_id_template is {dag_id}-{task_id}-{run_id}-{try_number}, so we need to add a matching log_id field to every JSON event, which is what transform_add_log_id_field does.
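To see what the two remap transforms produce, here is a small Python sketch that mimics the VRL parse_regex! + join! steps on a sample log path. The directory order dag_id/task_id/run_id/try_number and the sample path are assumptions chosen to match the log_id_template above, not something Vector or Airflow guarantees:

```python
import re

# Mimics the VRL pipeline: parse_regex! on .file, then join! into .log_id.
# The dag_id/task_id/run_id/try_number directory order is an assumption
# matching the log_id_template {dag_id}-{task_id}-{run_id}-{try_number}.
LOG_PATH_RE = re.compile(
    r"/home/greetlist/airflow/logs/"
    r"(?P<dag_id>.*)/(?P<task_id>.*)/(?P<run_id>.*)/(?P<try_number>.*)\.log$"
)

def build_log_id(file_path: str) -> str:
    """Extract the four fields from the path and join them like VRL's join!."""
    m = LOG_PATH_RE.match(file_path)
    if m is None:
        raise ValueError(f"unexpected log path: {file_path}")
    g = m.groupdict()
    return "-".join([g["dag_id"], g["task_id"], g["run_id"], g["try_number"]])

print(build_log_id(
    "/home/greetlist/airflow/logs/my_dag/my_task/manual__2023-01-01/1.log"
))
# → my_dag-my_task-manual__2023-01-01-1
```

This is exactly the value Airflow will later use to look the task's log lines up in ES.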
Then start vector with this config:
vector -c vector.toml
One nasty detail: vector treats \n or \r\n as the line terminator when reading a file, but the Python code on the Airflow side does not write a trailing newline when a task log finishes. vector then stays blocked waiting for the line to complete and never ships the end_of_log marker to ES, so the log page in the Airflow web UI keeps showing a loading spinner that never goes away.
Next, turn on remote logging in airflow.cfg:
[logging]
remote_logging = True
[elasticsearch]
# Elasticsearch host
host = http://localhost:9200
# Format of the log_id, which is used to query for a given tasks logs
log_id_template = {dag_id}-{task_id}-{run_id}-{try_number}
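Since the spinner problem above comes from the missing trailing newline, one workaround is to make sure a finished task log always ends with \n before vector tails it. ensure_trailing_newline below is a hypothetical helper, not part of Airflow's API; in practice you would hook the equivalent logic into the file task handler when a task finishes:

```python
import os

def ensure_trailing_newline(path: str) -> bool:
    """Append a newline if the file's last byte is not '\\n'.

    Returns True if the file was patched. vector only emits a line once it
    sees the terminator, so this lets the end_of_log marker reach ES.
    """
    with open(path, "rb+") as f:
        f.seek(0, os.SEEK_END)
        if f.tell() == 0:
            return False  # empty file, nothing to terminate
        f.seek(-1, os.SEEK_END)
        if f.read(1) == b"\n":
            return False  # already properly terminated
        f.write(b"\n")
        return True
```

Running it twice on the same file is safe: the second call sees the newline and does nothing.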
Note: once the code change is in place, restart the scheduler and the webserver.
Done.