互斥量属性是用pthread_mutexattr_t结构表示的。第11章中每次对互斥量进行初始化时,都是通过使用PTHREAD_MUTEX_INITIALIZER常量或者用指向互斥量属性结构的空指针作为参数调用pthread_mutex_init函数,得到互斥量的默认属性。

对于非默认属性,可以用pthread_mutexattr_init初始化pthread_mutexattr_t结构,用pthread_mutexattr_destroy来反初始化。

#include <pthread.h>
int pthread_mutexattr_init(pthread_mutexattr_t *attr);
int pthread_mutexattr_destroy(pthread_mutexattr_t*attr);
//两个函数的返回值:若成功,返回0;否则,返回错误编号

pthread_mutexattr_init 函数将用默认的互斥量属性初始化 pthread_mutexattr_t结构。

值得注意的3个属性是:进程共享属性、健壮属性以及类型属性。POSIX.1中,进程共享属性是可选的。可以通过检查系统中是否定义了_POSIX_THREAD_PROCESS_SHARED 符号来判断这个平台是否支持进程共享这个属性,也可以在运行时把_SC_THREAD_PRO-CESS_SHARED 参数传给sysconf函数进行检查。虽然这个选项并不是遵循POSIX标准的操作系统必须提供的,但是SingleUNIX Specification要求遵循XSI标准的操作系统支持这个选项。

在进程中,多个线程可以访问同一个同步对象。正如在第11章中看到的,这是默认的行为。在这种情况下,进程共享互斥量属性需设置为PTHREAD_PROCESS_PRIVATE

我们将在第14章和第15章中看到,存在这样的机制:允许相互独立的多个进程把同一个内存数据块映射到它们各自独立的地址空间中。就像多个线程访问共享数据一样,多个进程访问共享数据通常也需要同步。如果进程共享互斥量属性设置为PTHREAD_PROCESS_SHARED,从多个进程彼此之间共享的内存数据块中分配的互斥量就可以用于这些进程的同步。

#include <pthread.h>
int pthread_mutexattr_getpshared(const pthread_mutexattr_t*restrict attr,int *restrict pshared);
int pthread_mutexattr_setpshared(pthread_mutex-attr_t *attr,int pshared);
//两个函数的返回值:若成功,返回0;否则,返回错误编号

进程共享互斥量属性设置为PTHREAD_PROCESS_PRIVATE时,允许pthread线程库提供更有效的互斥量实现,这在多线程应用程序中是默认的情况。在多个进程共享多个互斥量的情况下, pthread线程库可以限制开销较大的互斥量实现。

互斥量健壮属性与在多个进程间共享的互斥量有关。这意味着,当持有互斥量的进程终止时,需要解决互斥量状态恢复的问题。这种情况发生时,互斥量处于锁定状态,恢复起来很困难。其他阻塞在这个锁的进程将会一直阻塞下去。

可以使用 pthread_mutexattr_getrobust 函数获取健壮的互斥量属性的值。

可以调用 pthread_mutexattr_setrobust函数设置健壮的互斥量属性的值。

#include <pthread.h>
int pthread_mutexattr_getrobust(const pthread_mutexattr_t*restrict attr,int *restrict robust);
int pthread_mutexattr_setrobust(pthread_mutex-attr_t *attr,int robust);
//两个函数的返回值:若成功,返回0;否则,返回错误编号

健壮属性取值有两种可能的情况。默认值是 PTHREAD_MUTEX_STALLED,这意味着持有互斥量的进程终止时不需要采取特别的动作。这种情况下,使用互斥量后的行为是未定义的,等待该互斥量解锁的应用程序会被有效地“拖住”。另一个取值是PTHREAD_MUTEX_ROBUST。这个值将导致线程调用pthread_mutex_lock获取锁,而该锁被另一个进程持有,但它终止时并没有对该锁进行解锁,此时线程会阻塞,从pthread_mutex_lock返回的值为EOWNERDEAD而不是0。应用程序可以通过这个特殊的返回值获知,若有可能(要保护状态的细节以及如何进行恢复会因不同的应用程序而异),不管它们保护的互斥量状态如何,都需要进行恢复。

