我是靠谱客的博主 精明小蝴蝶,最近开发中收集的这篇文章主要介绍Linux 时间系统分析,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

 

转载自:   https://www.binss.me/blog/linux-time-system-analysis/

 

新年新气象,掏出写了很长时间的文章和大家分享。

Linux kernel 发展至今,已经形成了一套强大的、复杂的时间系统,用来支撑上层建筑,其主要提供以下两项功能:

  • 提供当前的时间和日期
  • 维护定时器,在某个时间点到达时调用回调函数

没有良好的时钟系统,时序的判断就无从谈起,更别说进程的调度、切换,还有更上层的组件。在本文中,主要将通过 Linux kernel 的在初始化函数 start_kernel 中的函数调用顺序,来分析 Linux 中包含的各个时间系统模块。

我们先来看下 Linux 中的时间系统的大致层级结构:

 
 低精度定时器(timer)
 相互替代
 框架层 tick_device <-----> 高精度定时器(hrtimer) timekeeper
  
 抽象层 时钟事件设备(clock_event_device) 时钟源(clocksource)
  
 硬件层 硬件定时器(如 APIC timer) 时钟源(如 RTC TSC)

其中上层依赖于位于其下方的下层,上层设备基于其下层设备实现。我们从抽象层开始介绍。

clocksource

clocksource 提供了对不同软硬件时钟的抽象。可以理解为时间源,为 kernel 提供当前时间。

 
 struct clocksource {
 cycle_t (*read)(struct clocksource *cs); // 指向读取时钟的函数
 cycle_t mask; // 能够表示的 cycle 上限,通常是 32/64 位的全 f,做与操作可以避免对 overflow 进行专门处理
 u32 mult; // 将时间源的计数单位 (cycle_t) 转换为 ns
 u32 shift; // 换算公式为 (cycles * mult) >> shift
 u64 max_idle_ns; // 允许的最大空闲时间,单位 ns。当设置 CONFIG_NO_HZ 时,使用动态 tick,不限制 kernel 的睡眠时间,需要进行限制
 u32 maxadj; // 允许的最大调整值,避免转换时 overflow
 #ifdef CONFIG_ARCH_CLOCKSOURCE_DATA
 struct arch_clocksource_data archdata; // 架构专有(目前只有 x86 和 ia64)。
 #endif
 u64 max_cycles; // 设置了 cycle 上限,避免换算时溢出
 const char *name; // 时间源名称
 struct list_head list; // 注册了该时间源?
 int rating; // 优先级
 int (*enable)(struct clocksource *cs); // 启用时间源函数
 void (*disable)(struct clocksource *cs); // 停用时间源函数
 unsigned long flags;
 void (*suspend)(struct clocksource *cs); // 暂停时间源函数
 void (*resume)(struct clocksource *cs); // 恢复时间源函数
  
 /* private: */
 #ifdef CONFIG_CLOCKSOURCE_WATCHDOG // 用于监控时间源,校验时间是否准确
 /* Watchdog related data, used by the framework */
 struct list_head wd_list;
 cycle_t cs_last;
 cycle_t wd_last;
 #endif
 struct module *owner; // 指向拥有该时间源的内核模块
 };

其中 rating 表示了时间源的准确度,它将作为 Linux 选择时钟源时的优先级:

  • 1-99 - Only available for bootup and testing purposes;
  • 100-199 - Functional for real use, but not desired.
  • 200-299 - A correct and usable clocksource.
  • 300-399 - A reasonably fast and accurate clocksource.
  • 400-499 - The ideal clocksource. A must-use where available;

只有 rating 最高的时间源会被选用。

观察

查看支持的时间源

/sys/devices/system/clocksource/clocksource0/available_clocksource

查看当前时间源

/sys/devices/system/clocksource/clocksource0/current_clocksource

日志

 
 $ dmesg | grep clocksource
 [0.000000] clocksource: refined-jiffies: mask: 0xffffffff max_cycles: 0xffffffff, max_idle_ns: 7645519600211568 ns
 [0.000000] clocksource: hpet: mask: 0xffffffff max_cycles: 0xffffffff, max_idle_ns: 133484882848 ns
 [4.106948] clocksource: jiffies: mask: 0xffffffff max_cycles: 0xffffffff, max_idle_ns: 7645041785100000 ns
 [4.410608] clocksource: Switched to clocksource hpet
 [4.426144] clocksource: acpi_pm: mask: 0xffffff max_cycles: 0xffffff, max_idle_ns: 2085701024 ns
 [6.018708] tsc: Refined TSC clocksource calibration: 2197.447 MHz
 [6.018746] clocksource: tsc: mask: 0xffffffffffffffff max_cycles: 0x1facc727edd, max_idle_ns: 440795245373 ns
 [7.204127] clocksource: Switched to clocksource tsc

常见 clocksource

jiffies

英语中 jiffy 表示 a moment,即一瞬间。在 Linux 中作为软件维护的时钟。表示一小段短暂而不确定的时间。

属于低精度时间源,因为没有 CLOCK_SOURCE_VALID_FOR_HRES 的 flag,所以不会出现在 available_clocksource 中。

 
 static struct clocksource clocksource_jiffies = {
 .name = "jiffies",
 .rating = 1, /* lowest valid rating*/ // 优先级最低
 .read = jiffies_read, // 读时返回 jiffies
 .mask = CLOCKSOURCE_MASK(32),
 .mult = NSEC_PER_JIFFY << JIFFIES_SHIFT, /* details above */
 .shift = JIFFIES_SHIFT, // NSEC_PER_JIFFY 和 JIFFIES_SHIFT 由 CONFIG_HZ 决定
 .max_cycles = 10,
 };

Linux 用全局变量 jiffies / jiffies_64 来存放系统启动后经过的 jiffy 数目:

 
 extern u64 __jiffy_data jiffies_64; // x86_64 下使用,非原子,读时需要加锁,如 get_jiffies_64 用了 seqlock
 extern unsigned long volatile __jiffy_data jiffies; // x86_32 下使用

根据 arch/x86/kernel/vmlinux.lds.S,在 32 位下,jiffies 指向 jiffies_64 的低 32 位。

每当收到 clock_event_device (后详)发出的中断时,会调用其 handler ,即 tick_handle_periodic ,于是有

tick_handle_periodic => tick_periodic => do_timer => jiffies_64 += ticks

由于 do_timer 的参数为 1,因此 jiffies_64 += 1。而根据 tick_setup_device 中 tick_period = ktime_set(0, NSEC_PER_SEC / HZ) ,表示 tick_device 每个 tick 间隔为 1 / HZ 秒。于是每个 jiffies 代表的时间为 1/ HZ 秒,系统启动至今所经过的秒数 = jiffies_64 / HZ。HZ 由 CONFIG_HZ 决定,而 CONFIG_HZ 由 CONFIG_HZ_* 决定,有 100/250/300/1000 可选,一般为 1000。所以每隔 1 毫秒,jiffies "时钟" 就会加一,1 jiffy 等于 1 毫秒。

从 jiffies 我们可以发现,时钟源的不仅能够通过读取硬件时钟源来实现,还能通过 tick_device,更本质上来说是定时器来实现。

回绕(wrap around)

当 jiffies 值超过它的最大范围后就会发生溢出。在 32 位下为 unsigned long,最大取值为 (2^32)-1 即 429496795 。以 HZ 为 1000 为例,从 0 开始,最多经过 5 天就会达到上限。因此采取回绕措施,达到上限时继续增加就会回绕到 0。

time_after、time_before、time_after_eq、time_before_eq 等一些宏会对这种情况进行处理。

RTC(Real Time Clock)

主板上的晶振及相关电路组成的时钟,往往被集成到南桥芯片内,依靠主板上的电池供电。在 IRQ8 上周期性的产生中断,频率范围是 2Hz ~ 8192Hz。

通过 CMOS 进行读写,开机时读取,需要时写入。

精度太低,在 x86 下已被弃用(找不到定义)。

PIT(Programmalbe Interval Timer)

通过 8253/8254 时钟芯片实现,经过适当编程后,能够周期性地发出时钟中断。频率为 1193182 Hz (PIT_TICK_RATE)。

定义为:

 
 static struct clocksource i8253_cs = {
 .name = "pit",
 .rating = 110,
 .read = i8253_read,
 .mask = CLOCKSOURCE_MASK(32),
 };

由于精度问题,在有 HPET 的情况下不会使用 PIT。

refined_jiffies

在 register_refined_jiffies 中定义。基于 PIT,以 PIT_TICK_RATE (1193182 Hz)为频率,精度更高,但依然属于低精度时间源。

 
 int register_refined_jiffies(long cycles_per_second)
 {
 u64 nsec_per_tick, shift_hz;
 long cycles_per_tick;
  
 refined_jiffies = clocksource_jiffies;
 refined_jiffies.name = "refined-jiffies";
 refined_jiffies.rating++; // rating 为 2,比 clocksource_jiffies 高
  
 /* Calc cycles per tick */ // 和 clocksource_jiffies 不同,根据传入参数 (CLOCK_TICK_RATE) 来计算 shift_hz,然后算出 mult
 cycles_per_tick = (cycles_per_second + HZ/2)/HZ; // CLOCK_TICK_RATE = PIT_TICK_RATE
 /* shift_hz stores hz<<8 for extra accuracy */
 shift_hz = (u64)cycles_per_second << 8;
 shift_hz += cycles_per_tick/2;
 do_div(shift_hz, cycles_per_tick);
 /* Calculate nsec_per_tick using shift_hz */
 nsec_per_tick = (u64)NSEC_PER_SEC << 8;
 nsec_per_tick += (u32)shift_hz/2;
 do_div(nsec_per_tick, (u32)shift_hz);
  
 refined_jiffies.mult = ((u32)nsec_per_tick) << JIFFIES_SHIFT;
  
 __clocksource_register(&refined_jiffies); // 注册时间源
 return 0;
 }

acpi_pm(ACPI power management timer)

几乎所有 ACPI-based 的主板上都会有该设备。频率为 3.579545 MHz

 
 static struct clocksource clocksource_acpi_pm = {
 .name = "acpi_pm",
 .rating = 200,
 .read = acpi_pm_read,
 .mask = (cycle_t)ACPI_PM_MASK,
 .flags = CLOCK_SOURCE_IS_CONTINUOUS,
 };

通过 pmtmr_ioport 变量指向 Power Management Timer 寄存器,其结构如下:

 
 +-------------------------------+----------------------------------+
 | | |
 | upper eight bits of a | running count of the |
 | 32-bit power management timer | power management timer |
 | | |
 +-------------------------------+----------------------------------+
 31 E_TMR_VAL 24 TMR_VAL 0

通过调用 read_pmtmr 函数,会去读取 pmtmr_ioport,与上 24 位的 bitmask(ACPI_PM_MASK),获得其 TMR_VAL 部分。

HPET(High Precision Event Timer)

提供了更高的时钟频率 (10MHz+) 以及更宽的计数范围(64bit)。通过 APIC 发现,利用 MMIO 进行编程。集成到南桥中。

