登录后台

页面导航

本文编写于 610 天前,最后修改于 610 天前,其中某些信息可能已经过时。

Pthread

注:为简化难度,在此忽略部分pthread代码,只分析部分关键代码

pthread_mutex是pthread进行封装的一系列互斥锁操作。

主要函数包括:

  • pthread_mutex_init() 初始化互斥锁
  • pthread_mutex_destroy() 删除互斥锁
  • pthread_mutex_lock():占有互斥锁(阻塞操作)
  • pthread_mutex_trylock():试图占有互斥锁(不阻塞操作)。即,当互斥锁空闲时,将占有该锁;否则,立即返回。
  • pthread_mutex_unlock(): 释放互斥锁
  • pthread_mutexattr_*(): 互斥锁属性相关的一系列函数

主要变量包括:

/* Internal data layout of a mutex.  Of the fields below, the code quoted
   in this article touches __lock, __owner and __nusers; __kind is
   described by the concurrency notes further down.  */
struct __pthread_mutex_s
{
  /* Lock word.  lll_mutex_lock_optimized treats 0 as "not held", stores 1
     on its fast path and otherwise hands this field to lll_lock.  */
  int __lock __LOCK_ALIGNMENT;
  unsigned int __count;
  /* Set to `id' by PTHREAD_MUTEX_LOCK once the lock has been taken.  */
  int __owner;
#if !__PTHREAD_MUTEX_NUSERS_AFTER_KIND
  /* Incremented by PTHREAD_MUTEX_LOCK unless NO_INCR is defined.  Its
     position relative to __kind depends on
     __PTHREAD_MUTEX_NUSERS_AFTER_KIND.  */
  unsigned int __nusers;
#endif
  /* KIND must stay at this position in the structure to maintain
     binary compatibility with static initializers.
     Concurrency notes:
     The __kind of a mutex is initialized either by the static
     PTHREAD_MUTEX_INITIALIZER or by a call to pthread_mutex_init.
     After a mutex has been initialized, the __kind of a mutex is usually not
     changed.  BUT it can be set to -1 in pthread_mutex_destroy or elision can
     be enabled.  This is done concurrently in the pthread_mutex_*lock functions
     by using the macro FORCE_ELISION. This macro is only defined for
     architectures which supports lock elision.
     For elision, there are the flags PTHREAD_MUTEX_ELISION_NP and
     PTHREAD_MUTEX_NO_ELISION_NP which can be set in addition to the already set
     type of a mutex.
     Before a mutex is initialized, only PTHREAD_MUTEX_NO_ELISION_NP can be set
     with pthread_mutexattr_settype.
     After a mutex has been initialized, the functions pthread_mutex_*lock can
     enable elision - if the mutex-type and the machine supports it - by setting
     the flag PTHREAD_MUTEX_ELISION_NP. This is done concurrently. Afterwards
     the lock / unlock functions are using specific elision code-paths.  */
  int __kind;
  __PTHREAD_COMPAT_PADDING_MID
#if __PTHREAD_MUTEX_NUSERS_AFTER_KIND
  unsigned int __nusers;
#endif
  /* The list member is either a __pthread_list_t or, inside a union that
     shares storage with the spin data, a __pthread_slist_t.
     __PTHREAD_MUTEX_HAVE_PREV records which variant was chosen
     (1 for the first, 0 for the second).  */
#if !__PTHREAD_MUTEX_USE_UNION
  __PTHREAD_SPINS_DATA;
  __pthread_list_t __list;
# define __PTHREAD_MUTEX_HAVE_PREV      1
#else
  __extension__ union
  {
    __PTHREAD_SPINS_DATA;
    __pthread_slist_t __list;
  };
# define __PTHREAD_MUTEX_HAVE_PREV      0
#endif
  __PTHREAD_COMPAT_PADDING_END
};

/* Public mutex type: a union overlaying the internal structure with a
   byte array of __SIZEOF_PTHREAD_MUTEX_T bytes and a long int member.  */
typedef union
{
  struct __pthread_mutex_s __data;      /* Fields used by the implementation.  */
  char __size[__SIZEOF_PTHREAD_MUTEX_T];        /* Byte-array view of the object.  */
  long int __align;     /* NOTE(review): presumably present to force alignment - confirm.  */
} pthread_mutex_t;

int __lock; 锁状态字(即 futex 变量):0 表示未加锁,1 表示已加锁,2 表示已加锁且可能有线程在等待

int __kind; 锁类型,init 函数中mutexattr 参数传递,该参数可以为NULL,一般PTHREAD_MUTEX_NORMAL

