Kubernetes 提供了对 CPU、内存的限制,可以防止应用无限制地使用系统资源。
Kubernetes 在 1.8 版本引入了一种新的资源(resource):local ephemeral storage(本地临时存储),用来管理节点上的本地临时存储,对应的特性门控(feature gate)为 LocalStorageCapacityIsolation。
该特性从 1.10 版本(Beta)开始默认开启,并在 1.25 版本正式 GA。
Pod的每个container都可以配置:
spec.containers[].resources.limits.ephemeral-storage
spec.containers[].resources.requests.ephemeral-storage
如果配置了临时存储限制,而容器写入的文件不断增长并超过了阈值,导致 Pod 被驱逐,那么通过 kubectl describe pod 可以看到类似下面的事件(event)信息:
Warning Evicted 1s kubelet Pod ephemeral local storage usage exceeds the total limit of containers 100Mi.
如果没有特殊需求,我们一般不会配置 ephemeral-storage 的 request 和 limit。但如果你的 Pod 使用的存储会持续增长,又不想影响节点上其他容器的运行,就可以设置它们:这样当容器的存储用量达到 limit 后,只有这一个 Pod 会被驱逐(若由 Deployment 等控制器管理则会被重建)。否则就要等到节点磁盘压力触发节点级驱逐,那时 kubelet 会按一定顺序驱逐节点上的多个 Pod,可能波及其他本来没有问题的 Pod。
驱逐 Pod 的动作由 kubelet 完成。每个节点上的 kubelet 都会启动一个 eviction manager,每隔一段固定的时间进行一次检查,ephemeral storage 的检查也是在这个阶段完成的。
// synchronize is the main control loop that enforces eviction thresholds.
// It returns the pods that were killed, or nil if no pod was killed.
// NOTE: this is an excerpt; the parts of the body unrelated to local
// ephemeral storage are elided with "// ...".
func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc ActivePodsFunc) []*v1.Pod {
// ...
// evict pods if there is a resource usage violation from local volume temporary storage
// If eviction happens in localStorageEviction function, skip the rest of eviction action
// This check only runs when local storage capacity isolation is enabled.
// localStorageEviction checks each pod's emptyDir volumes, the pod as a
// whole, and each container against their ephemeral-storage limits.
// activePods and statsFunc are defined in the elided code above
// (presumably from podFunc and makeSignalObservations — verify upstream).
if m.localStorageCapacityIsolation {
if evictedPods := m.localStorageEviction(activePods, statsFunc); len(evictedPods) > 0 {
return evictedPods
}
}
// ...
}
// localStorageEviction enforces local ephemeral-storage limits pod by pod.
//
// For every pod whose stats can be found through statsFunc, up to three checks
// run in this order:
//  1. emptyDir volume usage against each volume's sizeLimit;
//  2. total pod ephemeral-storage usage against the pod-level limit;
//  3. each container's usage (logs, plus rootfs when the image filesystem is
//     not dedicated) against that container's ephemeral-storage limit.
//
// The checks short-circuit. Once one of them reports that the pod was evicted,
// the remaining checks for that pod are skipped. Pods with no stats are
// ignored. The returned slice lists the evicted pods. It is empty (but not nil)
// when nothing was evicted.
func (m *managerImpl) localStorageEviction(pods []*v1.Pod, statsFunc statsFunc) []*v1.Pod {
	evicted := []*v1.Pod{}
	for _, pod := range pods {
		stats, found := statsFunc(pod)
		if !found {
			// Nothing to compare the limits against.
			continue
		}
		// || evaluates left to right and stops at the first true, so a later
		// check only runs when the earlier ones did not evict the pod.
		if m.emptyDirLimitEviction(stats, pod) ||
			m.podEphemeralStorageLimitEviction(stats, pod) ||
			m.containerEphemeralStorageLimitEviction(stats, pod) {
			evicted = append(evicted, pod)
		}
	}
	return evicted
}
入参 statsFunc(其类型同样名为 statsFunc)的相关代码(下面两行摘自不同位置):
// NOTE(review): these two lines are excerpts from different scopes; they are
// not consecutive statements, since redeclaring statsFunc with := in one scope
// would not compile.
// Presumably from synchronize: statsFunc here is the value later passed to localStorageEviction.
observations, statsFunc := makeSignalObservations(summary)
// Presumably from inside makeSignalObservations: the statsFunc it returns is built by cachedStatsFunc.
statsFunc := cachedStatsFunc(summary.Pods)
// cachedStatsFunc returns a statsFunc backed by a fixed snapshot of pod stats.
//
// The input slice is indexed once into a map keyed by PodRef.UID, so every
// lookup made through the returned function is a single map access. The
// returned function yields the stats recorded for the pod's UID and whether
// that UID was present in the snapshot. If several entries share a UID, the
// last one in podStats wins.
func cachedStatsFunc(podStats []statsapi.PodStats) statsFunc {
	byUID := make(map[string]statsapi.PodStats, len(podStats))
	for idx := range podStats {
		uid := podStats[idx].PodRef.UID
		byUID[uid] = podStats[idx]
	}
	lookup := func(pod *v1.Pod) (statsapi.PodStats, bool) {
		entry, ok := byUID[string(pod.UID)]
		return entry, ok
	}
	return lookup
}
localStorageEviction 函数中的三项检查:
// emptyDirLimitEviction evicts pod when one of its emptyDir volumes uses more
// space than that volume's sizeLimit.
//
// Actual usage comes from podStats.VolumeStats and is matched to
// pod.Spec.Volumes by volume name. A volume is never considered over its limit
// when it has no usage data or when its sizeLimit is nil or not positive. The
// first emptyDir found over its limit decides the result: true if evictPod
// evicted the pod, false otherwise (for example when evictPod refuses to evict
// a critical pod).
func (m *managerImpl) emptyDirLimitEviction(podStats statsapi.PodStats, pod *v1.Pod) bool {
	podVolumeUsed := make(map[string]*resource.Quantity, len(podStats.VolumeStats))
	for _, volume := range podStats.VolumeStats {
		// UsedBytes is a pointer and may be unset. Dereferencing it blindly
		// would panic the whole eviction loop. A volume without usage data
		// simply cannot be judged over its limit.
		if volume.UsedBytes == nil {
			continue
		}
		podVolumeUsed[volume.Name] = resource.NewQuantity(int64(*volume.UsedBytes), resource.BinarySI)
	}
	for i := range pod.Spec.Volumes {
		vol := &pod.Spec.Volumes[i]
		source := &vol.VolumeSource
		if source.EmptyDir == nil {
			// Only emptyDir volumes are checked here.
			continue
		}
		size := source.EmptyDir.SizeLimit
		used := podVolumeUsed[vol.Name]
		// A nil or non-positive sizeLimit is not enforced.
		if used == nil || size == nil || size.Sign() != 1 || used.Cmp(*size) <= 0 {
			continue
		}
		// The emptyDir usage exceeds the size limit, evict the pod.
		if !m.evictPod(pod, 0, fmt.Sprintf(emptyDirMessageFmt, vol.Name, size.String()), nil, nil) {
			return false
		}
		metrics.Evictions.WithLabelValues(signalEmptyDirFsLimit).Inc()
		return true
	}
	return false
}
// podEphemeralStorageLimitEviction evicts pod when its total local ephemeral
// storage usage exceeds the pod-level ephemeral-storage limit.
//
// The pod-level limit is the one returned by apiv1resource.PodRequestsAndLimits.
// Pods without an ephemeral-storage limit are skipped. Usage comes from
// podStats.EphemeralStorage.UsedBytes. The pod stats API reports this as the
// sum of container, emptyDir and host (etc-hosts, logs) usage. Missing stats
// count as zero usage. Returns true only if evictPod actually evicted the pod.
func (m *managerImpl) podEphemeralStorageLimitEviction(podStats statsapi.PodStats, pod *v1.Pod) bool {
	_, podLimits := apiv1resource.PodRequestsAndLimits(pod)
	// One lookup yields both presence and value.
	podEphemeralStorageLimit, found := podLimits[v1.ResourceEphemeralStorage]
	if !found {
		return false
	}

	podEphemeralStorageTotalUsage := &resource.Quantity{}
	if podStats.EphemeralStorage != nil && podStats.EphemeralStorage.UsedBytes != nil {
		podEphemeralStorageTotalUsage = resource.NewQuantity(int64(*podStats.EphemeralStorage.UsedBytes), resource.BinarySI)
	}
	if podEphemeralStorageTotalUsage.Cmp(podEphemeralStorageLimit) <= 0 {
		return false
	}

	// The total usage of the pod exceeds the total size limit of its containers.
	message := fmt.Sprintf(podEphemeralStorageMessageFmt, podEphemeralStorageLimit.String())
	if !m.evictPod(pod, 0, message, nil, nil) {
		return false
	}
	metrics.Evictions.WithLabelValues(signalEphemeralPodFsLimit).Inc()
	return true
}
// containerEphemeralStorageLimitEviction evicts pod when one of its containers
// uses more local ephemeral storage than that container's own limit.
//
// Only containers in pod.Spec.Containers with a non-zero ephemeral-storage
// limit are checked. A container's usage is its log usage, plus its rootfs
// usage when *m.dedicatedImageFs is false. Containers are examined in the
// order of podStats.Containers. The first one over its limit decides the
// result: true if evictPod evicted the pod, false otherwise.
func (m *managerImpl) containerEphemeralStorageLimitEviction(podStats statsapi.PodStats, pod *v1.Pod) bool {
	limitByName := make(map[string]*resource.Quantity)
	for i := range pod.Spec.Containers {
		c := &pod.Spec.Containers[i]
		if limit := c.Resources.Limits.StorageEphemeral(); limit != nil && limit.Value() != 0 {
			limitByName[c.Name] = limit
		}
	}

	for _, cs := range podStats.Containers {
		used := diskUsage(cs.Logs)
		if !*m.dedicatedImageFs {
			// Without a dedicated imagefs, the writable layer lives on the
			// same filesystem, so it counts toward the container's usage.
			used.Add(*diskUsage(cs.Rootfs))
		}
		limit, hasLimit := limitByName[cs.Name]
		if !hasLimit || limit.Cmp(*used) >= 0 {
			continue
		}
		msg := fmt.Sprintf(containerEphemeralStorageMessageFmt, cs.Name, limit.String())
		if !m.evictPod(pod, 0, msg, nil, nil) {
			return false
		}
		metrics.Evictions.WithLabelValues(signalEphemeralContainerFsLimit).Inc()
		return true
	}
	return false
}
上面3个Limit检查的方法都调用了m.evictPod方法:
// evictPod tries to evict pod and reports whether the eviction was carried out.
//
// Critical pods are never evicted; kubelettypes.IsCriticalPod decides this.
// Per the original notes, "critical" covers static pods, mirror pods and pods
// whose priority marks them critical. A static pod has the annotation
// kubernetes.io/config.source=file (regular pods have "api"). A mirror pod
// carries the kubernetes.io/config.mirror annotation.
//
// For any other pod it:
//  1. records a Warning event (reason Reason, message evictMsg) carrying
//     the given annotations;
//  2. logs the eviction attempt at verbosity 3;
//  3. calls m.killPodFunc with gracePeriodOverride and a status callback that
//     marks the pod Failed with reason Reason and message evictMsg, and applies
//     condition when it is non-nil.
//
// It returns true once killPodFunc has been called, even if killPodFunc
// returned an error; that error is only logged.
func (m *managerImpl) evictPod(pod *v1.Pod, gracePeriodOverride int64, evictMsg string, annotations map[string]string, condition *v1.PodCondition) bool {
	// If the pod is marked as critical and static, and support for critical pod annotations is enabled,
	// do not evict such pods. Static pods are not re-admitted after evictions.
	// https://github.com/kubernetes/kubernetes/issues/40573 has more details.
	if kubelettypes.IsCriticalPod(pod) {
		klog.ErrorS(nil, "Eviction manager: cannot evict a critical pod", "pod", klog.KObj(pod))
		return false
	}

	// Record that we are evicting the pod, both as an event and in the log.
	m.recorder.AnnotatedEventf(pod, annotations, v1.EventTypeWarning, Reason, evictMsg)
	klog.V(3).InfoS("Evicting pod", "pod", klog.KObj(pod), "podUID", pod.UID, "message", evictMsg)

	markEvicted := func(status *v1.PodStatus) {
		status.Phase = v1.PodFailed
		status.Reason = Reason
		status.Message = evictMsg
		if condition != nil {
			podutil.UpdatePodCondition(status, condition)
		}
	}

	// This is a blocking call and should only return when the pod and its
	// containers are killed.
	if err := m.killPodFunc(pod, true, &gracePeriodOverride, markEvicted); err != nil {
		klog.ErrorS(err, "Eviction manager: pod failed to evict", "pod", klog.KObj(pod))
		return true
	}
	klog.InfoS("Eviction manager: pod is evicted successfully", "pod", klog.KObj(pod))
	return true
}
// killPodNow returns a KillPodFunc that can be used to kill a pod.
// It is intended to be injected into other modules that need to kill a pod.
//
// The returned function sends a SyncPodKill update to podWorkers and then
// blocks until CompletedCh is signalled or a timeout elapses. The grace period
// is *gracePeriodOverride when given, else the pod's
// spec.terminationGracePeriodSeconds, else 0. The timeout is 1.5x the grace
// period (integer seconds) but never less than 10 seconds. On timeout it
// records an ExceededGracePeriod warning event and returns an error.
func killPodNow(podWorkers PodWorkers, recorder record.EventRecorder) eviction.KillPodFunc {
	return func(pod *v1.Pod, isEvicted bool, gracePeriodOverride *int64, statusFn func(*v1.PodStatus)) error {
		// Pick the grace period: explicit override first, then the pod spec.
		var gracePeriod int64
		switch {
		case gracePeriodOverride != nil:
			gracePeriod = *gracePeriodOverride
		case pod.Spec.TerminationGracePeriodSeconds != nil:
			gracePeriod = *pod.Spec.TerminationGracePeriodSeconds
		}

		// Wait 1.5x the grace period, but at least 10s to leave room for the
		// kubelet->runtime traffic to complete in sigkill.
		const minTimeoutSeconds int64 = 10
		timeoutSeconds := gracePeriod + gracePeriod/2
		if timeoutSeconds < minTimeoutSeconds {
			timeoutSeconds = minTimeoutSeconds
		}
		timeout := time.Duration(timeoutSeconds) * time.Second

		completed := make(chan struct{}, 1)
		podWorkers.UpdatePod(UpdatePodOptions{
			Pod:        pod,
			UpdateType: kubetypes.SyncPodKill,
			KillPodOptions: &KillPodOptions{
				CompletedCh:                              completed,
				Evict:                                    isEvicted,
				PodStatusFunc:                            statusFn,
				PodTerminationGracePeriodSecondsOverride: gracePeriodOverride,
			},
		})

		select {
		case <-completed:
			return nil
		case <-time.After(timeout):
			recorder.Eventf(pod, v1.EventTypeWarning, events.ExceededGracePeriod, "Container runtime did not kill the pod within specified grace period.")
			return fmt.Errorf("timeout waiting to kill pod")
		}
	}
}
可见 killPodNow 返回的 KillPodFunc 实际上是调用 podWorkers.UpdatePod(真正更新 Pod 的方法),并传入 SyncPodKill 等相关参数来杀掉 Pod,也就是完成驱逐(evicting the pod)。podWorkers.UpdatePod 以及 podWorkers 的其他内容较多,另开一篇分析。
转载请注明来源,欢迎指出任何有错误或不够清晰的表达。可以邮件至 backendcloud@gmail.com