包含一个 64bit 的主计数器 (up-counter) 计数器,频率至少为 10MHz,一堆 (最多 256 个) 32 /64 bit 比较器(comparators)。当一个计数器(相关的位) 等于比较器 (最低有效位) 时,产生中断。

定义为:

 
 static struct clocksource clocksource_hpet = {
 .name ="hpet",
 .rating = 250,
 .read = read_hpet,
 .mask = CLOCKSOURCE_MASK(64),
 .flags = CLOCK_SOURCE_IS_CONTINUOUS,
 };

将 ACPI HPET table 的 hpet_address 映射到虚拟地址空间中,根据 Intel 手册大小为 1024 bytes,因此 HPET_MMAP_SIZE 为 1024。映射后统一通过 hpet_readl 读取该地址空间。

TSC(Time Stamp Counter)

Pentium 开始提供的 64 位寄存器。每次外部振荡器产生信号时 (每个 CPU 时钟周期) 加 1,因此频率依赖于 CPU 频率(Intel 手册上说相等),如果 CPU 频率为 400MHz 则每 2.5 ns 加 1。

为了使用 TSC,Linux 在系统初始化的时候必须通过调用 calibrate_tsc(native_calibrate_tsc) 来确定时钟的频率 (编译时无法确定,因为 kernel 可能运行在不同于编译机的其他 CPU 上)。一般用法是在一定时间内(需要通过其他时间源,如 hpet) 执行两次,记录 start 和 end 的时间戳,同时通过 rdtsc 读取 start 和 end 时 TSC counter,通过 (end - start time) / (end - start counter) 算出期间 CPU 实际频率。

但在多核时代下,由于不能保证同一块主板上每个核的同步,CPU 变频和指令乱序执行导致 TSC 几乎不可能取到准确的时间,但新式 CPU 中支持频率不变的 constant TSC。

定义为:

 
 static struct clocksource clocksource_tsc = {
 .name = "tsc",
 .rating = 300,
 .read = read_tsc,
 .mask = CLOCKSOURCE_MASK(64),
 .flags = CLOCK_SOURCE_IS_CONTINUOUS |
 CLOCK_SOURCE_MUST_VERIFY,
 .archdata = { .vclock_mode = VCLOCK_TSC },
 };

操作

注册时钟源

 
 static inline int __clocksource_register(struct clocksource *cs)
 {
 // 使用默认 mult 和 shift
 return __clocksource_register_scale(cs, 1, 0);
 }
  
 static inline int clocksource_register_hz(struct clocksource *cs, u32 hz)
 {
 return __clocksource_register_scale(cs, 1, hz);
 }
  
 static inline int clocksource_register_khz(struct clocksource *cs, u32 khz)
 {
 return __clocksource_register_scale(cs, 1000, khz);
 }
  
 int __clocksource_register_scale(struct clocksource *cs, u32 scale, u32 freq)
 {
  
 /* Initialize mult/shift and max_idle_ns */
 __clocksource_update_freq_scale(cs, scale, freq);
  
 /* Add clocksource to the clocksource list */
 mutex_lock(&clocksource_mutex); // 需要加锁,保护 curr_clocksource 和 clocksource_list
 clocksource_enqueue(cs);
 clocksource_enqueue_watchdog(cs); // 将 cs 加入到 wd_list,启动一个新的 watchdog timer
 clocksource_select(); // 设置 curr_clocksource 为当前 rating 最高的作为时间源
 clocksource_select_watchdog(false);
 mutex_unlock(&clocksource_mutex);
 return 0;
 }
  
 void __clocksource_update_freq_scale(struct clocksource *cs, u32 scale, u32 freq)
 {
 u64 sec;
  
 if (freq) {
 // 将 mask 除以 freq,得到能够表示的秒数上限
 sec = cs->mask;
 do_div(sec, freq);
 // 如果频率不是 hz,需要再除以倍数,即 mask / (scale * freq)
 do_div(sec, scale);
 if (!sec)
 sec = 1;
 else if (sec > 600 && cs->mask > UINT_MAX)
 sec = 600;
 // 根据上限计算不会溢出的 mult 和 shift
 // NSEC_PER_SEC=1GHz,计算将 freq 以 GHz 为单位时的 mult 和 shift
 // 由于上限又乘回 scale 将单位转换为原单位,因此目标频率要除以 scale,这样可以减少换算量?
 clocks_calc_mult_shift(&cs->mult, &cs->shift, freq,
 NSEC_PER_SEC / scale, sec * scale);
 }
  
 // 计算能够保证换算时不溢出的 max adjustment
 cs->maxadj = clocksource_max_adjustment(cs);
 while (freq && ((cs->mult + cs->maxadj <cs->mult)
 || (cs->mult - cs->maxadj > cs->mult))) {cs->mult >>= 1;
 cs->shift--;
 cs->maxadj = clocksource_max_adjustment(cs);
 }
  
 /*
 * Only warn for *special* clocksources that self-define
 * their mult/shift values and don't specify a freq.
 */
 WARN_ONCE(cs->mult + cs->maxadj <cs->mult,
 "timekeeping: Clocksource %s might overflow on 11%% adjustmentn",
 cs->name);
  
 // 计算 max_idle_ns(最大空闲时间) 和 max_cycles(cycle 上限)
 clocksource_update_max_deferment(cs);
  
 // 输出信息到 kernel buffer
 pr_info("%s: mask: 0x%llx max_cycles: 0x%llx, max_idle_ns: %lld nsn",
 cs->name, cs->mask, cs->max_cycles, cs->max_idle_ns);
 }

参数为要注册的时间源,频率的倍数(1khz 为 1000),频率。

注册到 sysfs

 
 static struct bus_type clocksource_subsys = {
 .name ="clocksource",
 .dev_name = "clocksource",
 };
  
 static struct device device_clocksource = {
 .id = 0,
 .bus = &clocksource_subsys,
 };
  
 static int __init init_clocksource_sysfs(void)
 {
 // 创建 /sys/devices/system/clocksource 目录
 int error = subsys_system_register(&clocksource_subsys, NULL);
 // 创建 /sys/devices/system/clocksource/clocksource0/ 目录
 if (!error)
 error = device_register(&device_clocksource);
 // 创建 clocksource0 下的 current_clocksource
 if (!error)
 error = device_create_file(
 &device_clocksource,
 &dev_attr_current_clocksource);
 // 创建 clocksource0 下的 unbind_clocksource(write only)
 if (!error)
 error = device_create_file(&device_clocksource,
 &dev_attr_unbind_clocksource);
 // 创建 clocksource0 下的 available_clocksource(read only)
 if (!error)
 error = device_create_file(
 &device_clocksource,
 &dev_attr_available_clocksource);
 return error;
 }

clock_event_device

时钟源 (clocksource) 只能用来查询时间,就好像一个手表一样,当你想查询时间时看一下,知道现在几点了。但如果你想设定一个闹钟,让它在特定的时间点提醒你,那么就需要时钟事件设备 (clock_event_device)。此类设备可以用来注册事件,让它们在未来的特定时间点被触发事件。

和 clocksource 一样,可能会存在多种 clock_event_device,OS 会根据它们的精度和能力,选择合适的 clock_event_device 来提供时钟事件服务。

 
 struct clock_event_device {
 void (*event_handler)(struct clock_event_device *); // 回调函数指针
 int (*set_next_event)(unsigned long evt, struct clock_event_device *); // 设置下一次时间触发时间的函数指针,参数类型为差值
 int (*set_next_ktime)(ktime_t expires, struct clock_event_device *); // 设置下一次时间触发时间的函数指针,参数类型为 ktime
 ktime_t next_event;
 u64 max_delta_ns; // 可设置的最大时间差
 u64 min_delta_ns; // 可设置的最小时间差
 u32 mult; // 用于 cycle 和 ns 的转换
 u32 shift;
 enum clock_event_state state_use_accessors;
 unsigned int features;
 unsigned long retries;
  
 int (*set_state_periodic)(struct clock_event_device *);
 int (*set_state_oneshot)(struct clock_event_device *);
 int (*set_state_oneshot_stopped)(struct clock_event_device *);
 int (*set_state_shutdown)(struct clock_event_device *);
 int (*tick_resume)(struct clock_event_device *);
  
 void (*broadcast)(const struct cpumask *mask);
 void (*suspend)(struct clock_event_device *);
 void (*resume)(struct clock_event_device *);
 unsigned long min_delta_ticks;
 unsigned long max_delta_ticks;
  
 const char *name;
 int rating; // 优先级
 int irq;
 int bound_on;
 const struct cpumask *cpumask;
 struct list_head list; // 用来加入到 clockevent_devices 链表
 struct module *owner;
 } ____cacheline_aligned;

其中 clock_event_state 维护了 clock_event_device 设备当前处于的状态, 定义如下:

 
 enum clock_event_state {
 CLOCK_EVT_STATE_DETACHED, // 设备未使用。一般作为初始状态来使用
 CLOCK_EVT_STATE_SHUTDOWN, // 设备已关闭
 CLOCK_EVT_STATE_PERIODIC, // 设备被编程为产生周期性事件
 CLOCK_EVT_STATE_ONESHOT, // 设备被编程为产生单次事件
 CLOCK_EVT_STATE_ONESHOT_STOPPED, // 设备被编程为产生单次事件,但目前暂时停止了
 };

而 features 为 clock_event_device 支持的特性:

 
 // 是否支持 oneshot/perioic 产生中断
 # define CLOCK_EVT_FEAT_PERIODIC 0x000001
 # define CLOCK_EVT_FEAT_ONESHOT 0x000002
 # define CLOCK_EVT_FEAT_KTIME 0x000004
  
 /*
 * x86(64) specific (mis)features:
 *
 * - Clockevent source stops in C3 State and needs broadcast support.
 * - Local APIC timer is used as a dummy device.
 */
 // 是否支持在 C3 下不产生中断
 # define CLOCK_EVT_FEAT_C3STOP 0x000008
 # define CLOCK_EVT_FEAT_DUMMY 0x000010
  
 /*
 * Core shall set the interrupt affinity dynamically in broadcast mode
 */
 # define CLOCK_EVT_FEAT_DYNIRQ 0x000020
 # define CLOCK_EVT_FEAT_PERCPU 0x000040
  
 /*
 * Clockevent device is based on a hrtimer for broadcast
 */
 # define CLOCK_EVT_FEAT_HRTIMER 0x000080

clock_event_device 类别

找来找去,只有 lapic_clockevent 比较熟悉。

lapic_clockevent

 
 static struct clock_event_device lapic_clockevent = {
 .name ="lapic",
 .features = CLOCK_EVT_FEAT_PERIODIC |
 CLOCK_EVT_FEAT_ONESHOT | CLOCK_EVT_FEAT_C3STOP
 | CLOCK_EVT_FEAT_DUMMY,
 .shift = 32,
 .set_state_shutdown = lapic_timer_shutdown,
 .set_state_periodic = lapic_timer_set_periodic,
 .set_state_oneshot = lapic_timer_set_oneshot,
 .set_next_event = lapic_next_event,
 .broadcast = lapic_timer_broadcast,
 .rating = 100,
 .irq = -1,
 };

