在K8s上使用Fluentd搭建日志收集系统

目前在K8s中比较流行的日志收集方案主要是ElasticsearchFluentdKibana(EFK)技术栈,也是官方文档中推荐的方案。

我司已经搭建好Log->Kafka->Elasticsearch->Kibana整套流程,我们只需要使用Fluentd收集K8s集群中日志,然后发送到Kafka topic中即可。

一、技术选型

Fluentd是一个使用Ruby编写的,开源的数据收集器,可以统一对数据收集和消费,以便更好地使用和理解数据。Fluentd相比Logstash,占用较少的内存,提供更高效的日志功能性能。Fluentd-bit更轻量级,占用资源更少,但是整体还是Fluentd拥有更多的插件,更成熟的生态,相对较少的资源占用,同时能满足我们对日志收集系统的要求。

二、工作原理

我们将Fluentd部署为DaemonSet,它会在K8s集群中每个节点上生成一个pod,监听节点上所有的容器日志文件。读取文件,进行数据过滤&数据&加工,然后将数据发送到Kafka。

三、输入输出配置

1、日志源配置

我们的使用场景是通过Fluentd监听docker容器的日志,配置如下:

1
2
3
4
5
6
7
8
9
10
11
<source>
@type tail
path /var/log/containers/*.log # 挂载的服务器Docker容器日志地址
exclude_path ["/var/log/containers/kube-*.log", "/var/log/containers/test-*.log", "/var/log/containers/calico-*.log", "/var/log/containers/heapster-*.log", "/var/log/containers/etcd-*.log", "/var/log/containers/kubernetes-*.log", "/var/log/containers/fluentd-*.log"] # 排除的日志文件
follow_inodes true # 如果没有这个参数,文件轮转会导致日志重复
pos_file /var/log/es-containers.log.pos # 保存已读日志文件的位置
tag raw.kubernetes.* # 日志标签
format json # JSON解析器
time_key time # 指定事件时间的时间字段
time_format %Y-%m-%dT%H:%M:%S.%NZ # 时间格式
</source>

Fluentd支持多种数据源,比如文件、UDP、TCP等等。

2、输出配置

我们将格式化好的日志输出到Kafka,配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
<match **>             											  # 匹配所有数据都输出到kfaka
@id kafka # 标识符,非必需
@type kafka2 # 输出插件的类型
@log_level debug # 捕获日志的级别,这里为了查看发送的性能,设置为debug
# list of seed brokers
brokers brokers # brokers地址
topic_key topic_key # topic_key
default_topic topic_key # topic_key
use_event_time true # 将Fluentd时间设置为Kafka的CreateTime
# buffer settings
<buffer> # Fluentd可以使用缓存来提高性能
@type file
path /var/log/fluentd-buffers/kubernetes.system.buffer # 缓存文件路径

flush_mode interval # 刷新类型
flush_thread_count 16 # 刷新线程个数
flush_interval 10s # 刷新间隔
retry_max_interval 30 # 失败重试的最长间隔

chunk_limit_size 256M # 单个缓存文件的最大大小
queue_limit_length 16 # 最多缓存文件个数
overflow_action drop_oldest_chunk # 当缓存队列已满的拒绝策略
</buffer>
# data type settings
<format>
@type json # JSON解析器
</format>
required_acks -1 # 请求是否需要确认
compression_codec gzip # 生产者用于压缩消息的解编码器
</match>

Fluentd支持多种输出插件,比如文件、Elasticsearch、Mongo等等。缓冲区也有更多的配置,详见Buffer Section

四、数据处理配置

以上便是最基本的数据输入和输出配置,我们还需要对数据进行一些处理,比如日志中添加pod的信息、将异常多行日志合并成一行等等。

1、异常检测器插件

1
2
3
4
5
6
7
<match raw.kubernetes.**> 
@type detect_exceptions
remove_tag_prefix raw # 去掉已经处理过的日志tag的前缀,防止死循环
message log # 扫描JOSN格式日志的字段名
languages all # 检测异常堆栈跟踪的语言列表
multiline_flush_interval 0.1 # 应转发(可能尚未完成)缓冲的异常堆栈的时间间隔
</match>

detect-exceptions可以自动检测Java, JavaScript, C#, Python, Go, Ruby, Php语言的多行异常堆栈信息,并且将多行日志合并成一行,其他配置详见Github

2、多行日志链接插件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<filter raw.**>
@type concat
key log # 扫描JOSN格式日志的字段名
stream_identity_key container_id # 确定事件属于哪个流
multiline_start_regexp /^[\[ ]*\d{1,4}[-\/ ]+\d{1,2}[-\/ ]+\d{1,4}[ T]+\d{1,2}:\d{1,2}:\d{1,2}/ # 匹配多行开头的正则表达式
separator "" # 多行的分隔符
flush_interval 10 # 刷新最后收到的事件日志之前的秒数
timeout_label @NORMAL # 如果事件超时,打上NORMAL标签
</filter>

# 像处理普通日志一样处理超时日志行
<match **>
@type relabel
@label @NORMAL
</match>

concat-plugin可以自动连接多个事件中分离的多行日志。例如config输出等。原理是通过正则表达式去匹配多行开头,所以打印日志需要按照上面的正则表达式时间格式。

3、增加K8s metadata

1
2
3
<filter kubernetes.**>
@type kubernetes_metadata
</filter>

可以对每一行日志添加k8s metadata数据,添加的字段如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
{
"docker": {
"container_id": ""
},
"kubernetes": {
"container_name": "",
"namespace_name": "",
"pod_name": "",
"container_image": "",
"container_image_id": "",
"pod_id": "",
"pod_ip": "",
"host": "",
"labels": {
"pod-template-hash": "",
"k8s_kuboard_cn/layer": "",
"k8s_kuboard_cn/name": ""
},
"master_url": "",
"namespace_id": "",
"namespace_labels": {
"alertmanager": "",
"kubernetes_io/metadata_name": ""
}
}
}

4、数据处理

1
2
3
4
5
6
7
8
9
10
<filter kubernetes.**>
@type record_transformer
enable_ruby true # 使用${record[""]}启用Ruby语法
<record>
message ${record["log"]} # 将log字段重命名为message
# 这里也可以添加字段,比如添加source:test
source test
</record>
remove_keys $.log,$.docker,$.stream,$.kubernetes.container_image_id,$.kubernetes.container_image,$.kubernetes.pod_ip,$.kubernetes.namespace_id,$.kubernetes.master_url,$.kubernetes.labels,$.kubernetes.namespace_labels # 删除多余的字段
</filter>

我们可以对日志修改字段名,添加字段和删除字段。我们删除了kubernetes_metadata中一些敏感信息。其他操作详见record_transformer

五、配置详情

多行日志链接插件中我们使用relabel给所有日志都打上了NORMAL标签,所以下方对数据进行处理的时候也需要加上这个参数。详细配置文件如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
apiVersion: v1
kind: ConfigMap
metadata:
name: fluentd-config
namespace: kube-system
data:

fluent.conf: |
<source>
@type tail
path /var/log/containers/*.log # 挂载的服务器Docker容器日志地址
exclude_path ["/var/log/containers/kube-*.log", "/var/log/containers/test-*.log", "/var/log/containers/calico-*.log", "/var/log/containers/heapster-*.log", "/var/log/containers/etcd-*.log", "/var/log/containers/kubernetes-*.log", "/var/log/containers/fluentd-*.log"] # 排除的日志文件
follow_inodes true # 如果没有这个参数,文件轮转会导致日志重复
pos_file /var/log/es-containers.log.pos # 保存已读日志文件的位置
tag raw.kubernetes.* # 日志标签
format json # JSON解析器
time_key time # 指定事件时间的时间字段
time_format %Y-%m-%dT%H:%M:%S.%NZ # 时间格式
</source>

<match raw.kubernetes.**>
@type detect_exceptions
remove_tag_prefix raw # 去掉已经处理过的日志tag的前缀,防止死循环
message log # 扫描JOSN格式日志的字段名
languages all # 检测异常堆栈跟踪的语言列表
multiline_flush_interval 0.1 # 应转发(可能尚未完成)缓冲的异常堆栈的时间间隔
</match>

<filter raw.**>
@type concat
key log # 扫描JOSN格式日志的字段名
stream_identity_key container_id # 确定事件属于哪个流
multiline_start_regexp /^[\[ ]*\d{1,4}[-\/ ]+\d{1,2}[-\/ ]+\d{1,4}[ T]+\d{1,2}:\d{1,2}:\d{1,2}/ # 匹配多行开头的正则表达式
separator "" # 多行的分隔符
flush_interval 10 # 刷新最后收到的事件日志之前的秒数
timeout_label @NORMAL # 如果事件超时,打上NORMAL标签
</filter>

# 像处理普通日志一样处理超时日志行
<match **>
@type relabel
@label @NORMAL
</match>

<label @NORMAL>
<filter kubernetes.**>
@type kubernetes_metadata
</filter>

<filter kubernetes.**>
@type record_transformer
enable_ruby true # 使用${record[""]}启用Ruby语法
<record>
message ${record["log"]} # 将log字段重命名为message
# 这里也可以添加字段,比如添加source:test
source test
</record>
# 删除多余的字段
remove_keys $.log,$.docker,$.stream,$.kubernetes.container_image_id,$.kubernetes.container_image,$.kubernetes.pod_ip,$.kubernetes.namespace_id,$.kubernetes.master_url,$.kubernetes.labels,$.kubernetes.namespace_labels
</filter>

<match **> # 匹配所有数据都输出到kfaka
@id kafka # 标识符,非必需
@type kafka2 # 输出插件的类型
@log_level debug # 捕获日志的级别,这里为了查看发送的性能,设置为debug
# list of seed brokers
brokers brokers # brokers地址
topic_key topic_key # topic_key
default_topic topic_key # topic_key
use_event_time true # 将Fluentd时间设置为Kafka的CreateTime
# buffer settings
<buffer> # Fluentd可以使用缓存来提高性能
@type file
path /var/log/fluentd-buffers/kubernetes.system.buffer # 缓存文件路径

flush_mode interval # 刷新类型
flush_thread_count 16 # 刷新线程个数
flush_interval 10s # 刷新间隔
retry_max_interval 30 # 失败重试的最长间隔

chunk_limit_size 256M # 单个缓存文件的最大大小
queue_limit_length 16 # 最多缓存文件个数
overflow_action drop_oldest_chunk # 当缓存队列已满的拒绝策略
</buffer>
# data type settings
<format>
@type json # JSON解析器
</format>
required_acks -1 # 请求是否需要确认
compression_codec gzip # 生产者用于压缩消息的解编码器
</match>
</label>

Fluentd拥有非常多的插件,可以提供更多丰富的功能,由于我们使用场景有限,所以只使用了其中很少一部分功能就能满足我们的需求了,更详细的Fluentd使用说明详见官方文档

六、部署

以上的yaml是Fluentd的配置文件,我们将以DaemonSet的形式部署Fluentd。部署文件如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: fluentd
namespace: kube-system

---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: fluentd
namespace: kube-system
rules:
- apiGroups:
- ""
resources:
- pods
- namespaces
verbs:
- get
- list
- watch

---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: fluentd
namespace: default
roleRef:
kind: ClusterRole
name: fluentd
apiGroup: rbac.authorization.k8s.io
subjects:
- kind: ServiceAccount
name: fluentd
namespace: kube-system
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: fluentd
namespace: kube-system
labels:
k8s-app: fluentd-logging
version: v1
spec:
selector:
matchLabels:
k8s-app: fluentd-logging
version: v1
template:
metadata:
labels:
k8s-app: fluentd-logging
version: v1
spec:
serviceAccount: fluentd
serviceAccountName: fluentd
# 去掉下方注释就在master上启动一个pod
# tolerations:
# - key: node-role.kubernetes.io/master
# effect: NoSchedule
containers:
- name: fluentd
image: fluent/fluentd-kubernetes-daemonset:v1.14.2-debian-kafka2-1.1
env:
- name: FLUENT_UID
value: "0"
- name: K8S_NODE_NAME
valueFrom:
fieldRef:
fieldPath: spec.nodeName
resources:
limits:
cpu: 1000m
memory: 800Mi
requests:
cpu: 100m
memory: 100Mi
volumeMounts:
- name: varlog
mountPath: /var/log
- name: varlibdockercontainers
mountPath: /var/lib/docker/containers
readOnly: true
- name: data1
mountPath: /disk1/containers
readOnly: true
- name: fluentd-config
mountPath: /fluentd/etc
terminationGracePeriodSeconds: 30
volumes:
- name: varlog
hostPath:
path: /var/log
- name: data1
hostPath:
path: /disk1/containers
- name: varlibdockercontainers
hostPath:
path: /var/lib/docker/containers
- name: fluentd-config
configMap:
name: fluentd-config

我们将Fluentd部署在namespace: kube-system下,会新建一个Fluentd的DaemonSet,里面会对每一台物理机(除了Master)生成一个Pod,监听节点上所有的容器日志文件。

七、测试

目前,Fluentd已经在k8s集群中启动起来了,并会将日志处理并推送到Kafka中。我司已经完成了Kafka->Elasticsearch->Kibana的整个步骤,我们在Kibana中就可以看到日志信息了。如果是自己搭建EFK整个流程的,需要参考一下其他教程。

同时需要测试一下我们的插件使用情况,比如Exception是否合并成一条日志、多行配置信息的打印是否合并成一条日志、我们添加删除的字段是否生效。

八、总结

至此,我们的日志收集系统已经搭建完毕,经过测试后可以平稳运行,不会出现丢失数据的情况。

平台对日志收集系统性能有所要求,于是我们在设置Fulentd的最大资源是800M内存,1核CPU进行压测,1秒中能处理并推送大约1500条日志,性能较差,下文我们对会Fluentd进行性能调优。


评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×