使用健壮的互斥量改变了我们使用pthread_mutex_lock的方式,因为现在必须检查3个返回值而不是之前的两个:不需要恢复的成功、需要恢复的成功以及失败。但是,即使不用健壮的互斥量,也可以只检查成功或者失败。

在本书的4个平台中,只有Linux 3.2.0目前支持健壮的线程互斥量。Solaris 10只在它的Solaris线程库中支持健壮的线程互斥量(参阅Solaris手册的mutex_init(3C)获取相关的信息)。但是Solaris 11支持健壮的线程互斥量。

如果应用状态无法恢复,在线程对互斥量解锁以后,该互斥量将处于永久不可用状态。为了避免这样的问题,线程可以调用pthread_mutex_consistent函数,指明与该互斥量相关的状态在互斥量解锁之前是一致的。

#include <pthread.h>
int pthread_mutex_consistent(pthread_mutex_t *mutex);
//返回值:若成功,返回0;否则,返回错误编号

如果线程没有先调用pthread_mutex_consistent 就对互斥量进行了解锁,那么其他试图获取该互斥量的阻塞线程就会得到错误码ENOTRECOVERABLE。如果发生这种情况,互斥量将不再可用。线程通过提前调用pthread_mutex_consistent,能让互斥量正常工作,这样它就可以持续被使用。

类型互斥量属性控制着互斥量的锁定特性。POSIX.1定义了4种类型。

  • PTHREAD_MUTEX_NORMAL 一种标准互斥量类型,不做任何特殊的错误检查或死锁检测。.
  • PTHREAD_MUTEX_ERRORCHECK 此互斥量类型提供错误检查。
  • PTHREAD_MUTEX_RECURSIVE 此互斥量类型允许同一线程在互斥量解锁之前对该互斥量进行多次加锁。递归互斥量维护锁的计数,在解锁次数和加锁次数不相同的情况下,不会释放锁。所以,如果对一个递归互斥量加锁两次,然后解锁一次,那么这个互斥量将依然处于加锁状态,对它再次解锁以前不能释放该锁。
  • PTHREAD_MUTEX_DEFAULT 此互斥量类型可以提供默认特性和行为。操作系统在实现它的时候可以把这种类型自由地映射到其他互斥量类型中的一种。例如,Linux 3.2.0把这种类型映射为普通的互斥量类型,而FreeBSD 8.0则把它映射为错误检查互斥量类型。

这4种类型的行为如图12-5所示。“不占用时解锁”这一栏指的是,一个线程对被另一个线程加锁的互斥量进行解锁的情况。“在已解锁时解锁”这一栏指的是,当一个线程对已经解锁的互斥量进行解锁时将会发生什么,这通常是编码错误引起的。

图12-5 互斥量类型行为

可以用pthread_mutexattr_gettype函数得到互斥量类型属性,用pthread_mutexattr_settype函数修改互斥量类型属性。

#include <pthread.h>
int pthread_mutexattr_gettype(const pthread_mutexattr_t*restrict attr, int*restrict type);
int pthread_mutexattr_settype(pthread_mutexattr_t*attr, int type);
//两个函数的返回值: 若成功,返回0;否则,返回错误编号

回忆 11.6.6 节中学过的,互斥量用于保护与条件变量关联的条件。在阻塞线程之前,pthread_cond_waitpthread_cond_timedwait函数释放与条件相关的互斥量。这就允许其他线程获取互斥量、改变条件、释放互斥量以及给条件变量发信号。既然改变条件时必须占有互斥量,使用递归互斥量就不是一个好主意。如果递归互斥量被多次加锁,然后用在调用pthread_cond_wait函数中,那么条件永远都不会得到满足,因为pthread_cond_wait所做的解锁操作并不能释放互斥量。

如果需要把现有的单线程接口放到多线程环境中,递归互斥量是非常有用的,但由于现有程序兼容性的限制,不能对函数接口进行修改。然而,使用递归锁可能很难处理,因此应该只在没有其他可行方案的时候才使用它们。

