背景

有个小伙伴去南山区某家被美国认证的头部机器人公司面试时,被问到这个问题。
让我感叹的是,世道卷得竟是如此厉害,不愧是头部公司。iOS面试八股文考察盛行已久,但没想到已经到这种层次了。
其实我也不知道实现原理,还挺感兴趣的。这也没啥丢人的,没看过就是没看过,不知道就是不知道,还能咋样,搞明白呗。

我把自己搞明白的过程,记录如下。

准备工作

  • libdispatch源码,我选的版本是libdispatch-1271.40.12
  • 代码阅读工具,我用vim,自己觉得顺手的。

源码阅读

dispatch_once

src/once.c中,可以看到dispatch_once函数的实现如下

void
dispatch_once(dispatch_once_t *val, dispatch_block_t block)
{
     dispatch_once_f(val, block, _dispatch_Block_invoke(block));
}

这个函数其实是对dispatch_once_f函数的包装(废话),在深入去看dispatch_once_f函数之前,把几个参数类型搞清楚,这对后面的阅读和理解会很有帮助。

dispatch/once.h中,可以看到dispatch_once_t的定义如下:

typedef intptr_t dispatch_once_t;

private/introspection_private.h中,可以看到dispatch_block_t的定义如下:

typedef struct Block_layout *dispatch_block_t;

src/BlocksRuntime/Block_private.h中,可以看到Block_layout结构如下:

struct Block_layout {
    void *isa;
    volatile int32_t flags; // contains ref count
    int32_t reserved;
    void (*invoke)(void *, ...);
    struct Block_descriptor_1 *descriptor;
    // imported variables
};

_dispatch_Block_invoke则是一个宏,定义如下:

#define _dispatch_Block_invoke(bb) ((dispatch_function_t)((struct Block_layout *)bb)->invoke)

dispatch/base.h中,可以看到dispatch_function_t就是一个函数指针:

typedef void (*dispatch_function_t)(void *_Nullable);
小节

dispatch_once的行为:
将传入的block结构中的invoke指针取出,连同传入的两个参数,val和block,一起传入下一层dispatch_once_f

dispatch_once_f

dispatch_once_f函数实现,依旧在src/once.c中:

void
dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func)
{
    dispatch_once_gate_t l = (dispatch_once_gate_t)val;

#if !DISPATCH_ONCE_INLINE_FASTPATH || DISPATCH_ONCE_USE_QUIESCENT_COUNTER
    uintptr_t v = os_atomic_load(&l->dgo_once, acquire);
    if (likely(v == DLOCK_ONCE_DONE)) {
        return;
    }
#if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
    if (likely(DISPATCH_ONCE_IS_GEN(v))) {
        return _dispatch_once_mark_done_if_quiesced(l, v);
    }
#endif
#endif
    if (_dispatch_once_gate_tryenter(l)) {
        return _dispatch_once_callout(l, ctxt, func);
    }
    return _dispatch_once_wait(l);
}

这个函数,可以把用编译宏控制的内容先去掉,这样就可以排除干扰信息,高效地弄明白这个函数的行为。精简之后的代码非常短:

void
dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func)
{
    dispatch_once_gate_t l = (dispatch_once_gate_t)val;

    if (_dispatch_once_gate_tryenter(l)) {
        return _dispatch_once_callout(l, ctxt, func);
    }
    return _dispatch_once_wait(l);
}

这样看起来就清晰很多了,继续深入了解各结构和方法。在src/shims/lock.h中,可以了解如下类型定义

typedef mach_port_t dispatch_tid;
typedef uint32_t dispatch_lock;

typedef struct dispatch_gate_s {
    dispatch_lock dgl_lock;
} dispatch_gate_s, *dispatch_gate_t;

typedef struct dispatch_once_gate_s {
      union {
          dispatch_gate_s dgo_gate;
          uintptr_t dgo_once;
      };
} dispatch_once_gate_s, *dispatch_once_gate_t;

所以在dispatch_once_f函数中,一开始对dispatch_once_t类型的val参数做了强制类型转换,实际上并没有实际改变什么,val依旧是uintptr_t内存宽度的变量。更多是利用了union的特性,实现了对老旧的32位平台的兼容性工作。

_dispatch_once_gate_tryenter

src/shims/lock.h中,可以看到tryenter实现如下:

#define DLOCK_ONCE_UNLOCKED ((uintptr_t)0)