注册

对于 APIC timer 来说,在

start_kernel => rest_init => kernel_thread(kernel_init) => kernel_init_freeable => smp_prepare_cpus(setup_max_cpus) => smp_ops.smp_prepare_cpus => native_smp_prepare_cpus => apic_bsp_setup => x86_init.timers.setup_percpu_clockev (setup_boot_APIC_clock) => setup_APIC_timer

时会调用 clockevents_register_device 注册 APIC timer。

 
 void clockevents_register_device(struct clock_event_device *dev)
 {
 unsigned long flags;
  
 /* Initialize state to DETACHED */
 // 设置设备状态
 clockevent_set_state(dev, CLOCK_EVT_STATE_DETACHED);
  
 // 如果未指配所属 CPU,设置为当前 CPU
 if (!dev->cpumask) {WARN_ON(num_possible_cpus() > 1);
 dev->cpumask = cpumask_of(smp_processor_id());
 }
 // 关闭抢占,加锁
 raw_spin_lock_irqsave(&clockevents_lock, flags);
 // 加入到 clockevent_devices 链表中
 list_add(&dev->list, &clockevent_devices);
 // 和 tick_device 当前的绑定的 clock_event_device 比较,如果新设备更优,则切换到新设备
 tick_check_new_device(dev);
 // 清除 clockevents_released 上的 clockevent 设备,转移到 clockevent_devices
 clockevents_notify_released();
  
 raw_spin_unlock_irqrestore(&clockevents_lock, flags);
 }

在注册 clock_event_device 会当前 tick_device 绑定的 clock_event_device 比较,如果它更优,则换绑。

tick_device

tick_device 和 clock_event_device 紧密相关。它是对 clock_event_device 进一步封装,用于代替时钟滴答中断,给内核提供 tick 事件。

在 include/linux/hrtimer.h 和 kernel/time/tick-internal.h 都有一下定义:

 
 DECLARE_PER_CPU(struct tick_device, tick_cpu_device);

每个 CPU 都定义了一个 tick_device 类型的 per-CPU 变量 tick_cpu_device :

tick_device 类型定义如下:

 
 struct tick_device {
 struct clock_event_device *evtdev;
 enum tick_device_mode mode;
 };

tick_device 需要和 clock_event_device 关联起来才生效。也就是说,tick_device 的 tick 依赖于 clock_event_device 发出的 event。在使用时,往往通过 per-CPU 变量找到当前 CPU 对应的 tick_device,然后通过其 evtdev 成员找到 clock_event_device 。

在注册 clock_event_device 时,会调用 tick_check_new_device ,检查 tick_device 是否应该更换 clock_event_device。

tick_check_new_device

和当前 tick_device 绑定的 clock_event_device 比较,如果新的 clock_event_device 更优,则切换到新设备,即更新 tick_device.evtdev :

 
 => per_cpu(tick_cpu_device, cpu) 获取当前 CPU 的 tick_device,通过其 evtdev 拿到对应的 clock_event_device
 => tick_check_percpu 如果当前 CPU 不在新设备的 bitmask 中 / 不能设置 irq affinity (非本地设备),不换
 => tick_check_preferred 如果新设备不支持 ONESHOT,而当前设备支持 / 已处于 ONESHOT 模式,不换
 否则,检查新设备 rating 是否大于当前设备,如果是,换
 => clockevents_shutdown 如果当前设备是广播设备,需要关掉,包括更新其状态为 CLOCK_EVT_STATE_SHUTDOWN 和将下次触发时间设为 KTIME_MAX
 => clockevents_exchange_device 否则将当前设备从 clockevent_devices 转移到 clockevents_released 中,更新状态为 CLOCK_EVT_STATE_DETACHED
 => clockevents_shutdown 对新设备也调用 shutdown
 => tick_setup_device 初始化 tick_device,设置当前 CPU 的 tick_device.evtdev 为新设备
 => tick_oneshot_notify 通知其他 CPU 时间源变了
 => tick_install_broadcast_device 如果不换 clock_event_device ,尝试将新设备设置为广播设备

一般来说,支持 oneshot 的 clock_event_device 比只支持 periodic 的精度更高。因此我们在选择 tick_device 对应的 clock_event_device 时更偏好于支持 oneshot 的设备。

在系统的启动阶段,tick_device 工作在周期触发模式的,直到在合适的时机,才会开启单触发模式,以便支持 NO_HZ 和 hrtimer 。于是 tick_setup_device 会检查 tick_device.evtdev 是否为空,如果是,表示当前 CPU 是第一次注册 tick_device,则需要将其设置为 TICKDEV_MODE_PERIODIC 模式(因为许多时间系统依赖于周期性的定时器中断,比如 jiffies),调用 tick_setup_periodic 初始化。如果不是,则根据 tick_device 能力进行设置,支持 oneshot 则设置为 TICKDEV_MODE_ONESHOT 模型,调用 tick_setup_oneshot 初始化。

应用

接下来我们来看看 Linux 是如何利用这两套系统来建立时钟系统的。

初始化

在 init/main.c 的内核启动函数 start_kernel 中,对时钟系统进行了初始化。

 
 start_kernel => setup_arch => x86_init.timers.wallclock_init() 初始化 wallclock
 => register_refined_jiffies(CLOCK_TICK_RATE) 初始化 refined_jiffies
 => tick_init 初始化 tick 系统
 => init_timers 初始化低精度定期器系统
 => hrtimers_init 初始化高精度定期器系统
 => timekeeping_init
 => time_init 设置 late_time_init 为 x86_late_time_init
 => x86_late_time_init => x86_init.timers.timer_init(hpet_time_init) 初始化 hpet,如果不支持,则初始化 pit
 => tsc_init 初始化 tsc,需要借助其他时间源来校准,因此放 hpet 后
 => rest_init APIC timer 在此初始化

x86_init 定义在 arch/x86/kernel/x86_init.c,用于指定平台特有的初始化函数指针:

 
 struct x86_init_ops x86_init __initdata = {
 ...
 .timers = {
 .setup_percpu_clockev = setup_boot_APIC_clock, // 为当前 CPU(boot CPU)初始化 APIC timer
 .timer_init = hpet_time_init, // 初始化 hpet
 .wallclock_init = x86_init_noop, // 在 x86 架构下没实现,但在 intel-mid 设备上实现了,指向 intel_mid_rtc_init
 },
 };

wallclock_init 在 x86 架构下没实现。register_refined_jiffies 前面提过,于是我们直接来看 tick_init。

tick_init

tick 系统初始化。

 
 => tick_broadcast_init
 => tick_nohz_init

tick_broadcast_init 初始化了 6 个 CPU 的 bitmask:

 
 void __init tick_broadcast_init(void)
 {
 zalloc_cpumask_var(&tick_broadcast_mask, GFP_NOWAIT); // 正在处于睡眠的 CPU
 zalloc_cpumask_var(&tick_broadcast_on, GFP_NOWAIT); // 处于周期性广播状态的 CPU
 zalloc_cpumask_var(&tmpmask, GFP_NOWAIT); // 临时变量
 #ifdef CONFIG_TICK_ONESHOT
 zalloc_cpumask_var(&tick_broadcast_oneshot_mask, GFP_NOWAIT); // 需要被通知的 CPU
 zalloc_cpumask_var(&tick_broadcast_pending_mask, GFP_NOWAIT); // 阻塞广播的 CPU
 zalloc_cpumask_var(&tick_broadcast_force_mask, GFP_NOWAIT); // 强制执行广播的 CPU
 #endif
 }

而 tick_nohz_init 初始化 tickless 模式:

 
 void __init tick_nohz_init(void)
 {
 int cpu;
  
 if (!tick_nohz_full_running) {
 // 分配 tick_nohz_full_mask,用于标识开启 full NO_HZ 的 CPU。这里设置为全 1,并设置 tick_nohz_full_running = true
 if (tick_nohz_init_all() < 0)
 return;
 }
  
 // 分配 housekeeping_mask ,用于标识不开启 NO_HZ 的 CPU(至少要有一个)
 if (!alloc_cpumask_var(&housekeeping_mask, GFP_KERNEL)) {
 WARN(1, "NO_HZ: Can't allocate not-full dynticks cpumaskn");
 cpumask_clear(tick_nohz_full_mask);
 tick_nohz_full_running = false;
 return;
 }
  
 /*
 * Full dynticks uses irq work to drive the tick rescheduling on safe
 * locking contexts. But then we need irq work to raise its own
 * interrupts to avoid circular dependency on the tick
 */
 // 查是否支持发送跨处理器中断,因为需要跨处理器唤醒
 if (!arch_irq_work_has_interrupt()) {
 pr_warn("NO_HZ: Can't run full dynticks because arch doesn't support irq work self-IPIsn");
 cpumask_clear(tick_nohz_full_mask);
 cpumask_copy(housekeeping_mask, cpu_possible_mask);
 tick_nohz_full_running = false;
 return;
 }
  
 // 获取当前 CPU 的 id,负责 housekeeping,即唤醒其他 CPU
 cpu = smp_processor_id();
  
 // 如果当前 CPU 在 tick_nohz_full_mask 中,去掉,因为它需要负责 housekeeping
 if (cpumask_test_cpu(cpu, tick_nohz_full_mask)) {
 pr_warn("NO_HZ: Clearing %d from nohz_full range for timekeepingn",
 cpu);
 cpumask_clear_cpu(cpu, tick_nohz_full_mask);
 }
  
 // 将所有不属于 tick_nohz_full_mask 的 CPU 设置到 housekeeping_mask 中
 cpumask_andnot(housekeeping_mask,
 cpu_possible_mask, tick_nohz_full_mask);
  
 // 为 tick_nohz_full_mask 中的 CPU 设置 context_tracking.active(per-cpu 变量)为 true,让 tracking subsystem 忽略之
 for_each_cpu(cpu, tick_nohz_full_mask)
 context_tracking_cpu_set(cpu);
  
 cpu_notifier(tick_nohz_cpu_down_callback, 0);
 pr_info("NO_HZ: Full dynticks CPUs: %*pbl.n",
 cpumask_pr_args(tick_nohz_full_mask));
  
 /*
 * We need at least one CPU to handle housekeeping work such
 * as timekeeping, unbound timers, workqueues, ...
 */
 // 如果 housekeeping_mask 为空,则没有 CPU 能够来负责 housekeeping,报警告
 WARN_ON_ONCE(cpumask_empty(housekeeping_mask));
 }

在完成了对 tick_device 的初始化后,基于 tick 的 jiffies 时钟能够正常工作,于是我们可以对依赖于 jiffies 的低精度定时器进行初始化。

init_timers

