屏障(barrier)是用户协调多个线程并行工作的同步机制。屏障允许每个线程等待,直到所有的合作线程都到达某一点,然后从该点继续执行。我们已经看到一种屏障,pthread_join函数就是一种屏障,允许一个线程等待,直到另一个线程退出。

但是屏障对象的概念更广,它们允许任意数量的线程等待,直到所有的线程完成处理工作,而线程不需要退出。所有线程达到屏障后可以接着工作。

可以使用 pthread_barrier_init 函数对屏障进行初始化,用thread_barrier_destroy函数反初始化。

#include <pthread.h>
int pthread_barrier_init(pthread_barrier_t *restrict barrier,
                const pthread_barrierattr_t *restrict attr, 
                unsigned int count);
int pthread_barrier_destroy(pthread_barrier_t *barrier);
//两个函数的返回值:若成功,返回0;否则,返回错误编号

初始化屏障时,可以使用count参数指定,在允许所有线程继续运行之前,必须到达屏障的线程数目。使用attr参数指定屏障对象的属性,我们会在下一章详细讨论。现在设置attrNULL,用默认属性初始化屏障。如果使用pthread_barrier_init函数为屏障分配资源,那么在反初始化屏障时可以调用pthread_barrier_destroy函数释放相应的资源。

可以使用pthread_barrier_wait函数来表明,线程已完成工作,准备等所有其他线程赶上来。

#include <pthread.h>
int pthread_barrier_wait(pthread_barrier_t *barrier);
//返回值:若成功,返回0或者PTHREAD_BARRIER_SERIAL_THREAD;否则,返回错误编号

调用pthread_barrier_wait的线程在屏障计数(调用pthread_barrier_init时设定)未满足条件时,会进入休眠状态。如果该线程是最后一个调用pthread_barrier_wait的线程,就满足了屏障计数,所有的线程都被唤醒。

对于一个任意线程,pthread_barrier_wait函数返回了PTHREAD_BARRIER_SERIAL_THREAD。剩下的线程看到的返回值是0。这使得一个线程可以作为主线程,它可以工作在其他所有线程已完成的工作结果上。

一旦达到屏障计数值,而且线程处于非阻塞状态,屏障就可以被重用。但是除非在调用了pthread_barrier_destroy函数之后,又调用了pthread_barrier_init函数对计数用另外的数进行初始化,否则屏障计数不会改变。

实例

图11-16给出了在一个任务上合作的多个线程之间如何用屏障进行同步。

#include "apue.h"
#include <pthread.h>
#include <limits.h>
#include <sys/time.h>

#define NTHR   8                /* number of threads */
#define NUMNUM 8000000L            /* number of numbers to sort */
#define TNUM   (NUMNUM/NTHR)    /* number to sort per thread */

long nums[NUMNUM];
long snums[NUMNUM];

pthread_barrier_t b;

#ifdef SOLARIS
#define heapsort qsort
#else
extern int heapsort(void *, size_t, size_t,
                    int (*)(const void *, const void *));
#endif

/*
 * Compare two long integers (helper function for heapsort)
 */
int
complong(const void *arg1, const void *arg2)
{
    long l1 = *(long *)arg1;
    long l2 = *(long *)arg2;

    if (l1 == l2)
        return 0;
    else if (l1 < l2)
        return -1;
    else
        return 1;
}

/*
 * Worker thread to sort a portion of the set of numbers.
 */
void *
thr_fn(void *arg)
{
    long    idx = (long)arg;

    heapsort(&nums[idx], TNUM, sizeof(long), complong);
    pthread_barrier_wait(&b);

    /*
     * Go off and perform more work ...
     */
    return((void *)0);
}

/*
 * Merge the results of the individual sorted ranges.
 */
void
merge()
{
    long    idx[NTHR];
    long    i, minidx, sidx, num;

    for (i = 0; i < NTHR; i++)
        idx[i] = i * TNUM;
    for (sidx = 0; sidx < NUMNUM; sidx++) {
        num = LONG_MAX;
        for (i = 0; i < NTHR; i++) {
            if ((idx[i] < (i+1)*TNUM) && (nums[idx[i]] < num)) {
                num = nums[idx[i]];
                minidx = i;
            }
        }
        snums[sidx] = nums[idx[minidx]];
        idx[minidx]++;
    }
}

int
main()
{
    unsigned long    i;
    struct timeval    start, end;
    long long        startusec, endusec;
    double            elapsed;
    int                err;
    pthread_t        tid;

    /*
     * Create the initial set of numbers to sort.
     */
    srandom(1);
    for (i = 0; i < NUMNUM; i++)
        nums[i] = random();

    /*
     * Create 8 threads to sort the numbers.
     */
    gettimeofday(&start, NULL);
    pthread_barrier_init(&b, NULL, NTHR+1);
    for (i = 0; i < NTHR; i++) {
        err = pthread_create(&tid, NULL, thr_fn, (void *)(i * TNUM));
        if (err != 0)
            err_exit(err, "can't create thread");
    }
    pthread_barrier_wait(&b);
    merge();
    gettimeofday(&end, NULL);

    /*
     * Print the sorted list.
     */
    startusec = start.tv_sec * 1000000 + start.tv_usec;
    endusec = end.tv_sec * 1000000 + end.tv_usec;
    elapsed = (double)(endusec - startusec) / 1000000.0;
    printf("sort took %.4f seconds\n", elapsed);
    for (i = 0; i < NUMNUM; i++)
        printf("%ld\n", snums[i]);
    exit(0);
}

图11-16 使用屏障

这个例子给出了多个线程只执行一个任务时,使用屏障的简单情况。在更加实际的情况下,工作线程在调用pthread_barrier_wait函数返回后会接着执行其他的活动。

在这个实例中,使用8个线程分解了800万个数的排序工作。每个线程用堆排序算法对100万个数进行排序(详细算法请参阅Knuth[1998])。然后主线程调用一个函数对这些结果进行合并。

并不需要使用 pthread_barrier_wait 函数中的返回值PTHREAD_BARRIER_SERIAL_THREAD 来决定哪个线程执行结果合并操作,因为我们使用了主线程来完成这个任务。这也是把屏障计数值设为工作线程数加1的原因,主线程也作为其中的一个候选线程。

如果只用一个线程去完成800万个数的堆排序,那么与图11-16中的程序相比,我们将能看到图11-16中的程序在性能上有显著提升。在8核处理器系统上,单线程程序对800万个数进行排序需要12.14秒。同样的系统,使用8个并行线程和1个合并结果的线程,相同的800万个数的排序仅需要1.91秒,速度提升了6倍。

results matching ""

    No results matching ""