1、互斥锁初始化函数--pthread_mutex_init

/* Initialize MUTEX (abridged excerpt).

   MUTEXATTR may be NULL, in which case default_mutexattr is selected.
   In the code shown here the selected attributes (imutexattr) are not
   used any further; the mutex is simply zero-filled.

   Returns 0.  */
int ___pthread_mutex_init (pthread_mutex_t *mutex, const pthread_mutexattr_t *mutexattr)
{
  const struct pthread_mutexattr *imutexattr;

  /* Check that pthread_mutex_t has the expected size.  */
  ASSERT_TYPE_SIZE (pthread_mutex_t, __SIZEOF_PTHREAD_MUTEX_T);

  /* __kind is the only field where its offset should be checked to
     avoid ABI breakage with static initializers.  */
  ASSERT_PTHREAD_INTERNAL_OFFSET (pthread_mutex_t, __data.__kind,
                  __PTHREAD_MUTEX_KIND_OFFSET);
  ASSERT_PTHREAD_INTERNAL_MEMBER_SIZE (pthread_mutex_t, __data.__kind, int);

  /* `a ?: b' yields a when a is non-null, otherwise b: fall back to the
     default attributes when the caller passed NULL.  */
  imutexattr = ((const struct pthread_mutexattr *) mutexattr
        ?: &default_mutexattr);

  /* Clear the whole variable.  */
  memset (mutex, '\0', __SIZEOF_PTHREAD_MUTEX_T);

  /* Default values: mutex not used yet.  */
  // mutex->__count = 0;    already done by memset
  // mutex->__owner = 0;    already done by memset
  // mutex->__nusers = 0;    already done by memset
  // mutex->__spins = 0;    already done by memset
  // mutex->__next = NULL;    already done by memset

  LIBC_PROBE (mutex_init, 1, mutex);

  return 0;
}

在pthread_mutex_init中,主要完成的工作为对互斥锁变量进行初始化,即 memset (mutex, '\0', __SIZEOF_PTHREAD_MUTEX_T);,同时对互斥锁进行一些检查。

2、占有互斥锁函数--pthread_mutex_lock

#define __NR_futex 202 // system call number of the Linux sys_futex system call

// glibc/sysdeps/generic/atomic-machine.h
// Compare-and-exchange, value-returning form: read *mem, store newval into
// *mem if the value read equals oldval, and evaluate to the value read
// (i.e. the previous contents of *mem) in either case.
// As written here the read and the store are two separate plain C
// statements; no atomic instruction or barrier appears in this expansion.
#define atomic_compare_and_exchange_val_acq(mem, newval, oldval) \
  ({ __typeof (mem) __gmemp = (mem);                                      \
     __typeof (*mem) __gret = *__gmemp;                                      \
     __typeof (*mem) __gnewval = (newval);                              \
                                                                      \
     if (__gret == (oldval))                                              \
       *__gmemp = __gnewval;                                              \
     __gret; })

// glibc/include/atomic.h
// Exchange: store newvalue into *mem and evaluate to the previous value.
// Implemented as a retry loop: snapshot *mem, then try to replace that
// snapshot with newvalue via atomic_compare_and_exchange_bool_acq; repeat
// while that macro evaluates to non-zero. The __builtin_expect (..., 0)
// marks a retry as the unlikely case.
// NOTE(review): atomic_compare_and_exchange_bool_acq is not defined in this
// excerpt; presumably non-zero means "no exchange happened" - confirm.
# define atomic_exchange_acq(mem, newvalue) \
  ({ __typeof ((__typeof (*(mem))) *(mem)) __atg5_oldval;                      \
     __typeof (mem) __atg5_memp = (mem);                                      \
     __typeof ((__typeof (*(mem))) *(mem)) __atg5_value = (newvalue);              \
                                                                              \
     do                                                                              \
       __atg5_oldval = *__atg5_memp;                                              \
     while (__builtin_expect                                                      \
            (atomic_compare_and_exchange_bool_acq (__atg5_memp, __atg5_value, \
                                                   __atg5_oldval), 0));              \
     __atg5_oldval; })

// glibc/sysdeps/nptl/lowlevellock-futex.h
// Issue a FUTEX_WAIT operation on futexp with expected value val and the
// given timeout; the operation code is combined with the private flag by
// __lll_private_flag.
// NOTE(review): the leading 4 is presumably the number of arguments
// forwarded to the system call - confirm against lll_futex_syscall.
#define lll_futex_timed_wait(futexp, val, timeout, private)     \
  lll_futex_syscall (4, futexp,                                 \
                     __lll_private_flag (FUTEX_WAIT, private),  \
                     val, timeout)