低精度定时器(timer)系统初始化。违反直觉的是,用来表示 timer 的结构是 timer_list :

 
 struct timer_list {
 /*
 * All fields that change during normal runtime grouped to the
 * same cacheline
 */
 struct hlist_node entry; // 用于加入到链表
 unsigned long expires; // 定时器超时时间
 void (*function)(unsigned long); // 超时时的回调函数
 unsigned long data; // 调用 function 时的参数
 u32 flags;
  
 #ifdef CONFIG_TIMER_STATS
 int start_pid;
 void *start_site;
 char start_comm[16];
 #endif
 #ifdef CONFIG_LOCKDEP
 struct lockdep_map lockdep_map;
 #endif
 };

在现代的 OS 中,可能同时激活了几百个甚至几千个 timer,为了对它们进行有效的管理,Linux 使用了 分 CPU + 类别 + 分层时间轮 的分组方案:

首先,在多核架构下同时存在多个运行的 CPU,为了避免打架,每个 CPU 各自维护自己的 timer 。于是定义了 per-cpu 变量 timer_bases 。

其次,timer_bases 是一个 timer_base 的数组,原因在于有不同的 timer 类别:如果支持 NO_HZ,会分为 BASE_STD 和 BASE_DEF 两个 timer_base 分别维护普通的 timer 和 NOHZ 的 timer。

最后,我们来看维护了 timer 们的 timer_base 结构:

 
 struct timer_base {
 spinlock_t lock;
 struct timer_list *running_timer; // 当前正在处理的 timer
 unsigned long clk; // 当前定时器所经过的 jiffies,用来判断 timer_list 是否超时
 unsigned long next_expiry; // 最早 (距离超时最近的 timer) 的超时时间
 unsigned int cpu; // 该 timer 所属 CPU
 bool migration_enabled; // 是否支持 timer_list 迁移到其他 CPU
 bool nohz_active; // 是否维护 NO_HZ timer_list
 bool is_idle;
 DECLARE_BITMAP(pending_map, WHEEL_SIZE); // 如果某个链表中有 timer,则对应的 bit 被置为 1
 struct hlist_head vectors[WHEEL_SIZE]; // 维护了所有链表
 } ____cacheline_aligned;

在 CONFIG_HZ 大于 100 的情况下,LVL_DEPTH 为 9,也就是一个 timer_base 维护了 9 个 level,每个 level 可维护 LVL_SIZE(2^6=64) 个链表,以 CONFIG_HZ 为 1000 时的情况为例:

 
 * HZ 1000 steps
 * Level Offset Granularity Range
 * 0 0 1 ms 0 ms - 63 ms
 * 1 64 8 ms 64 ms - 511 ms
 * 2 128 64 ms 512 ms - 4095 ms (512ms - ~4s)
 * 3 192 512 ms 4096 ms - 32767 ms (~4s - ~32s)
 * 4 256 4096 ms (~4s) 32768 ms - 262143 ms (~32s - ~4m)
 * 5 320 32768 ms (~32s) 262144 ms - 2097151 ms (~4m - ~34m)
 * 6 384 262144 ms (~4m) 2097152 ms - 16777215 ms (~34m - ~4h)
 * 7 448 2097152 ms (~34m) 16777216 ms - 134217727 ms (~4h - ~1d)
 * 8 512 16777216 ms (~4h) 134217728 ms - 1073741822 ms (~1d - ~12d)

看到这里,是不是会产生黑人问号了?这和教科书上说的时间轮实现好像不一样啊?

是的,在 2015 年,Thomas Gleixne 提交了一个 patch,参考 timer: Proof of concept wheel replacement。他尝试对 Linux 中的时间轮进行改版,此后其修改被合并进主干,成为了 4.X 后的时间轮实现。在此我们将它们区分为 经典时间轮 和 现代时间轮。

在经典时间轮中,分为 tv1 到 tv5 五个粒度和精度不同的轮子,tv1 的精度最高,能够表示的时间范围最短。每次只检查 tv1 中是否有 timer 超时,然后每隔 256 jiffies,tv1 中所有 timer 都将超时,于是需要从 tv2 中将 timer 挪到 tv1 中。同理,当 tv2 用完时,需要从 tv3 挪,以此类推,这种操作称为 cascade 。无疑,cascade 是非常昂贵的,同时要找到下一个超时的 timer 需要遍历整套轮子来查找,效率低。为此社区已经讨论多年,就是想取代这个家伙。

最终 Thomas Gleixne 站了出来,提出了现代时间轮,但要改变这个存在了近十年的模块,看得出他还是有些慌的:

Be aware that it might eat your disk, kill you cat and make your kids miss the bus, but with some care one it should be able to tame the beast and get some real testing done

在现代时间轮中,如上所示,在 HZ 为 1000 下分为了 9 个轮子。每一层的粒度都是上一层的 8(2^LVL_CLK_SHIFT)倍,能够表示的范围也是上一层的 8 倍。也就是说,在 level 0 中,64 个链表分别对应 0-63ms,每毫秒(ms) 都维护了一个链表。而对于其他 level,比如 level 1 来说,每 8ms 才对应一个链表,在这 8ms 内的超时 timer 都会被加到这个链表中。因此一个 timer_base 维护了 WHEEL_SIZE = LVL_SIZE * LVL_DEPTH 个链表,在我们的情况下,有 9 * 64 = 576 个链表,覆盖长度为 12 天。

相比经典时间轮,最本质的改变是消除了 cascade 。当然这不是没有代价的:经典时间轮总是检查 tv1,那些最先超时的 timer 总是通过 cascade 被维护在 tv1 中,因此在它们超时时总是能立刻检查到,调用它们的回调函数。然而在新型时间轮中,timer 自被注册后就固定不动了,该在哪一 level 就在哪一 level,然后以不同的时间间隔去检查各 level。比如 level 0 是每个 jiffies 检查一次,而 level 1 要每 8 ms(粒度)才检查一次。也就是说,如果有一个 timer 是 68 jiffies 超时的,那么需要等到 64 + 8 = 72 jiffies 时才会被检查到,相当于晚了 4 jiffies 才超时。更严重的是,level 越高,粒度越大,检查出超时和实际超时时间的差就可能越大。

但我们认为没关系,这是基于以下几个观察:

  • 几乎所有的 timer 都被设置为在不久的将来超时,都集中在低 level
  • 几乎所有的 timer 都会在超时前被取消,如果我们的目的是超时触发,那么考虑使用高精度定时器 hrtimer
  • timer 的超时意味着出了什么问题,而问题的告知不需要太准时

下面我们来看新型时间轮的具体实现:

init_timers 中,我们需要对这些结构进行初始化:

 
 => init_timer_cpus => init_timer_cpu 初始化每个 CPU 的所有的 timer_base
 => init_timer_stats => raw_spin_lock_init => __raw_spin_lock_init 初始化每个 CPU 的 tstats_lookup_lock,用于保护 procfs 的 timer 统计操作
 => open_softirq(TIMER_SOFTIRQ, run_timer_softirq) 注册 timer 软中断的处理函数为 run_timer_softirq

open_softirq 为 TIMER_SOFTIRQ 中断注册了处理函数 run_timer_softirq 。因此在收到时钟中断时,调用 run_timer_softirq 。

run_timer_softirq

处理时间中断,即调用超时 timer 的回调函数。

 
 /*
 * This function runs timers and the timer-tq in bottom half context.
 */
 static __latent_entropy void run_timer_softirq(struct softirq_action *h)
 {
 struct timer_base *base = this_cpu_ptr(&timer_bases[BASE_STD]);
  
 /*
 * must_forward_clk must be cleared before running timers so that any
 * timer functions that call mod_timer will not try to forward the
 * base. idle trcking / clock forwarding logic is only used with
 * BASE_STD timers.
 *
 * The deferrable base does not do idle tracking at all, so we do
 * not forward it. This can result in very large variations in
 * granularity for deferrable timers, but they can be deferred for
 * long periods due to idle.
 */
 base->must_forward_clk = false;
  
 __run_timers(base);
 if (IS_ENABLED(CONFIG_NO_HZ_COMMON))
 __run_timers(this_cpu_ptr(&timer_bases[BASE_DEF]));
 }

对普通的 timer_base 和 NO_HZ 的 timer_base 分别执行 __run_timers :

 
 static inline void __run_timers(struct timer_base *base)
 {
 // 用来存放各 level 中超时 timer,每层一个链表
 struct hlist_head heads[LVL_DEPTH];
 int levels;
  
 // 如果当前 jiffies 小于 timer_base 设置的 jiffies ,此时不可能有超时的 timer,返回
 if (!time_after_eq(jiffies, base->clk))
 return;
  
 spin_lock_irq(&base->lock);
  
 // 循环至 timer_base 设置的 jiffies 大于当前 jiffies 为止
 while (time_after_eq(jiffies, base->clk)) {
 // 如果当前 jiffies 大于 timer_base 中设置的 jiffies,则 timer_base 维护的 timer 中可能会有到期的
 // 在各 level 中查找 base->clk 时刻时超时的 timer,将它们添加到 heads 的对应链表中。返回超时的最高 level
 levels = collect_expired_timers(base, heads);
 // 增加 timer_base 设置的 jiffies ,这样可能就不会进入下一轮循环
 base->clk++;
  
 // 遍历 heads 中的链表,将里面的 timer 从链表中移除,并调用 timer_list 中设置的超时回调函数
 while (levels--)
 expire_timers(base, heads + levels);
 }
 base->running_timer = NULL;
 spin_unlock_irq(&base->lock);
 }

collect_expired_timers

实际是对 __collect_expired_timers 的包装,只是对 NO_HZ 情况进行了优化:

 
 static int collect_expired_timers(struct timer_base *base,
 struct hlist_head *heads)
 {
 /*
 * NOHZ optimization. After a long idle sleep we need to forward the
 * base to current jiffies. Avoid a loop by searching the bitfield for
 * the next expiring timer.
 */
 // 如果当前 jiffies 比 timer_base 中设置的 jiffies 超前了不止 1,表示之前 CPU 进入了 NO_HZ 模式
 // 与其一个一个 jiffies (base->clk++) 去找,不如获取下一个 timer 超时时的 jiffies,然后我们直接跳到该 jiffies 去找
 if ((long)(jiffies - base->clk) > 2) {
 unsigned long next = __next_timer_interrupt(base);
  
 /*
 * If the next timer is ahead of time forward to current
 * jiffies, otherwise forward to the next expiry time:
 */
 // 如果当前还没到达下一个 timer 的超时时间,将 timer_base 的 jiffies 设置到超时 jiffies - 1 后返回
 // 这样可以破坏 __run_timers 中的循环条件,避免做无用功
 if (time_after(next, jiffies)) {
 /* The call site will increment clock! */
 base->clk = jiffies - 1;
 return 0;
 }
 base->clk = next;
 }
 return __collect_expired_timers(base, heads);
 }

__collect_expired_timers