实例

图12-6描述了一种情况,在这种情况中递归互斥量看起来像是在解决并发问题。假设func1func2 是函数库中现有的函数,其接口不能改变,因为存在调用这两个接口的应用程序,而且应用程序不能改动。

图12-6 使用递归锁的一种可能情况

为了保持接口跟原来相同,我们把互斥量嵌入到了数据结构中,把这个数据结构的地址(x)作为参数传入。这种方案只有在为此数据结构提供分配函数时才可行,所以应用程序并不知道数据结构的大小(假设我们在其中增加互斥量之后必须扩大该数据结构的大小)。

如果在最早定义数据结构时,预留了足够的可填充字段,允许把某些填充字段替换成互斥量,这种方法也是可行的。不过遗憾的是,大多数程序员并不善于预测未来,所以这并不是普遍可行的实践。

如果func1func2函数都必须操作这个结构,而且可能会有一个以上的线程同时访问该数据结构,那么 func1func2必须在操作数据以前对互斥量加锁。如果 func1 必须调用func2,这时如果互斥量不是递归类型的,那么就会出现死锁。如果能在调用 func2 之前释放互斥量,在 func2 返回后重新获取互斥量,那么就可以避免使用递归互斥量,但这也给其他的线程提供了机会,其他的线程可以在 func1 执行期间抓住互斥量的控制,修改这个数据结构。这也许是不可接受的,当然具体的情况要取决于互斥量试图提供什么样的保护。

图12-7显示了这种情况下使用递归互斥量的一种替代方法。通过提供func2函数的私有版本,称之为func2_locked函数,可以保持func1func2函数接口不变,而且避免使用递归互斥量。要调用 func2_locked 函数,必须占有嵌入在数据结构中的互斥量,这个数据结构的地址是作为参数传入的。func2_locked的函数体包含func2的副本,func2现在只是获取互斥量,调用func2_locked,然后释放互斥量。

图12-7 避免使用递归锁的一种可能情况

如果并不一定要保持库函数接口不变,就可以在每个函数中增加第二个参数表明这个结构是否被调用者锁定。但是,如果可以的话,保持接口不变通常是更好的选择,可以避免实现过程中人为加入的东西对原有系统产生不良影响。

提供加锁和不加锁版本的函数,这样的策略在简单的情况下通常是可行的。在更加复杂的情况下,比如,库需要调用库以外的函数,而且可能会再次回调库中的函数时,就需要依赖递归锁。

实例

图12-8中的程序解释了有必要使用递归互斥量的另一种情况。这里,有一个“超时”(timeout)函数,它允许安排另一个函数在未来的某个时间运行。假设线程并不是很昂贵的资源,就可以为每个挂起的超时函数创建一个线程。线程在时间未到时将一直等待,时间到了以后再调用请求的函数。

#include "apue.h"
#include <pthread.h>
#include <time.h>
#include <sys/time.h>

extern int makethread(void *(*)(void *), void *);

struct to_info {
    void          (*to_fn)(void *);    /* function */
    void           *to_arg;            /* argument */
    struct timespec to_wait;        /* time to wait */
};

#define SECTONSEC  1000000000    /* seconds to nanoseconds */

#if !defined(CLOCK_REALTIME) || defined(BSD)
#define clock_nanosleep(ID, FL, REQ, REM)    nanosleep((REQ), (REM))
#endif

#ifndef CLOCK_REALTIME
#define CLOCK_REALTIME 0
#define USECTONSEC 1000        /* microseconds to nanoseconds */

void
clock_gettime(int id, struct timespec *tsp)
{
    struct timeval tv;

    gettimeofday(&tv, NULL);
    tsp->tv_sec = tv.tv_sec;
    tsp->tv_nsec = tv.tv_usec * USECTONSEC;
}
#endif

void *
timeout_helper(void *arg)
{
    struct to_info    *tip;

    tip = (struct to_info *)arg;
    clock_nanosleep(CLOCK_REALTIME, 0, &tip->to_wait, NULL);
    (*tip->to_fn)(tip->to_arg);
    free(arg);
    return(0);
}