// glibc/sysdeps/nptl/lowlevellock-futex.h
// Same as lll_futex_timed_wait, but passes NULL as the timeout.
#define lll_futex_wait(futexp, val, private) \
  lll_futex_timed_wait (futexp, val, NULL, private)

// glibc/nptl/lowlevellock.c
/* Slow path of lock acquisition: returns only after an exchange on *FUTEX
   has observed the old value 0.

   If the word already holds 2, wait on it first.  Then repeatedly store 2
   into *FUTEX with atomic_exchange_acq: an old value of 0 ends the loop
   (the word is left at 2); any other old value leads to another
   lll_futex_wait before retrying.  */
void __lll_lock_wait (int *futex, int private)
{
  if (*futex == 2)
    lll_futex_wait (futex, 2, private); /* Wait if *futex == 2.  */
  while (atomic_exchange_acq (futex, 2) != 0)
    lll_futex_wait (futex, 2, private); /* Wait if *futex == 2.  */
}

// glibc/sysdeps/nptl/lowlevellock.h
// Acquire the low-level lock at futex.
// First attempts a compare-and-exchange of the word from 0 to 1. If
// atomic_compare_and_exchange_bool_acq evaluates to non-zero (marked as the
// unlikely case), the slow path is taken: __lll_lock_wait_private when
// private is the compile-time constant LLL_PRIVATE, __lll_lock_wait
// otherwise.
// NOTE(review): non-zero presumably means the exchange did not happen,
// i.e. the lock was already held - confirm against the macro definition.
#define __lll_lock(futex, private)                                      \
  ((void)                                                               \
   ({                                                                   \
     int *__futex = (futex);                                            \
     if (__glibc_unlikely                                               \
         (atomic_compare_and_exchange_bool_acq (__futex, 1, 0)))        \
       {                                                                \
         if (__builtin_constant_p (private) && (private) == LLL_PRIVATE) \
           __lll_lock_wait_private (__futex);                           \
         else                                                           \
           __lll_lock_wait (__futex, private);                          \
       }                                                                \
   }))

/* Lock MUTEX, bypassing lll_lock when the mutex is private, the process
   is single-threaded (SINGLE_THREAD_P) and the lock word is currently 0;
   in that case the lock word is set to 1 with a plain store.  In every
   other case lll_lock is used.  */
static inline void lll_mutex_lock_optimized (pthread_mutex_t *mutex)
{
  /* The single-threaded optimization is only valid for private
     mutexes.  For process-shared mutexes, the mutex could be in a
     shared mapping, so synchronization with another process is needed
     even without any threads.  If the lock is already marked as
     acquired, POSIX requires that pthread_mutex_lock deadlocks for
     normal mutexes, so skip the optimization in that case as
     well.  */
  int private = PTHREAD_MUTEX_PSHARED (mutex);
  if (private == LLL_PRIVATE && SINGLE_THREAD_P && mutex->__data.__lock == 0)
    mutex->__data.__lock = 1;
  else
    lll_lock (mutex->__data.__lock, private);
}

/* Lock MUTEX (abridged excerpt: only the PTHREAD_MUTEX_TIMED_NP path is
   shown).

   Acquires the lock through LLL_MUTEX_LOCK_OPTIMIZED, asserts that no
   owner was recorded, then stores the owner and, unless NO_INCR is
   defined, increments __nusers.

   Returns 0.

   NOTE(review): this excerpt does not compile on its own - `id' is used
   below without a declaration (presumably the calling thread's id in the
   full source - confirm), the `elision' label targeted by FORCE_ELISION is
   not present, and the `simple' label has no visible goto.  */
int PTHREAD_MUTEX_LOCK (pthread_mutex_t *mutex)
{
  /* See concurrency notes regarding mutex type which is loaded from __kind
     in struct __pthread_mutex_s in sysdeps/nptl/bits/thread-shared-types.h.  */
  unsigned int type = PTHREAD_MUTEX_TYPE_ELISION (mutex);

  LIBC_PROBE (mutex_entry, 1, mutex);

  if (__glibc_likely (type == PTHREAD_MUTEX_TIMED_NP))
    {
      FORCE_ELISION (mutex, goto elision);
    simple:
      /* Normal mutex.  */
      LLL_MUTEX_LOCK_OPTIMIZED (mutex);
      /* The lock was just acquired, so no owner may be recorded yet.  */
      assert (mutex->__data.__owner == 0);
    }

  /* Record the ownership.  */
  mutex->__data.__owner = id;
#ifndef NO_INCR
  ++mutex->__data.__nusers;
#endif

  LIBC_PROBE (mutex_acquired, 1, mutex);

  return 0;
}