在 timer_base 的各 level 中查找匹配的链表:

 
 static int __collect_expired_timers(struct timer_base *base,
 struct hlist_head *heads)
 {
 unsigned long clk = base->clk;
 struct hlist_head *vec;
 int i, levels = 0;
 unsigned int idx;
  
 // 从 level 0 开始找(最容易超时)
 for (i = 0; i < LVL_DEPTH; i++) {
 // 计算 timer_base 中设置的 jiffies 时刻所对应的链表索引
 idx = (clk & LVL_MASK) + i * LVL_SIZE;
 // 根据 bitmap 判断链表中有 timer ,如果有,清除该 bit,因为链表中的所有 timer 都会被取出并处理(调用回调)
 if (__test_and_clear_bit(idx, base->pending_map)) {
 vec = base->vectors + idx;
 // 将该链表添加到 heads 中
 hlist_move_list(vec, heads++);
 // 更新发生超时的最高 level 到 levels 中
 levels++;
 }
 /* Is it time to look at the next level? */
 // 如果 clk 低 3 位不为 0 (下一层是上一层粒度的 8 倍),说明还未到检查下一层的时机,返回
 if (clk & LVL_CLK_MASK)
 break;
 /* Shift clock for the next level granularity */
 // 检查下一层(更大粒度)
 // timer_base 的 jiffies 右移 3 位,因为下一层时间粒度是上一层的 8(2^LVL_CLK_SHIFT) 倍
 clk >>= LVL_CLK_SHIFT;
 }
 return levels;
 }

从以上的实现我们可以发现,新型时间轮是 轮不动,游标 (timer_base.clk) 动 。每次游标移动时,计算它在当前时刻(jiffies)时所指向的轮位置,然后利用 bitmap 对链表中是否有超时 timer 实现了快速判断。同时只有在相应时刻(粒度对应的 bit 全 0)才会检查下一层,避免了遍历所有轮子。

使用

定义

timer 有两种定义方式,一种是通过 DEFINE_TIMER 宏传入名称、回调函数、超时时间、回调参数来定义,另一种是声明 timer_list 结构变量后调用 init_timer 来初始化,并手动设置各字段的值。

激活 / 修改

add_timer 负责将 timer 激活,即将 timer_list 添加到当前 CPU 的 timer_base 中。

激活实际上是复用了修改 timer 的函数 mod_timer,只是参数 expires 就是 timer 的 expires 而已。于是有

mod_timer => __mod_timer :

 
 static inline int
 __mod_timer(struct timer_list *timer, unsigned long expires, bool pending_only)
 {
 struct timer_base *base, *new_base;
 unsigned int idx = UINT_MAX;
 unsigned long clk = 0, flags;
 int ret = 0;
  
 BUG_ON(!timer->function);
  
 // 如果 timer 已被添加到 timer_base 中,则处于 pending 状态
 if (timer_pending(timer)) {
 // 如果 timer->expires 等于 expires,表示是新增,但其已位于 timer_base ,因此返回
 if (timer->expires == expires)
 return 1;
 // 接下来为修改已添加到 timer_base 的 timer
 base = lock_timer_base(timer, &flags);
 clk = base->clk;
 // 计算修改超时时间后 timer 属于第几个链表
 idx = calc_wheel_index(expires, clk);
  
 // 如果修改后还是和原来位于同一个链表,则更新 timer 的超时时间后返回
 if (idx == timer_get_idx(timer)) {
 timer->expires = expires;
 ret = 1;
 goto out_unlock;
 }
 } else {
 base = lock_timer_base(timer, &flags);
 }
  
 timer_stats_timer_set_start_info(timer);
  
 // 将 timer 从对应的链表中移除,如果 timer 是该链表中唯一的节点,清掉 bitmap 中相应的 bit
 ret = detach_if_pending(timer, base, false);
 if (!ret && pending_only)
 goto out_unlock;
  
 debug_activate(timer, expires);
  
 // 获取当前 CPU 的 timer_base
 new_base = get_target_base(base, timer->flags);
 // 如果 timer 的 timer_base 和当前 CPU 的 timer_base 不一致,则 timer 需要迁移到当前 CPU
 // 于是设置 timer flags 为 TIMER_MIGRATING,表示从别的 CPU 上迁移过来的
 if (base != new_base) {
 if (likely(base->running_timer != timer)) {
 /* See the comment in lock_timer_base() */
 timer->flags |= TIMER_MIGRATING;
  
 spin_unlock(&base->lock);
 base = new_base;
 spin_lock(&base->lock);
 WRITE_ONCE(timer->flags,
 (timer->flags & ~TIMER_BASEMASK) | base->cpu);
 forward_timer_base(base);
 }
 }
  
 /* Try to forward a stale timer base clock */
 // 尝试把 timer_base 的 jiffies 往后挪,最好是挪到下一次超时的时刻
 forward_timer_base(base);
 timer->expires = expires;
  
 // 如果 timer_base 的 jiffies 没有发生改变
 if (idx != UINT_MAX && clk == base->clk) {
 // 将 timer 加入到相应的链表中
 enqueue_timer(base, timer, idx);
 // 更新下一次超时时间
 trigger_dyntick_cpu(base, timer);
 } else {
 // 否则需要重新计算 index,需要把前面的工作重新做一遍
 internal_add_timer(base, timer);
 }
  
 out_unlock:
 spin_unlock_irqrestore(&base->lock, flags);
  
 return ret;
 }

移除

将 timer 从 timer_base 中移除。

 
 int del_timer(struct timer_list *timer)
 {
 struct timer_base *base;
 unsigned long flags;
 int ret = 0;
  
 debug_assert_init(timer);
  
 timer_stats_timer_clear_start_info(timer);
 // 如果 timer 位于 timer_base ,则需要移除之
 if (timer_pending(timer)) {
 base = lock_timer_base(timer, &flags);
 // 将 timer 从对应的链表中移除,如果 timer 是该链表中唯一的节点,清掉 bitmap 中相应的 bit
 ret = detach_if_pending(timer, base, true);
 spin_unlock_irqrestore(&base->lock, flags);
 }
  
 return ret;
 }

TIMER_SOFTIRQ 触发

通过前面的分析,我们知道在收到 TIMER_SOFTIRQ 时,会调用中断处理函数 run_timer_softirq 来找出超时的 timer,然后调用它们的回调函数。

那么 TIMER_SOFTIRQ 是由何人发出的呢?

在 tick 设备初始化过程中,有 tick_setup_periodic => tick_set_periodic_handler ,会设置收到 clock_event_device 中断时调用的 handler 为 tick_handle_periodic 。

于是 tick_handle_periodic => tick_periodic => update_process_times => run_local_timers => raise_softirq(TIMER_SOFTIRQ)

因此每当收到 tick 时,会发送软中断 TIMER_SOFTIRQ,让中断处理函数 run_timer_softirq 去处理。

hrtimers_init

高精度定时器初始化。高精度定时器(hrtimer) 旨在解决低精度定时器(timer) 存在的以下问题:

  • 精度太低。低精度定时器基于 jiffies,以 jiffies 为计数单位,而 jiffies 的精度只有 1 / CONFIG_HZ,前文提到,一般为 1000,因此低精度定时器的精度只有 1 毫秒。
  • 虽然在大多数情况下操作分层时间轮的开销为常数级,但在发生进位时需要做 cascade,开销为 O(n)。当然前文提到在新实现中以精度换取了开销的减少
  • 低精度定时器在设计时希望在超时到来之前获得正确的结果,然后删除之,精确定时不是其设计目的

为此,Linux 重新设计了一套 高精度定时器,可以提供纳秒级的定时精度。其定义如下:

 
 struct hrtimer {
 struct timerqueue_node node; // 红黑树节点的封装
 ktime_t _softexpires; // 最早超时时间
 enum hrtimer_restart (*function)(struct hrtimer *); // 超时的回调函数
 struct hrtimer_clock_base *base; // 指向所属的 hrtimer_clock_base
 u8 state; // 当前的状态,只有 HRTIMER_STATE_INACTIVE 和 HRTIMER_STATE_ENQUEUED 两种
 u8 is_rel; // 是否是 relative
 #ifdef CONFIG_TIMER_STATS
 int start_pid;
 void *start_site;
 char start_comm[16];
 #endif
 };
  
 struct timerqueue_node {
 struct rb_node node; // 在红黑树中对应的节点
 ktime_t expires; // 硬超时时间,等于 _softexpires + slack
 };

可以发现有两个超时时间: _softexpires 用来维护最早超时时间,而 expires 用来维护最晚超时时间,因此 hrtimer 可能在 [_softexpires, expires] 之间的任意时刻超时。有了这个范围,定时器系统可以让范围接近或重叠的多个定时器在同一时刻同时到期,避免进程频繁地被 hrtimer 进行唤醒。

类似于 timer 需要被注册到 timer_base 上,hrtimer 也需要被注册到 hrtimer_clock_base 上。但不同于 timer_base 采用时间轮,hrtimer_clock_back 采用了红黑树来维护 timer,树的最左边的节点就是最快超时的 hrtimer。

 
 struct hrtimer_clock_base {
 struct hrtimer_cpu_base *cpu_base; // 指向所属的 hrtimer_cpu_base
 int index; // 类型 index
 clockid_t clockid;
 struct timerqueue_head active; // 红黑树节点的封装
 ktime_t (*get_time)(void); // 获取当前时间的函数指针
 ktime_t offset;
 } __attribute__((__aligned__(HRTIMER_CLOCK_BASE_ALIGN)));
  
 struct timerqueue_head {
 struct rb_root head; // 红黑树根节点
 struct timerqueue_node *next; // 指向树上最早超时的节点,即最左边的节点
 };

每个 CPU 维护了一个 hrtimer_cpu_base 变量,其维护了其拥有的 hrtimer_clock_base 数组。hrtimer_clock_base 数组有 4 项,分别为:

 
 enum hrtimer_base_type {
 HRTIMER_BASE_MONOTONIC, // 单调递增的monotonic时间,不包含休眠时间
 HRTIMER_BASE_REALTIME, // 墙上真实时间
 HRTIMER_BASE_BOOTTIME, // 单调递增的boottime,包含休眠时间
 HRTIMER_BASE_TAI, // Temps Atomique International,CLOCK_TAI = CLOCK_REALTIME(UTC) + tai_offset
 HRTIMER_MAX_CLOCK_BASES,
 };

如果这几种 hrtimer_clock_base 都被创建了,则它们最先超时的时间可能不同,此时将它们中最先超时的时间存到 hrtimer_cpu_base.expires_next 中。

于是有调用:hrtimers_init => hrtimers_prepare_cpu(smp_processor_id())

 
 int hrtimers_prepare_cpu(unsigned int cpu)
 {
 // 取出 CPU 对应的 hrtimer_cpu_base
 struct hrtimer_cpu_base *cpu_base = &per_cpu(hrtimer_bases, cpu);
 int i;
  
 // 初始化各种 hrtimer_clock_base
 for (i = 0; i < HRTIMER_MAX_CLOCK_BASES; i++) {
 cpu_base->clock_base[i].cpu_base = cpu_base;
 timerqueue_init_head(&cpu_base->clock_base[i].active);
 }
  
 cpu_base->cpu = cpu;
 // 初始化超时时间为 KTIME_MAX,active hrtimer_cpu_base 数为 0
 hrtimer_init_hres(cpu_base);
 return 0;
 }

