目前在K8s中比较流行的日志收集方案主要是Elasticsearch
、Fluentd
和 Kibana
(EFK)技术栈,也是官方文档中推荐的方案。
我司已经搭建好Log->Kafka->Elasticsearch->Kibana整套流程,我们只需要使用Fluentd收集K8s集群中日志,然后发送到Kafka topic中即可。
一、技术选型
Fluentd是一个使用Ruby编写的,开源的数据收集器,可以统一对数据收集和消费,以便更好地使用和理解数据。Fluentd相比Logstash,占用较少的内存,提供更高效的日志功能性能。Fluentd-bit更轻量级,占用资源更少,但是整体还是Fluentd拥有更多的插件,更成熟的生态,相对较少的资源占用,同时能满足我们对日志收集系统的要求。
二、工作原理
我们将Fluentd部署为DaemonSet,它会在K8s集群中每个节点上生成一个pod,监听节点上所有的容器日志文件。读取文件,进行数据过滤&数据&加工,然后将数据发送到Kafka。
三、输入输出配置
1、日志源配置
我们的使用场景是通过Fluentd监听docker容器的日志,配置如下:
1 2 3 4 5 6 7 8 9 10 11
| <source> @type tail path /var/log/containers/*.log exclude_path ["/var/log/containers/kube-*.log", "/var/log/containers/test-*.log", "/var/log/containers/calico-*.log", "/var/log/containers/heapster-*.log", "/var/log/containers/etcd-*.log", "/var/log/containers/kubernetes-*.log", "/var/log/containers/fluentd-*.log"] follow_inodes true pos_file /var/log/es-containers.log.pos tag raw.kubernetes.* format json time_key time time_format %Y-%m-%dT%H:%M:%S.%NZ </source>
|
Fluentd支持多种数据源,比如文件、UDP、TCP等等。
2、输出配置
我们将格式化好的日志输出到Kafka,配置如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| <match **> @id kafka @type kafka2 @log_level debug brokers brokers topic_key topic_key default_topic topic_key use_event_time true <buffer> @type file path /var/log/fluentd-buffers/kubernetes.system.buffer
flush_mode interval flush_thread_count 16 flush_interval 10s retry_max_interval 30
chunk_limit_size 256M queue_limit_length 16 overflow_action drop_oldest_chunk </buffer> <format> @type json </format> required_acks -1 compression_codec gzip </match>
|
Fluentd支持多种输出插件,比如文件、Elasticsearch、Mongo等等。缓冲区也有更多的配置,详见Buffer Section。
四、数据处理配置
以上便是最基本的数据输入和输出配置,我们还需要对数据进行一些处理,比如日志中添加pod的信息、将异常多行日志合并成一行等等。
1、异常检测器插件
1 2 3 4 5 6 7
| <match raw.kubernetes.**> @type detect_exceptions remove_tag_prefix raw message log languages all multiline_flush_interval 0.1 </match>
|
detect-exceptions可以自动检测Java, JavaScript, C#, Python, Go, Ruby, Php语言的多行异常堆栈信息,并且将多行日志合并成一行,其他配置详见Github。
2、多行日志链接插件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| <filter raw.**> @type concat key log stream_identity_key container_id multiline_start_regexp /^[\[ ]*\d{1,4}[-\/ ]+\d{1,2}[-\/ ]+\d{1,4}[ T]+\d{1,2}:\d{1,2}:\d{1,2}/ separator "" flush_interval 10 timeout_label @NORMAL </filter>
<match **> @type relabel @label @NORMAL </match>
|
concat-plugin可以自动连接多个事件中分离的多行日志。例如config输出等。原理是通过正则表达式去匹配多行开头,所以打印日志需要按照上面的正则表达式时间格式。
1 2 3
| <filter kubernetes.**> @type kubernetes_metadata </filter>
|
可以对每一行日志添加k8s metadata数据,添加的字段如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| { "docker": { "container_id": "" }, "kubernetes": { "container_name": "", "namespace_name": "", "pod_name": "", "container_image": "", "container_image_id": "", "pod_id": "", "pod_ip": "", "host": "", "labels": { "pod-template-hash": "", "k8s_kuboard_cn/layer": "", "k8s_kuboard_cn/name": "" }, "master_url": "", "namespace_id": "", "namespace_labels": { "alertmanager": "", "kubernetes_io/metadata_name": "" } } }
|
4、数据处理
1 2 3 4 5 6 7 8 9 10
| <filter kubernetes.**> @type record_transformer enable_ruby true <record> message ${record["log"]} source test </record> remove_keys $.log,$.docker,$.stream,$.kubernetes.container_image_id,$.kubernetes.container_image,$.kubernetes.pod_ip,$.kubernetes.namespace_id,$.kubernetes.master_url,$.kubernetes.labels,$.kubernetes.namespace_labels </filter>
|
我们可以对日志修改字段名,添加字段和删除字段。我们删除了kubernetes_metadata中一些敏感信息。其他操作详见record_transformer。
五、配置详情
多行日志链接插件中我们使用relabel给所有日志都打上了NORMAL标签,所以下方对数据进行处理的时候也需要加上这个参数。详细配置文件如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
| apiVersion: v1 kind: ConfigMap metadata: name: fluentd-config namespace: kube-system data: fluent.conf: | <source> @type tail path /var/log/containers/*.log # 挂载的服务器Docker容器日志地址 exclude_path ["/var/log/containers/kube-*.log", "/var/log/containers/test-*.log", "/var/log/containers/calico-*.log", "/var/log/containers/heapster-*.log", "/var/log/containers/etcd-*.log", "/var/log/containers/kubernetes-*.log", "/var/log/containers/fluentd-*.log"] # 排除的日志文件 follow_inodes true # 如果没有这个参数,文件轮转会导致日志重复 pos_file /var/log/es-containers.log.pos # 保存已读日志文件的位置 tag raw.kubernetes.* # 日志标签 format json # JSON解析器 time_key time # 指定事件时间的时间字段 time_format %Y-%m-%dT%H:%M:%S.%NZ # 时间格式 </source>
<match raw.kubernetes.**> @type detect_exceptions remove_tag_prefix raw # 去掉已经处理过的日志tag的前缀,防止死循环 message log # 扫描JOSN格式日志的字段名 languages all # 检测异常堆栈跟踪的语言列表 multiline_flush_interval 0.1 # 应转发(可能尚未完成)缓冲的异常堆栈的时间间隔 </match>
<filter raw.**> @type concat key log # 扫描JOSN格式日志的字段名 stream_identity_key container_id # 确定事件属于哪个流 multiline_start_regexp /^[\[ ]*\d{1,4}[-\/ ]+\d{1,2}[-\/ ]+\d{1,4}[ T]+\d{1,2}:\d{1,2}:\d{1,2}/ # 匹配多行开头的正则表达式 separator "" # 多行的分隔符 flush_interval 10 # 刷新最后收到的事件日志之前的秒数 timeout_label @NORMAL # 如果事件超时,打上NORMAL标签 </filter>
# 像处理普通日志一样处理超时日志行 <match **> @type relabel @label @NORMAL </match>
<label @NORMAL> <filter kubernetes.**> @type kubernetes_metadata </filter>
<filter kubernetes.**> @type record_transformer enable_ruby true # 使用${record[""]}启用Ruby语法 <record> message ${record["log"]} # 将log字段重命名为message # 这里也可以添加字段,比如添加source:test source test </record> # 删除多余的字段 remove_keys $.log,$.docker,$.stream,$.kubernetes.container_image_id,$.kubernetes.container_image,$.kubernetes.pod_ip,$.kubernetes.namespace_id,$.kubernetes.master_url,$.kubernetes.labels,$.kubernetes.namespace_labels </filter>
<match **> # 匹配所有数据都输出到kfaka @id kafka # 标识符,非必需 @type kafka2 # 输出插件的类型 @log_level debug # 捕获日志的级别,这里为了查看发送的性能,设置为debug # list of seed brokers brokers brokers # brokers地址 topic_key topic_key # topic_key default_topic topic_key # topic_key use_event_time true # 将Fluentd时间设置为Kafka的CreateTime # buffer settings <buffer> # Fluentd可以使用缓存来提高性能 @type file path /var/log/fluentd-buffers/kubernetes.system.buffer # 缓存文件路径
flush_mode interval # 刷新类型 flush_thread_count 16 # 刷新线程个数 flush_interval 10s # 刷新间隔 retry_max_interval 30 # 失败重试的最长间隔
chunk_limit_size 256M # 单个缓存文件的最大大小 queue_limit_length 16 # 最多缓存文件个数 overflow_action drop_oldest_chunk # 当缓存队列已满的拒绝策略 </buffer> # data type settings <format> @type json # JSON解析器 </format> required_acks -1 # 请求是否需要确认 compression_codec gzip # 生产者用于压缩消息的解编码器 </match> </label>
|
Fluentd拥有非常多的插件,可以提供更多丰富的功能,由于我们使用场景有限,所以只使用了其中很少一部分功能就能满足我们的需求了,更详细的Fluentd使用说明详见官方文档。
六、部署
以上的yaml是Fluentd的配置文件,我们将以DaemonSet的形式部署Fluentd。部署文件如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106
| --- apiVersion: v1 kind: ServiceAccount metadata: name: fluentd namespace: kube-system
--- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole metadata: name: fluentd namespace: kube-system rules: - apiGroups: - "" resources: - pods - namespaces verbs: - get - list - watch
--- kind: ClusterRoleBinding apiVersion: rbac.authorization.k8s.io/v1 metadata: name: fluentd namespace: default roleRef: kind: ClusterRole name: fluentd apiGroup: rbac.authorization.k8s.io subjects: - kind: ServiceAccount name: fluentd namespace: kube-system --- apiVersion: apps/v1 kind: DaemonSet metadata: name: fluentd namespace: kube-system labels: k8s-app: fluentd-logging version: v1 spec: selector: matchLabels: k8s-app: fluentd-logging version: v1 template: metadata: labels: k8s-app: fluentd-logging version: v1 spec: serviceAccount: fluentd serviceAccountName: fluentd containers: - name: fluentd image: fluent/fluentd-kubernetes-daemonset:v1.14.2-debian-kafka2-1.1 env: - name: FLUENT_UID value: "0" - name: K8S_NODE_NAME valueFrom: fieldRef: fieldPath: spec.nodeName resources: limits: cpu: 1000m memory: 800Mi requests: cpu: 100m memory: 100Mi volumeMounts: - name: varlog mountPath: /var/log - name: varlibdockercontainers mountPath: /var/lib/docker/containers readOnly: true - name: data1 mountPath: /disk1/containers readOnly: true - name: fluentd-config mountPath: /fluentd/etc terminationGracePeriodSeconds: 30 volumes: - name: varlog hostPath: path: /var/log - name: data1 hostPath: path: /disk1/containers - name: varlibdockercontainers hostPath: path: /var/lib/docker/containers - name: fluentd-config configMap: name: fluentd-config
|
我们将Fluentd部署在namespace: kube-system下,会新建一个Fluentd的DaemonSet,里面会对每一台物理机(除了Master)生成一个Pod,监听节点上所有的容器日志文件。
七、测试
目前,Fluentd已经在k8s集群中启动起来了,并会将日志处理并推送到Kafka中。我司已经完成了Kafka->Elasticsearch->Kibana的整个步骤,我们在Kibana中就可以看到日志信息了。如果是自己搭建EFK整个流程的,需要参考一下其他教程。
同时需要测试一下我们的插件使用情况,比如Exception是否合并成一条日志、多行配置信息的打印是否合并成一条日志、我们添加删除的字段是否生效。
八、总结
至此,我们的日志收集系统已经搭建完毕,经过测试后可以平稳运行,不会出现丢失数据的情况。
平台对日志收集系统性能有所要求,于是我们在设置Fulentd的最大资源是800M内存,1核CPU进行压测,1秒中能处理并推送大约1500条日志,性能较差,下文我们对会Fluentd进行性能调优。