void
timeout(const struct timespec *when, void (*func)(void *), void *arg)
{
    struct timespec    now;
    struct to_info    *tip;
    int                err;

    clock_gettime(CLOCK_REALTIME, &now);
    if ((when->tv_sec > now.tv_sec) ||
      (when->tv_sec == now.tv_sec && when->tv_nsec > now.tv_nsec)) {
        tip = malloc(sizeof(struct to_info));
        if (tip != NULL) {
            tip->to_fn = func;
            tip->to_arg = arg;
            tip->to_wait.tv_sec = when->tv_sec - now.tv_sec;
            if (when->tv_nsec >= now.tv_nsec) {
                tip->to_wait.tv_nsec = when->tv_nsec - now.tv_nsec;
            } else {
                tip->to_wait.tv_sec--;
                tip->to_wait.tv_nsec = SECTONSEC - now.tv_nsec +
                  when->tv_nsec;
            }
            err = makethread(timeout_helper, (void *)tip);
            if (err == 0)
                return;
            else
                free(tip);
        }
    }

    /*
     * We get here if (a) when <= now, or (b) malloc fails, or
     * (c) we can't make a thread, so we just call the function now.
     */
    (*func)(arg);
}

pthread_mutexattr_t attr;
pthread_mutex_t mutex;

void
retry(void *arg)
{
    pthread_mutex_lock(&mutex);

    /* perform retry steps ... */

    pthread_mutex_unlock(&mutex);
}

int
main(void)
{
    int                err, condition, arg;
    struct timespec    when;

    if ((err = pthread_mutexattr_init(&attr)) != 0)
        err_exit(err, "pthread_mutexattr_init failed");
    if ((err = pthread_mutexattr_settype(&attr,
      PTHREAD_MUTEX_RECURSIVE)) != 0)
        err_exit(err, "can't set recursive type");
    if ((err = pthread_mutex_init(&mutex, &attr)) != 0)
        err_exit(err, "can't create recursive mutex");

    /* continue processing ... */

    pthread_mutex_lock(&mutex);

    /*
     * Check the condition under the protection of a lock to
     * make the check and the call to timeout atomic.
     */
    if (condition) {
        /*
         * Calculate the absolute time when we want to retry.
         */
        clock_gettime(CLOCK_REALTIME, &when);
        when.tv_sec += 10;    /* 10 seconds from now */
        timeout(&when, retry, (void *)((unsigned long)arg));
    }
    pthread_mutex_unlock(&mutex);

    /* continue processing ... */

    exit(0);
}

图12-8 使用递归互斥量

如果我们不能创建线程,或者安排函数运行的时间已过,这时问题就出现了。在这些情况下,我们只需在当前上下文中调用之前请求运行的函数。因为函数要获取的锁和我们现在占有的锁是同一个,所以除非该锁是递归的,否则就会出现死锁。

在图12-4中我们使用makethread函数以分离状态创建线程。因为传递给timeout函数的func函数参数将在未来运行,所以我们不希望一直空等线程结束。

可以调用sleep等待超时到期,但它提供的时间粒度是秒级的。如果希望等待的时间不是整数秒,就需要用nanosleep或者clock_nanosleep函数,它们两个提供了更高精度的休眠时间。

在未定义CLOCK_REALTIME的系统中,我们根据nanosleep定义clock_nanosleep。然而,FreeBSD 8.0 定义这个符号支持 clock_gettimeclock_settime,但并不支持clock_nanosleep。(只有Linux 3.2.0和Solaris 10目前支持clock_nanosleep。)

另外,在未定义CLOCK_REALTIME的系统中,我们提供了我们自己的clock_gettime实现,该实现调用了gettimeofday并把微秒转换成纳秒。

timeout的调用者需要占有互斥量来检查条件,并且把retry函数安排为原子操作。retry函数试图对同一个互斥量进行加锁。除非互斥量是递归的,否则,如果 timeout 函数直接调用retry,会导致死锁。

results matching ""

    No results matching ""