创建

定义一个 hrtimer 变量,然后调用 hrtimer_init 进行初始化。

 
 hrtimer_init => __hrtimer_init => hrtimer_clockid_to_base 根据 clock_id 找到 base
 => timer->base = &cpu_base->clock_base[base] 填充到当前 CPU 的 clock_base 数组中
 => timerqueue_init 初始化自己的红黑树节点

激活 / 修改

有两种方式可以激活 hrtimer: hrtimer_start 和 hrtimer_start_range_ns:前者指定的是超时的时间点,而后者可以指定超时的时间范围。实际上 hrtimer_start 正是通过调用 hrtimer_start_range_ns 来实现,只是参数 delta_ns 也就是 slack 值为 0。

 
 void hrtimer_start_range_ns(struct hrtimer *timer, ktime_t tim,
 u64 delta_ns, const enum hrtimer_mode mode)
 {
 struct hrtimer_clock_base *base, *new_base;
 unsigned long flags;
 int leftmost;
  
 base = lock_hrtimer_base(timer, &flags);
  
 /* Remove an active timer from the queue: */
 // 如果该 hrtimer 已经在树上,先将它移除
 remove_hrtimer(timer, base, true);
  
 // 如果是相对时间,则加上 hrtimer_clock_base 的当前时间成为绝对时间
 if (mode & HRTIMER_MODE_REL)
 tim = ktime_add_safe(tim, base->get_time());
  
 // 如果开启了 CONFIG_TIME_LOW_RES,则需要进入低精度模式,将 hrtimer 粒度设置为 hrtimer_resolution
 tim = hrtimer_update_lowres(timer, tim, mode);
  
 // 设置软超时时间和硬超时时间
 hrtimer_set_expires_range_ns(timer, tim, delta_ns);
  
 /* Switch the timer base, if necessary: */
 // 如果 hrtimer 的 base 和当前 CPU 不一致,需要迁移到当前 CPU 的 base
 new_base = switch_hrtimer_base(timer, base, mode & HRTIMER_MODE_PINNED);
  
 timer_stats_hrtimer_set_start_info(timer);
  
 // 将 hrtimer 加入到黑红树中,如果当前节点是最早超时的,则返回 true
 leftmost = enqueue_hrtimer(timer, new_base);
 if (!leftmost)
 goto unlock;
  
 // 如果该 hrtimer 是最早超时的
 if (!hrtimer_is_hres_active(timer)) {
 /*
 * Kick to reschedule the next tick to handle the new timer
 * on dynticks target.
 */
 if (new_base->cpu_base->nohz_active)
 wake_up_nohz_cpu(new_base->cpu_base->cpu);
 } else {
 // 处于高精度模式,调用 tick_program_event 重新编程
 hrtimer_reprogram(timer, new_base);
 }
 unlock:
 unlock_hrtimer_base(timer, &flags);
 }

tick_program_event

 
 int tick_program_event(ktime_t expires, int force)
 {
 struct clock_event_device *dev = __this_cpu_read(tick_cpu_device.evtdev);
  
 if (unlikely(expires.tv64 == KTIME_MAX)) {
 /*
 * We don't need the clock event device any more, stop it.
 */
 clockevents_switch_state(dev, CLOCK_EVT_STATE_ONESHOT_STOPPED);
 return 0;
 }
  
 if (unlikely(clockevent_state_oneshot_stopped(dev))) {
 /*
 * We need the clock event again, configure it in ONESHOT mode
 * before using it.
 */
 clockevents_switch_state(dev, CLOCK_EVT_STATE_ONESHOT);
 }
  
 return clockevents_program_event(dev, expires, force);
 }

找到当前 CPU 的 clock_event_device,将其切换到 oneshot 模式,然后设定超时时间为 hrtimer 超时时间。

移除

remove_hrtimer => __remove_hrtimer

 
 static void __remove_hrtimer(struct hrtimer *timer,
 struct hrtimer_clock_base *base,
 u8 newstate, int reprogram)
 {
 struct hrtimer_cpu_base *cpu_base = base->cpu_base;
 u8 state = timer->state;
  
 timer->state = newstate;
 if (!(state & HRTIMER_STATE_ENQUEUED))
 return;
  
 // 从红黑树上移除,返回最早超时 hrtimer 指针,如果返回 NULL,表示树上没有 hrtimer 了,需要更新该 base 状态为非 active
 if (!timerqueue_del(&base->active, &timer->node))
 cpu_base->active_bases &= ~(1 << base->index);
  
 #ifdef CONFIG_HIGH_RES_TIMERS
 /*
 * Note: If reprogram is false we do not update
 * cpu_base->next_timer. This happens when we remove the first
 * timer on a remote cpu. No harm as we never dereference
 * cpu_base->next_timer. So the worst thing what can happen is
 * an superflous call to hrtimer_force_reprogram() on the
 * remote cpu later on if the same timer gets enqueued again.
 */
 // 如果开启了高精度模式且 reprogram 为 1,则重新设置 clock_event_device 的触发时间
 if (reprogram && timer == cpu_base->next_timer)
 hrtimer_force_reprogram(cpu_base, 1);
 #endif
 }

超时触发

高精度定时器检查是否有超时的 hrtimer 的时机取决于是否进入高精度模式。

在进入高精度模式前,每当收到 tick 时,有调用链 tick_periodic => update_process_times => run_local_timers => hrtimer_run_queues

 
 void hrtimer_run_queues(void)
 {
 struct hrtimer_cpu_base *cpu_base = this_cpu_ptr(&hrtimer_bases);
 ktime_t now;
 // 当前处于高精度模式,直接返回
 if (__hrtimer_hres_active(cpu_base))
 return;
  
 /*
 * This _is_ ugly: We have to check periodically, whether we
 * can switch to highres and / or nohz mode. The clocksource
 * switch happens with xtime_lock held. Notification from
 * there only sets the check bit in the tick_oneshot code,
 * otherwise we might deadlock vs. xtime_lock.
 */
 // 如果支持高精度,则切换到高精度模式,否则尝试切换到 nohz 模式
 if (tick_check_oneshot_change(!hrtimer_is_hres_enabled())) {
 hrtimer_switch_to_hres();
 return;
 }
  
 raw_spin_lock(&cpu_base->lock);
 now = hrtimer_update_base(cpu_base);
 // 遍历 hrtimer_cpu_base 中的各个 base,不断取出它们最早超时的节点(hrtimer), 如果它们相对现在已经超时,调用 __run_hrtimer
 // __run_hrtimer 会将其从红黑树上移除,并调用回调函数
 __hrtimer_run_queues(cpu_base, now);
 raw_spin_unlock(&cpu_base->lock);
 }

可以看到在进入高精度模式前,处理 hrtimer 的精度为每 tick 一次,因为 jiffies 在每次 tick 时也会加一,也就是说 hrtimer 几乎沦为了 timer。但它会不断尝试进入高精度模式。如果可以能进入高精度模式,通过调用 hrtimer_switch_to_hres => tick_init_highres => tick_switch_to_oneshot(hrtimer_interrupt) 进行切换。它会将 CPU clock_event_device 的回调函数设置为 hrtimer_interrupt ,并将该设备切换到 oneshot mode。一旦 base 进入高精度模式,此后 hrtimer_run_queues 会直接返回。

此后在收到 clock_event_device 发来的中断后,调用 hrtimer_interrupt 对超时 hrtimer 进行处理:

 
 void hrtimer_interrupt(struct clock_event_device *dev)
 {
 struct hrtimer_cpu_base *cpu_base = this_cpu_ptr(&hrtimer_bases);
 ktime_t expires_next, now, entry_time, delta;
 int retries = 0;
  
 BUG_ON(!cpu_base->hres_active);
 cpu_base->nr_events++;
 dev->next_event.tv64 = KTIME_MAX;
  
 raw_spin_lock(&cpu_base->lock);
 // 记录进入循环时的时间
 entry_time = now = hrtimer_update_base(cpu_base);
 retry:
 cpu_base->in_hrtirq = 1;
 /*
 * We set expires_next to KTIME_MAX here with cpu_base->lock
 * held to prevent that a timer is enqueued in our queue via
 * the migration code. This does not affect enqueueing of
 * timers which run their callback and need to be requeued on
 * this CPU.
 */
 cpu_base->expires_next.tv64 = KTIME_MAX;
 // 遍历 hrtimer_cpu_base 中的各个 base,不断取出它们最早超时的节点(hrtimer), 如果它们相对现在已经超时,调用 __run_hrtimer
 // __run_hrtimer 会将其从红黑树上移除,并调用回调函数
 __hrtimer_run_queues(cpu_base, now);
  
 /* Reevaluate the clock bases for the next expiry */
 // 遍历 hrtimer_cpu_base 中的各个 base,得到下次最早的超时时间
 expires_next = __hrtimer_get_next_event(cpu_base);
 /*
 * Store the new expiry value so the migration code can verify
 * against it.
 */
 cpu_base->expires_next = expires_next;
 cpu_base->in_hrtirq = 0;
 raw_spin_unlock(&cpu_base->lock);
  
 /* Reprogramming necessary ? */
 // 将新的超时时间设置到 clock_event_device
 if (!tick_program_event(expires_next, 0)) {
 cpu_base->hang_detected = 0;
 return;
 }
 // 如果 tick_program_event 返回非 0,表示 expires_next 已经过期,可能原因如下:
  
 /*
 * The next timer was already expired due to:
 * - tracing
 * - long lasting callbacks
 * - being scheduled away when running in a VM
 *
 * We need to prevent that we loop forever in the hrtimer
 * interrupt routine. We give it 3 attempts to avoid
 * overreacting on some spurious event.
 *
 * Acquire base lock for updating the offsets and retrieving
 * the current time.
 */
 // 为了解决这个问题,我们提供 3 次机会,重新执行前面的循环,处理到期的 hrtimer
 raw_spin_lock(&cpu_base->lock);
 now = hrtimer_update_base(cpu_base);
 cpu_base->nr_retries++;
 if (++retries < 3)
 goto retry;
 /*
 * Give the system a chance to do something else than looping
 * here. We stored the entry time, so we know exactly how long
 * we spent here. We schedule the next event this amount of
 * time away.
 */
 cpu_base->nr_hangs++;
 cpu_base->hang_detected = 1;
 raw_spin_unlock(&cpu_base->lock);
 delta = ktime_sub(now, entry_time);
 // 如果 3 次尝试后依然失败,则计算 3 次循环的总时间,直接将下次超时的时间推后,最多 100 ms,然后重新通过 tick_program_event 设置
 if ((unsigned int)delta.tv64 > cpu_base->max_hang_time)
 cpu_base->max_hang_time = (unsigned int) delta.tv64;
 /*
 * Limit it to a sensible value as we enforce a longer
 * delay. Give the CPU at least 100ms to catch up.
 */
 if (delta.tv64 > 100 * NSEC_PER_MSEC)
 expires_next = ktime_add_ns(now, 100 * NSEC_PER_MSEC);
 else
 expires_next = ktime_add(now, delta);
 tick_program_event(expires_next, 1);
 printk_once(KERN_WARNING "hrtimer: interrupt took %llu nsn",
 ktime_to_ns(delta));
 }