atomic_compare_and_exchange_val_acq宏定义当中,主要实现的功能为判断*mem与oldval值是否相同,如果相同,则将*mem置为newval,并且返回值始终为原来的*mem值。后文用到的atomic_compare_and_exchange_bool_acq完成相同的比较并交换操作,区别在于返回值:交换成功时返回0,未发生交换时返回非0。

atomic_exchange_acq宏定义当中,主要实现功能为将*mem与newvalue进行交换(即把newvalue写入*mem)并返回原来的*mem值。

pthread_mutex_lock函数当中,主要调用的是LLL_MUTEX_LOCK_OPTIMIZED。在其实现lll_mutex_lock_optimized中,如果互斥锁是进程内私有的、当前进程为单线程,并且__lock为0(代表没有线程持有该互斥锁),则直接将__lock标记为1完成加锁;否则调用lll_lock(即__lll_lock),通过原子操作尝试加锁,必要时进入等待。

__lll_lock中,首先调用宏atomic_compare_and_exchange_bool_acq尝试将_lock由0改为1:如果_lock原值为0,则加锁成功,无需等待;反之如果_lock不为0(锁已被占用),则调用__lll_lock_wait(或__lll_lock_wait_private)进行阻塞等待。

__lll_lock_wait中,如果_lock已经为2,则先调用lll_futex_wait进行等待;随后循环地将_lock置为2并取回旧值:如果旧值不为0(锁仍被占用),则调用lll_futex_wait继续等待;反之如果旧值为0,则说明已经获得锁,退出循环。

lll_futex_wait函数中,主要实现的功能为发起带4个参数的系统调用,并传入_lock的地址和期望值(此时应该为2)。在Linux系统当中,该函数最后转变为202号系统调用,即为sys_futex系统调用;仅当_lock的值仍等于期望值时,操作系统才会对该线程进行阻塞处理。

// kernel/futex/waitwake.c
/*
 * Wait on the futex at uaddr.
 *
 * Return values visible in this function:
 *   -EINVAL       bitset is zero
 *   non-zero      whatever futex_wait_setup() returned on failure
 *   0             futex_unqueue() returned 0 after the wait
 *   -ETIMEDOUT    a timer was set up and to->task is NULL after the wait
 *   -ERESTARTSYS  a signal is pending and no abs_time was given
 *   otherwise     the result of set_restart_fn(), after the wait
 *                 parameters have been saved in current->restart_block
 *
 * If none of the above applies and no signal is pending, the wait is
 * retried from futex_wait_setup().
 */
int futex_wait(u32 __user *uaddr, unsigned int flags, u32 val, ktime_t *abs_time, u32 bitset)
{
    struct hrtimer_sleeper timeout, *to;
    struct restart_block *restart;
    struct futex_hash_bucket *hb;
    struct futex_q q = futex_q_init;
    int ret;

    if (!bitset)
        return -EINVAL;
    q.bitset = bitset;

    /* `to` is tested for NULL below, so a timer is not always set up. */
    to = futex_setup_timer(abs_time, &timeout, flags,
                   current->timer_slack_ns);
retry:
    /*
     * Prepare to wait on uaddr. On success, it holds hb->lock and q
     * is initialized.
     */
    ret = futex_wait_setup(uaddr, val, flags, &q, &hb);
    if (ret)
        goto out;

    /* futex_queue and wait for wakeup, timeout, or a signal. */
    futex_wait_queue(hb, &q, to);

    /* If we were woken (and unqueued), we succeeded, whatever. */
    ret = 0;
    if (!futex_unqueue(&q))
        goto out;
    ret = -ETIMEDOUT;
    if (to && !to->task)
        goto out;

    /*
     * We expect signal_pending(current), but we might be the
     * victim of a spurious wakeup as well.
     */
    if (!signal_pending(current))
        goto retry;

    ret = -ERESTARTSYS;
    if (!abs_time)
        goto out;

    /* Save the wait parameters so the call can be restarted later. */
    restart = &current->restart_block;
    restart->futex.uaddr = uaddr;
    restart->futex.val = val;
    restart->futex.time = *abs_time;
    restart->futex.bitset = bitset;
    restart->futex.flags = flags | FLAGS_HAS_TIMEOUT;

    ret = set_restart_fn(restart, futex_wait_restart);

out:
    /* Tear down the timer if one was set up. */
    if (to) {
        hrtimer_cancel(&to->timer);
        destroy_hrtimer_on_stack(&to->timer);
    }
    return ret;
}