DISPATCH_ALWAYS_INLINE
static inline bool _dispatch_once_gate_tryenter(dispatch_once_gate_t l)
{
      return os_atomic_cmpxchg(&l->dgo_once, DLOCK_ONCE_UNLOCKED,              (uintptr_t)_dispatch_lock_value_for_self(), relaxed);
}

这个方法非常短,但是dispatch_once的“上锁”原理也就在这里了。
os_atomic_cmpxchg是系统级的原子操作,实现比较和赋值功能,这是一个宏,可以展开看到更底层调用的是C++的atomic_compare_exchange_strong_explicit函数:

#define os_atomic_cmpxchg(p, e, v, m) \
        ({ _os_atomic_basetypeof(p) _r = (e); \
        atomic_compare_exchange_strong_explicit(_os_atomic_c11_atomic(p), \
        &_r, v, memory_order_##m, memory_order_relaxed); })

这里的行为是,比较&l->dgo_onceDLOCK_ONCE_UNLOCKED是否相等,如果相等则返回true,并给&l->dgo_once赋值(uintptr_t)_dispatch_lock_value_for_self();如果不相等,则返回false。
在更底层的atomic_compare_exchange_strong_explicit传值时,利用宏的连接操作把success情况的memory_order也设置为了memory_order_relaxed,而failure情况,已经显式设置为了memory_order_relaxed
这里还可以看到在往更下层传参时,利用了_os_atomic_c11_atomic宏对p,也就是&l->dgo_once进行了处理,进一步展开_os_atomic_c11_atomic宏,其实就是volatile,如下:

#ifndef os_atomic
#define os_atomic(type) type _Atomic volatile
#endif

#ifndef _os_atomic_c11_atomic
#define _os_atomic_c11_atomic(p) \
        ((__typeof__(*(p)) _Atomic *)(p))
#endif

到这里,所有的关于多线程对&l->dgo_once的操作都清晰了,描述如下:

  • dispatch_once的token,也就是&l->dgo_once参数,使用volatile修饰,告诉编译器不要优化取值,每次都要从原始地址取值。所以&l->dgo_once就可以理解为“锁”。这样可以确保每个线程都会取到最新的&l->dgo_once
  • memory_order_relaxed: 保证当前操作的原子性。

接着来看给&l->dgo_once上的“锁”到底是什么。在src/shims/lock.h中可以看到以下内容:

typedef mach_port_t dispatch_tid;
#define DLOCK_OWNER_MASK ((dispatch_lock)0xfffffffc) 

DISPATCH_ALWAYS_INLINE
static inline dispatch_lock
_dispatch_lock_value_from_tid(dispatch_tid tid)
{
    return tid & DLOCK_OWNER_MASK;
}

DISPATCH_ALWAYS_INLINE
static inline dispatch_lock
_dispatch_lock_value_for_self(void)
{
    return _dispatch_lock_value_from_tid(_dispatch_tid_self());
}

小结:
_dispatch_once_gate_tryenter就是尝试“上锁”行为,如果是初次进入,因为&l->dgo_onceDLOCK_ONCE_UNLOCKED相等,就成功上锁,&l->dgo_once被设置为当前线程的tid & DLOCK_OWNER_MASK ,在mach内核中,tid是mach_port_t
用dispatch写单例时,传入的token值默认都是0,而DLOCK_ONCE_UNLOCKED也是0值:((uintptr_t)0)

dispatch_once_f函数中可以看出,一旦上锁成功,就调用_dispatch_once_callout,从传参可以看出来,这里接下来应该就是就是要执行函数指针func所指的函数,也就是block内容。
同时还可以看到,如果上锁失败,则会进入_dispatch_once_wait开始等待,接下来分别详细了解_dispatch_once_callout_dispatch_once_wait的内容。

_dispatch_once_callout

src/once.c中,可以看到_dispatch_once_callout

static void
_dispatch_once_callout(dispatch_once_gate_t l, void *ctxt, dispatch_function_t func)
{
    _dispatch_client_callout(ctxt, func);
    _dispatch_once_gate_broadcast(l);
}

继续进一步展开,在src/init.c中,可以看到_dispatch_once_callout

void
_dispatch_client_callout(void *ctxt, dispatch_function_t f)
{
    _dispatch_get_tsd_base();
    void *u = _dispatch_get_unwind_tsd();
    if (likely(!u)) return f(ctxt);
    _dispatch_set_unwind_tsd(NULL);
    f(ctxt);
    _dispatch_free_unwind_tsd();
    _dispatch_set_unwind_tsd(u);
}

_dispatch_once_callout函数,可以看到调用了函数指针f所指的函数,并将block作为参数传入。其他诸如_dispatch_get_tsd_base_dispatch_set_unwind_tsd等函数,都是获取线程相关的数据(tsd: Thread Specific Data),属于多线程调用的正常操作,这里只关心函数指针f的实际调用,所以不对其他函数进行展开。总而言之,dispatch_once传入的block,实际就是在_dispatch_once_callout得到执行的。

在函数指针f得到执行之后,需要将此情况广播出去,以告知还在等待的线程放弃等待。接下来看到看_dispatch_once_gate_broadcast是怎么进行广播的。

src/once.c中,可以看到_dispatch_once_gate_broadcast

static inline void
_dispatch_once_gate_broadcast(dispatch_once_gate_t l)
{
    dispatch_lock value_self = _dispatch_lock_value_for_self();
    uintptr_t v;
#if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
    v = _dispatch_once_mark_quiescing(l);
#else
    v = _dispatch_once_mark_done(l);
#endif
    if (likely((dispatch_lock)v == value_self)) return;
    _dispatch_gate_broadcast_slow(&l->dgo_gate, (dispatch_lock)v);
}

还是一样的阅读代码技巧,把使用编译宏控制的内容先忽略,得到最核心的代码逻辑:

static inline void
_dispatch_once_gate_broadcast(dispatch_once_gate_t l)
{
    dispatch_lock value_self = _dispatch_lock_value_for_self();

    uintptr_t v;
    v = _dispatch_once_mark_done(l);

    if (likely((dispatch_lock)v == value_self)) return;
    _dispatch_gate_broadcast_slow(&l->dgo_gate, (dispatch_lock)v);
}

由此可以看到,在广播之前,通过调用_dispatch_once_mark_done对dispatch传入的token进行了处理。(使用dispatch的时候,通常传入参数都写为token,在上述解析代码过程中可以看到有强制类型转换,表达方式也在变化,分别有val&l->dgo_oncel等,本质上都是传入的token)

这里通过原子交换os_atomic_xchg操作,对&dgo->dgo_once设置为DLOCK_ONCE_DONE,把原来的“上锁值”返回出去。

DISPATCH_ALWAYS_INLINE
static inline uintptr_t
    _dispatch_once_mark_done(dispatch_once_gate_t dgo) 
{
    return os_atomic_xchg(&dgo->dgo_once, DLOCK_ONCE_DONE, release);
}

回到_dispatch_once_gate_broadcast函数中,可以看到如果是上锁线程自己完成了mark_done行为,则会直接return回去,也就是一个单纯的mark_done,否则会继续通过_dispatch_gate_broadcast_slow继续处理。
src/shims/lock.c文件,_dispatch_gate_broadcast_slow函数中:

void
_dispatch_gate_broadcast_slow(dispatch_gate_t dgl, dispatch_lock cur)
{
    if (unlikely(!_dispatch_lock_is_locked_by_self(cur))) {
        DISPATCH_CLIENT_CRASH(cur, "lock not owned by current thread");
    }

#if HAVE_UL_UNFAIR_LOCK
    _dispatch_unfair_lock_wake(&dgl->dgl_lock, ULF_WAKE_ALL);
#elif HAVE_FUTEX
    _dispatch_futex_wake(&dgl->dgl_lock, INT_MAX, FUTEX_PRIVATE_FLAG);
#else
    (void)dgl;
#endif
}

可以看到,除了进行一个死锁检查之外,并没有特别的操作。
如果发现锁不是当前线程所持有,则引发crash,其实这就是进入死锁状态了。

小结:
_dispatch_once_callout就是实际执行点,block内容会在_dispatch_client_callout中执行,执行后在_dispatch_once_gate_broadcast中完成LOCK_DONE操作,如果其他线程进入了_dispatch_once_gate_broadcast尝试标记锁定,则_dispatch_once_gate_broadcast会通过_dispatch_lock_is_locked_by_self检查死锁情况。

_dispatch_once_wait

src/shims/lock.c中,_dispatch_once_wait函数实现如下:

  void
  _dispatch_once_wait(dispatch_once_gate_t dgo)
  {
      dispatch_lock self = _dispatch_lock_value_for_self();
      uintptr_t old_v, new_v;
  #if HAVE_UL_UNFAIR_LOCK || HAVE_FUTEX
      dispatch_lock *lock = &dgo->dgo_gate.dgl_lock;
  #endif
      uint32_t timeout = 1;

      for (;;) {
          os_atomic_rmw_loop(&dgo->dgo_once, old_v, new_v, relaxed, {
              if (likely(old_v == DLOCK_ONCE_DONE)) {
                  os_atomic_rmw_loop_give_up(return);
              }
  #if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
              if (DISPATCH_ONCE_IS_GEN(old_v)) {
                  os_atomic_rmw_loop_give_up({
                      os_atomic_thread_fence(acquire);
                      return _dispatch_once_mark_done_if_quiesced(dgo, old_v);
                  });
              }
  #endif
              new_v = old_v | (uintptr_t)DLOCK_WAITERS_BIT;
              if (new_v == old_v) os_atomic_rmw_loop_give_up(break);
          });
          if (unlikely(_dispatch_lock_is_locked_by((dispatch_lock)old_v, self))) {
              DISPATCH_CLIENT_CRASH(0, "trying to lock recursively");
          }
  #if HAVE_UL_UNFAIR_LOCK
          _dispatch_unfair_lock_wait(lock, (dispatch_lock)new_v, 0,
                  DLOCK_LOCK_NONE);
  #elif HAVE_FUTEX
          _dispatch_futex_wait(lock, (dispatch_lock)new_v, NULL,
                  FUTEX_PRIVATE_FLAG);
  #else
          _dispatch_thread_switch(new_v, 0, timeout++);
  #endif
          (void)timeout;
      }
  }

精简后,代码逻辑如下:

  void
  _dispatch_once_wait(dispatch_once_gate_t dgo)
  {
      dispatch_lock self = _dispatch_lock_value_for_self();
      uintptr_t old_v, new_v;
      uint32_t timeout = 1;

      for (;;) {
          os_atomic_rmw_loop(&dgo->dgo_once, old_v, new_v, relaxed, {
              if (likely(old_v == DLOCK_ONCE_DONE)) {
                  os_atomic_rmw_loop_give_up(return);
              }

              new_v = old_v | (uintptr_t)DLOCK_WAITERS_BIT;
              if (new_v == old_v) os_atomic_rmw_loop_give_up(break);
          });
          if (unlikely(_dispatch_lock_is_locked_by((dispatch_lock)old_v, self))) {
              DISPATCH_CLIENT_CRASH(0, "trying to lock recursively");
          }

          _dispatch_thread_switch(new_v, 0, timeout++);

          (void)timeout;
      }
  }

通过代码可以看到,是通过一个死循环持续查询上锁状态,如果锁状态是DLOCK_ONCE_DONE,则退出循环。
至此,dispatch_once的整个流程相关函数代码都走完了,不算特别复杂。

总结

dispatch_once的实现原理,在dispatch_once_f中体现,本质上就是:
先进的线程尝试上锁,如果成功则执行block,执行后变更锁状态为DLOCK_ONCE_DONE,否则进入等待状态。
后进的等待线程会不断去查询锁状态&dgo->dgo_once,如果锁变成DLOCK_ONCE_DONE则放弃查询,返回。
这里说的“尝试上锁”是一个比喻,在底层实现,其实是通过原子操作来完成的。

类似的,可以通过阅读代码理解其他dispatch_xxx函数的实现,从而可以了解整个GCD的底层实现原理。

最后,把上述代码阅读整理总结为一张图:
dispatch_once

感想

做了多年的移动端团队管理工作,招聘面试无数的iOS开发,当然清醒的知道当前iOS开发行业内卷何其严重,大厂面试必考大量的八股题,但我认为考察这类底层实现是最没用的面试考察。
招聘面试,面试官需要发现候选人解决问题的能力,越是优秀的候选人,越要去挖掘其面对未知时的思考和行为,这样才能招到优秀的人才。而不是用八股文考察某一个晦涩的知识点,收获的只会是擅长死记硬背的人。
也许有人会说,这种八股文考察可以体现处理问题的经验和过往开发工作的深度。我不太赞同,因为实际工作经验中的每个人处理的业务开发各有不同,并不会人人都关注到底层某处的实现。计算机世界的深度和广度堪称无穷,而人的精力有限,只要想考八股题,永远都有考不完的八股题,永远都有难住候选人的八股题。

以上。


3 个评论

yujifei · 2022-01-18 - 13:04

好久没见更新了

    ganquan · 2022-01-19 - 09:35

    是的,多年没写了,还是要写起来。
    没想到这么多年还在关注,非常感动,非常感谢:)
    希望你一切都好,祝新年快乐万事大吉

yujifei · 2022-01-20 - 15:53

偶尔进来学习一下

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注

我不是机器人*