模拟 tick

前面提到,我们通过 tick_switch_to_oneshot(hrtimer_interrupt) 切换到高精度模式。它会将 CPU clock_event_device 的回调函数设置为 hrtimer_interrupt 。那么原来的回调函数,也就是 tick_handle_periodic 被替换掉了。如此一来,我们就不会再处理 tick ,于是依赖于 jiffies 或者更准确地说是 tick 的低精度定时器 timer 将得不到处理。我们需要对这种情况进行处理,Linux 采用的方法是 tick 模拟:

通过定义一个 hrtimer,把它的超时时间设定为一个 tick ,当这个 hrtimer 到期时,在这个 hrtimer 的回调函数中,调用 tick 的回调函数,如此一来,就实现了通过高精度设备模拟低精度设备的目的。

这个 hrtimer 位于 per-cpu 变量 tick_cpu_sched ,类型为 tick_sched:

 
 static DEFINE_PER_CPU(struct tick_sched, tick_cpu_sched);
  
 struct tick_sched {
 struct hrtimer sched_timer;
 unsigned long check_clocks;
 enum tick_nohz_mode nohz_mode;
 ktime_t last_tick;
 int inidle;
 int tick_stopped;
 unsigned long idle_jiffies;
 unsigned long idle_calls;
 unsigned long idle_sleeps;
 int idle_active;
 ktime_t idle_entrytime;
 ktime_t idle_waketime;
 ktime_t idle_exittime;
 ktime_t idle_sleeptime;
 ktime_t iowait_sleeptime;
 ktime_t sleep_length;
 unsigned long last_jiffies;
 u64 next_timer;
 ktime_t idle_expires;
 int do_timer_last;
 atomic_t tick_dep_mask;
 };

在切换到高精度模式时,有 hrtimer_switch_to_hres => tick_setup_sched_timer ,其将 tick_cpu_sched.sched_timer 的回调函数设置为 tick_sched_timer 。

 
 static enum hrtimer_restart tick_sched_timer(struct hrtimer *timer)
 {
 struct tick_sched *ts =
 container_of(timer, struct tick_sched, sched_timer);
 struct pt_regs *regs = get_irq_regs();
 ktime_t now = ktime_get();
  
 // 更新 jiffies
 tick_sched_do_timer(now);
  
 /*
 * Do not call, when we are not in irq context and have
 * no valid regs pointer
 */
 // 处理超时 timer
 if (regs)
 tick_sched_handle(ts, regs);
  
 /* No need to reprogram if we are in idle or full dynticks mode */
 if (unlikely(ts->tick_stopped))
 return HRTIMER_NORESTART;
  
 // 推进一个 tick
 hrtimer_forward(timer, now, tick_period);
  
 // 重启本 hrtimer
 return HRTIMER_RESTART;
 }

于是在该高精度定时器超时时有: tick_sched_timer => tick_sched_handle => update_process_times => run_local_timers => raise_softirq(TIMER_SOFTIRQ) 发出 TIMER_SOFTIRQ。

当然,别忘了 jiffies 也是由 tick 的处理函数来更新的,所以在这里我们也需要更新 jiffies :

 
 static void tick_sched_do_timer(ktime_t now)
 {
 int cpu = smp_processor_id();
 // 只有一个 CPU 能更新 jiffie
 // 如果支持 NO_HZ 特性,可能负责这个的 CPU 睡觉去了,则需要当前 CPU 承担该责任
 #ifdef CONFIG_NO_HZ_COMMON
 /*
 * Check if the do_timer duty was dropped. We don't care about
 * concurrency: This happens only when the CPU in charge went
 * into a long sleep. If two CPUs happen to assign themselves to
 * this duty, then the jiffies update is still serialized by
 * jiffies_lock.
 */
 if (unlikely(tick_do_timer_cpu == TICK_DO_TIMER_NONE)
 && !tick_nohz_full_cpu(cpu))
 tick_do_timer_cpu = cpu;
 #endif
  
 /* Check, if the jiffies need an update */
 // 如果是当前 CPU 负责更新 jiffie,则更新之
 if (tick_do_timer_cpu == cpu)
 tick_do_update_jiffies64(now);
 }

timekeeping_init

初始化时钟守护者,其实就是时钟。

对于用户来说,我们感知的是真实世界的真实时间,也就是墙上时间(wall time),clocksource 只能提供一个按给定频率不停递增的周期计数,需要把它和真实的墙上时间相关联。

Linux 时间种类

  • RTC

    又称 CMOS 时间,维护了 wall time。精度低,一般只有毫秒级。

    通常由一个专门的计时硬件来实现。不管系统是否上电,RTC 中的时间信息都不会丢失,计时会一直持续进行,硬件上通常使用一个电池对其进行单独的供电。

  • xtime

    同样维护了 wall time,保存在内存中,基于 clocksource 实现。精度自然也就取决于 clocksource,最高可达纳秒级。

    当用户修改时间时,可能会发生跳变。

  • monotonic time

    自系统开机后就一直单调增加,但系统休眠时不会递增。

  • raw monotonic time

    与 monotonic time 类似,也是单调递增的时间,但不会受到 NTP 时间调整的影响,代表着系统独立时钟硬件对时间的统计

  • boot time

    与 monotonic time 类型,但会加上系统休眠的时间,代表系统上电后的总时间

timekeeper 维护了 monotonic time, raw monotonic time, xtime :

 
 struct timekeeper {
 struct tk_read_base tkr_mono; // 维护 CLOCK_MONOTONIC
 struct tk_read_base tkr_raw; // 维护 CLOCK_MONOTONIC_RAW
 u64 xtime_sec; // 当前 CLOCK_REALTIME,单位秒
 unsigned long ktime_sec; // 当前 CLOCK_MONOTONIC,单位秒
 struct timespec64 wall_to_monotonic; // CLOCK_REALTIME 和 CLOCK_MONOTONIC 的差值
 ktime_t offs_real; // CLOCK_MONOTONIC 和 CLOCK_REALTIME 的差值
 ktime_t offs_boot; // CLOCK_MONOTONIC 和 boot time 的差值
 ktime_t offs_tai; // CLOCK_MONOTONIC 和 CLOCK_TAI 的差值
 s32 tai_offset; // UTC 和 TAI 的差值,单位秒
 unsigned int clock_was_set_seq;
 u8 cs_was_changed_seq;
 ktime_t next_leap_ktime;
 struct timespec64 raw_time; // raw monotonic time
  
 /* The following members are for timekeeping internal use */
 cycle_t cycle_interval; // 一个 NTP interval 的 cycle 数
 u64 xtime_interval; // 一个 NTP interval 的 ns 数
 s64 xtime_remainder;
 u64 raw_interval;
 /* The ntp_tick_length() value currently being used.
 * This cached copy ensures we consistently apply the tick
 * length for an entire tick, as ntp_tick_length may change
 * mid-tick, and we don't want to apply that new value to
 * the tick in progress.
 */
 u64 ntp_tick;
 /* Difference between accumulated time and NTP time in ntp
 * shifted nano seconds. */
 s64 ntp_error;
 u32 ntp_error_shift;
 u32 ntp_err_mult;
 #ifdef CONFIG_DEBUG_TIMEKEEPING
 long last_warning;
 /*
 * These simple flag variables are managed
 * without locks, which is racy, but they are
 * ok since we don't really care about being
 * super precise about how many events were
 * seen, just that a problem was observed.
 */
 int underflow_seen;
 int overflow_seen;
 #endif
 };

我们来看它的初始化函数 timekeeping_init :

 
 void __init timekeeping_init(void)
 {
 struct timekeeper *tk = &tk_core.timekeeper;
 struct clocksource *clock;
 unsigned long flags;
 struct timespec64 now, boot, tmp;
  
 // 获取持久化时间,在 x86 下是调用 x86_platform.get_wallclock (mach_get_cmos_time),即读取 RTC
 read_persistent_clock64(&now);
 if (!timespec64_valid_strict(&now)) {
 pr_warn("WARNING: Persistent clock returned invalid value!n"
 " Check your CMOS/BIOS settings.n");
 now.tv_sec = 0;
 now.tv_nsec = 0;
 } else if (now.tv_sec || now.tv_nsec)
 persistent_clock_exists = true;
  
 // x86 下为 0
 read_boot_clock64(&boot);
 if (!timespec64_valid_strict(&boot)) {
 pr_warn("WARNING: Boot clock returned invalid value!n"
 " Check your CMOS/BIOS settings.n");
 boot.tv_sec = 0;
 boot.tv_nsec = 0;
 }
  
 raw_spin_lock_irqsave(&timekeeper_lock, flags);
 write_seqcount_begin(&tk_core.seq);
 // 初始化 NTP,重置相关变量
 ntp_init();
  
 // 获取默认的时钟源,即 clocksource_jiffies
 clock = clocksource_default_clock();
 if (clock->enable)
 clock->enable(clock);
 // 将 timekeeper 和 clocksource_jiffies 关联起来,即使用 clocksource_jiffies 来作为时钟源
 tk_setup_internals(tk, clock);
  
 // 利用 RTC 读到的时间来设置 xtime / raw time
 tk_set_xtime(tk, &now);
 tk->raw_time.tv_sec = 0;
 tk->raw_time.tv_nsec = 0;
 if (boot.tv_sec == 0 && boot.tv_nsec == 0)
 boot = tk_xtime(tk);
  
 // 将自启动以来的时间作为 monotonic time 和 xtime 的差值(wall_to_monotonic),这里为 0?
 set_normalized_timespec64(&tmp, -boot.tv_sec, -boot.tv_nsec);
 tk_set_wall_to_mono(tk, tmp);
  
 timekeeping_update(tk, TK_MIRROR | TK_CLOCK_WAS_SET);
  
 write_seqcount_end(&tk_core.seq);
 raw_spin_unlock_irqrestore(&timekeeper_lock, flags);
 }

可以发现此时 timekeeper 以 jiffies 作为时钟源。在收到 tick / 模拟 tick 时,都会去更新 timekeeper :

tick_periodic / tick_do_update_jiffies64 => update_wall_time

获取时间

getboottime

获取系统启动时刻的时间。 getboottime => getboottime64

 
 void getboottime64(struct timespec64 *ts)
 {
 struct timekeeper *tk = &tk_core.timekeeper;
 ktime_t t = ktime_sub(tk->offs_real, tk->offs_boot);
  
 *ts = ktime_to_timespec64(t);
 }

将 timekeeper 中的 CLOCK_MONOTONIC 和 CLOCK_REALTIME 的差值 减去 CLOCK_MONOTONIC 和 boot time 的差值 得到 CLOCK_REALTIME 和 boot time 的差值。