//kernel/futex/syscalls.c
/*
 * Decode the futex operation word `op` and dispatch to the matching
 * handler.
 *
 * The command is op & FUTEX_CMD_MASK. FLAGS_SHARED is set unless
 * FUTEX_PRIVATE_FLAG is present in op. FUTEX_CLOCK_REALTIME sets
 * FLAGS_CLOCKRT and is only accepted together with FUTEX_WAIT_BITSET,
 * FUTEX_WAIT_REQUEUE_PI or FUTEX_LOCK_PI2.
 *
 * Returns the handler's result, or -ENOSYS for an unknown command or a
 * rejected FUTEX_CLOCK_REALTIME combination.
 */
long do_futex(u32 __user *uaddr, int op, u32 val, ktime_t *timeout,
        u32 __user *uaddr2, u32 val2, u32 val3)
{
    int cmd = op & FUTEX_CMD_MASK;
    unsigned int flags = 0;

    if (!(op & FUTEX_PRIVATE_FLAG))
        flags |= FLAGS_SHARED;

    if (op & FUTEX_CLOCK_REALTIME) {
        flags |= FLAGS_CLOCKRT;
        if (cmd != FUTEX_WAIT_BITSET && cmd != FUTEX_WAIT_REQUEUE_PI &&
            cmd != FUTEX_LOCK_PI2)
            return -ENOSYS;
    }

    switch (cmd) {
    case FUTEX_WAIT:
        /* Plain wait: same handler as the bitset form, matching any bit. */
        val3 = FUTEX_BITSET_MATCH_ANY;
        fallthrough;
    case FUTEX_WAIT_BITSET:
        return futex_wait(uaddr, flags, val, timeout, val3);
    case FUTEX_WAKE:
        /* Plain wake: same handler as the bitset form, matching any bit. */
        val3 = FUTEX_BITSET_MATCH_ANY;
        fallthrough;
    case FUTEX_WAKE_BITSET:
        return futex_wake(uaddr, flags, val, val3);
    case FUTEX_REQUEUE:
        return futex_requeue(uaddr, flags, uaddr2, val, val2, NULL, 0);
    case FUTEX_CMP_REQUEUE:
        return futex_requeue(uaddr, flags, uaddr2, val, val2, &val3, 0);
    case FUTEX_WAKE_OP:
        return futex_wake_op(uaddr, flags, uaddr2, val, val2, val3);
    case FUTEX_LOCK_PI:
        /* FUTEX_LOCK_PI always gets FLAGS_CLOCKRT; FUTEX_LOCK_PI2 only via op. */
        flags |= FLAGS_CLOCKRT;
        fallthrough;
    case FUTEX_LOCK_PI2:
        return futex_lock_pi(uaddr, flags, timeout, 0);
    case FUTEX_UNLOCK_PI:
        return futex_unlock_pi(uaddr, flags);
    case FUTEX_TRYLOCK_PI:
        return futex_lock_pi(uaddr, flags, NULL, 1);
    case FUTEX_WAIT_REQUEUE_PI:
        val3 = FUTEX_BITSET_MATCH_ANY;
        return futex_wait_requeue_pi(uaddr, flags, val, timeout, val3,
                         uaddr2);
    case FUTEX_CMP_REQUEUE_PI:
        return futex_requeue(uaddr, flags, uaddr2, val, val2, &val3, 1);
    }
    return -ENOSYS;
}

上述两个函数出自Linux内核的futex系统调用实现:pthread发起202号系统调用并传入操作码FUTEX_WAIT后,内核会调用do_futex函数,再由do_futex调用futex_wait函数,将当前线程放入等待队列进行阻塞,并等待被唤醒。

3、释放互斥锁--pthread_mutex_unlock

pthread_mutex_unlock原理基本同pthread_mutex_lock,即将互斥锁当中的_lock置为0,并唤醒阻塞进程,此时在阻塞当中的进程会继续执行while (atomic_exchange_acq (futex, 2) != 0)判断,此时futex为0,则停止循环,并将futex置为2,进入临界区域。

参考文献

https://zh.wikipedia.org/zh-cn/POSIX线程

glibc中pthread源代码

Linux 之mutex 源码分析

pthread_mutex_lock实现

Linux系统调用表

Linux 系统调用futex部分源代码