即得到了启动时刻时间。

ktime_get

获取系统启动以来所经过的时间,不包含休眠时间。返回 ktime 结构

根据 tkr_mono 得到。

ktime_get_boottime

获取系统启动以来所经过的时间,包含休眠时间。返回 ktime 结构

ktime_get_boottime => ktime_get_with_offset(TK_OFFS_BOOT)

由 tkr_mono 时间加上 offsets[TK_OFFS_BOOT] 得到

get_monotonic_boottime

获取系统启动以来所经过的时间,包含休眠时间。返回 timespec 结构

实际上是对 ktime_get_boottime 结果调用了 ktime_to_timespec

get_seconds

获取当前 xtime ,单位为秒。

返回 xtime_sec 。

getnstimeofday

获取当前时间。返回 timespec 结构。

getnstimeofday => getnstimeofday64 => __getnstimeofday64

由 xtime_sec 加上 tkr_mono 提供的 ns 得到

do_gettimeofday

获取当前时间。返回 timeval 结构。

实际上是对 getnstimeofday64 的结果降低精度得到。

切换时钟源

当 注册精度更高的时钟源 / 时钟源的精度(rating)改变 时,会进行时钟源切换:

比如注册时钟源时,有 clocksource_register_hz => __clocksource_register_scale => clocksource_select => __clocksource_select

通过 clocksource_find_best 找到 rating 最高的时钟源,然后考虑 override,如果最终选中的时钟源和原来不同,则调用 timekeeping_notify 进行切换:

 
 /**
 * timekeeping_notify - Install a new clock source
 * @clock: pointer to the clock source
 *
 * This function is called from clocksource.c after a new, better clock
 * source has been registered. The caller holds the clocksource_mutex.
 */
 int timekeeping_notify(struct clocksource *clock)
 {
 struct timekeeper *tk = &tk_core.timekeeper;
 // 如果时钟源没变,则返回
 if (tk->tkr_mono.clock == clock)
 return 0;
 // 在 machine 停止的状况下执行 change_clocksource,切换时钟源
 stop_machine(change_clocksource, clock, NULL);
 // 通知 tick_cpu_sched 时钟源改变了
 tick_clock_notify();
 return tk->tkr_mono.clock == clock ? 0 : -1;
 }

HPET / PIT 初始化

x86_init.timers.timer_init 在 x86 下为 hpet_time_init ,因此这里做的就是对 HPET 进行初始化:

 
 void __init hpet_time_init(void)
 {
 // 初始化 HPET ,如果失败,则启用 PIT
 if (!hpet_enable())
 setup_pit_timer();
 setup_default_timer_irq();
 }

hpet_enable 负责对 HPET 进行初始化,然后调用 hpet_clocksource_register 将其注册为时钟源:

 
 static int hpet_clocksource_register(void)
 {
 u64 start, now;
 cycle_t t1;
  
 /* Start the counter */
 // 通过 MMIO 来操作 HPET 的寄存器
 // 首先将其停止,清空其计数,然后再次启动它
 hpet_restart_counter();
  
 /* Verify whether hpet counter works */
 // 读取当前 counter
 t1 = hpet_readl(HPET_COUNTER);
 // 读取当前 TSC
 start = rdtsc();
  
 /*
 * We don't know the TSC frequency yet, but waiting for
 * 200000 TSC cycles is safe:
 * 4 GHz == 50us
 * 1 GHz == 200us
 */
 do {
 rep_nop();
 now = rdtsc();
 } while ((now - start) < 200000UL);
  
 // 等待 TSC 过去 200000 个计数,再读 HPET 当前的 counter,如果和之前没区别,说明 HPET 不工作,无法使用
 if (t1 == hpet_readl(HPET_COUNTER)) {
 printk(KERN_WARNING
 "HPET counter not counting. HPET disabledn");
 return -ENODEV;
 }
  
 // 如果 HPET 正常工作,则注册它为时钟源
 clocksource_register_hz(&clocksource_hpet, (u32)hpet_freq);
 return 0;
 }

如果 HPET 初始化失败,hpet_enable 将返回 0,则需要回退到 PIT(i8253): setup_pit_timer => clockevent_i8253_init

 
 void __init clockevent_i8253_init(bool oneshot)
 {
 if (oneshot) {
 i8253_clockevent.features |= CLOCK_EVT_FEAT_ONESHOT;
 i8253_clockevent.set_state_oneshot = pit_set_oneshot;
 }
 /*
 * Start pit with the boot cpu mask. x86 might make it global
 * when it is used as broadcast device later.
 */
 i8253_clockevent.cpumask = cpumask_of(smp_processor_id());
  
 clockevents_config_and_register(&i8253_clockevent, PIT_TICK_RATE,
 0xF, 0x7FFF);
 }

TSC 初始化

最后我们调用 tsc_init 来初始化 TSC。

 
 void __init tsc_init(void)
 {
 u64 lpj;
 int cpu;
  
 if (!boot_cpu_has(X86_FEATURE_TSC)) {
 setup_clear_cpu_cap(X86_FEATURE_TSC_DEADLINE_TIMER);
 return;
 }
  
 // 校准 CPU,获取 CPU 频率
 cpu_khz = x86_platform.calibrate_cpu();
 // 校准 TSC,获取 TSC 频率,前文提到过 TSC 需要通过其他时间源来确定 TSC 的实际频率
 tsc_khz = x86_platform.calibrate_tsc();
  
 /*
 * Trust non-zero tsc_khz as authorative,
 * and use it to sanity check cpu_khz,
 * which will be off if system timer is off.
 */
 // 如果没能算出来,则以 CPU 频率为准
 if (tsc_khz == 0)
 tsc_khz = cpu_khz;
 // 如果 CPU 和 TSC 的频率相差太大,则 CPU 频率以 TSC 的频率为准
 else if (abs(cpu_khz - tsc_khz) * 10 > tsc_khz)
 cpu_khz = tsc_khz;
  
 // 如果还是没能算出来,则 TSC 不可用,返回
 if (!tsc_khz) {
 mark_tsc_unstable("could not calculate TSC khz");
 setup_clear_cpu_cap(X86_FEATURE_TSC_DEADLINE_TIMER);
 return;
 }
  
 pr_info("Detected %lu.%03lu MHz processorn",
 (unsigned long)cpu_khz / 1000,
 (unsigned long)cpu_khz % 1000);
  
 /*
 * Secondary CPUs do not run through tsc_init(), so set up
 * all the scale factors for all CPUs, assuming the same
 * speed as the bootup CPU. (cpufreq notifiers will fix this
 * up if their speed diverges)
 */
 // 以当前 CPU(BSP) 频率为准,为所有的 CPU 计算 cycle 到 ns 转换的辅助参数 scale
 for_each_possible_cpu(cpu) {
 cyc2ns_init(cpu);
 set_cyc2ns_scale(tsc_khz, cpu);
 }
  
 // 内核参数禁用了 TSC,返回
 if (tsc_disabled > 0)
 return;
  
 /* now allow native_sched_clock() to use rdtsc */
  
 tsc_disabled = 0;
 static_branch_enable(&__use_tsc);
  
 if (!no_sched_irq_time)
 enable_sched_clock_irqtime();
  
 lpj = ((u64)tsc_khz * 1000);
 do_div(lpj, HZ);
 lpj_fine = lpj;
  
 // 设置 delay 系列(ndely / udelay / mdelay)函数基于 TSC 来实现(delay_tsc)
 use_tsc_delay();
  
 // 检查 CPU 和 TSC 是否同步
 if (unsynchronized_tsc())
 mark_tsc_unstable("TSCs unsynchronized");
  
 // 检查 TSC 是否可靠
 check_system_tsc_reliable();
  
 detect_art();
 }

而实际上,在 tsc_init 执行之前,先会执行 device_initcall(init_tsc_clocksource)

 
 static int __init init_tsc_clocksource(void)
 {
 if (!boot_cpu_has(X86_FEATURE_TSC) || tsc_disabled > 0 || !tsc_khz)
 return 0;
  
 // 如果 tsc 的可靠性已经验证,则清除 必须验证 标记
 if (tsc_clocksource_reliable)
 clocksource_tsc.flags &= ~CLOCK_SOURCE_MUST_VERIFY;
 /* lower the rating if we already know its unstable: */
 // 检查 TSC 是否稳定,在 tsc_init 前通过全局变量标记 TSC 是否稳定
 if (check_tsc_unstable()) {
 // 如果 tsc 不稳定,则降低 rating 最低,清除连续标记
 clocksource_tsc.rating = 0;
 clocksource_tsc.flags &= ~CLOCK_SOURCE_IS_CONTINUOUS;
 }
  
 if (boot_cpu_has(X86_FEATURE_NONSTOP_TSC_S3))
 clocksource_tsc.flags |= CLOCK_SOURCE_SUSPEND_NONSTOP;
  
 /*
 * Trust the results of the earlier calibration on systems
 * exporting a reliable TSC.
 */
 if (boot_cpu_has(X86_FEATURE_TSC_RELIABLE)) {
 if (boot_cpu_has(X86_FEATURE_ART))
 art_related_clocksource = &clocksource_tsc;
 // 注册为时钟源
 clocksource_register_khz(&clocksource_tsc, tsc_khz);
 return 0;
 }
  
 schedule_delayed_work(&tsc_irqwork, 0);
 return 0;
 }

小结

至此,时钟系统初始化完毕。回顾一下:

 
 start_kernel => setup_arch => x86_init.timers.wallclock_init() 初始化 wallclock
 => register_refined_jiffies(CLOCK_TICK_RATE) 初始化 refined_jiffies
 => tick_init
 => init_timers
 => hrtimers_init
 => timekeeping_init
 => time_init 设置 late_time_init 为 x86_late_time_init
 => x86_late_time_init => x86_init.timers.timer_init(hpet_time_init) 初始化 hpet,如果不支持,则初始化 pit
 => tsc_init 初始化 tsc,需要借助其他时间源来校准,因此放 hpet 后
 => rest_init

对照时间系统的层级:

 
 低精度定时器(timer)
 相互替代
 框架层 tick_device <-----> 高精度定时器(hrtimer) timekeeper
  
 抽象层 时钟事件设备(clock_event_device) 时钟源(clocksource)
  
 硬件层 硬件定时器(如 APIC timer) 时钟源(如 RTC TSC)

我们可以发现,高精度设备的初始化,如 APIC timer ,HPT ,TSC 在 hrtimer 和 timekeeping 之后,在此之前,Linux 会先使用精度较低的设备,当精度更高的设备被初始化并注册到系统中时,会自动将数据源切换为这些设备。比如说最终作为 hrtimer 的硬件定时器为 per-CPU 的 local APIC timer。

参考

droidphone 的 Linux时间子系统 系列文章

linux-insides

最后

以上就是精明小蝴蝶为你收集整理的Linux 时间系统分析的全部内容,希望文章能够帮你解决Linux 时间系统分析所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(63)

评论列表共有 0 条评论

立即
投稿
返回
顶部