屏障(barrier)是用户协调多个线程并行工作的同步机制。屏障允许每个线程等待,直到所有的合作线程都到达某一点,然后从该点继续执行。我们已经看到一种屏障,pthread_join
函数就是一种屏障,允许一个线程等待,直到另一个线程退出。
但是屏障对象的概念更广,它们允许任意数量的线程等待,直到所有的线程完成处理工作,而线程不需要退出。所有线程达到屏障后可以接着工作。
可以使用 pthread_barrier_init
函数对屏障进行初始化,用thread_barrier_destroy
函数反初始化。
#include <pthread.h>
int pthread_barrier_init(pthread_barrier_t *restrict barrier,
const pthread_barrierattr_t *restrict attr,
unsigned int count);
int pthread_barrier_destroy(pthread_barrier_t *barrier);
//两个函数的返回值:若成功,返回0;否则,返回错误编号
初始化屏障时,可以使用count
参数指定,在允许所有线程继续运行之前,必须到达屏障的线程数目。使用attr
参数指定屏障对象的属性,我们会在下一章详细讨论。现在设置attr
为NULL
,用默认属性初始化屏障。如果使用pthread_barrier_init
函数为屏障分配资源,那么在反初始化屏障时可以调用pthread_barrier_destroy
函数释放相应的资源。
可以使用pthread_barrier_wait
函数来表明,线程已完成工作,准备等所有其他线程赶上来。
#include <pthread.h>
int pthread_barrier_wait(pthread_barrier_t *barrier);
//返回值:若成功,返回0或者PTHREAD_BARRIER_SERIAL_THREAD;否则,返回错误编号
调用pthread_barrier_wai
t的线程在屏障计数(调用pthread_barrier_init
时设定)未满足条件时,会进入休眠状态。如果该线程是最后一个调用pthread_barrier_wait
的线程,就满足了屏障计数,所有的线程都被唤醒。
对于一个任意线程,pthread_barrier_wait
函数返回了PTHREAD_BARRIER_SERIAL_THREAD
。剩下的线程看到的返回值是0。这使得一个线程可以作为主线程,它可以工作在其他所有线程已完成的工作结果上。
一旦达到屏障计数值,而且线程处于非阻塞状态,屏障就可以被重用。但是除非在调用了pthread_barrier_destroy
函数之后,又调用了pthread_barrier_init
函数对计数用另外的数进行初始化,否则屏障计数不会改变。
实例
图11-16给出了在一个任务上合作的多个线程之间如何用屏障进行同步。
#include "apue.h"
#include <pthread.h>
#include <limits.h>
#include <sys/time.h>
#define NTHR 8 /* number of threads */
#define NUMNUM 8000000L /* number of numbers to sort */
#define TNUM (NUMNUM/NTHR) /* number to sort per thread */
long nums[NUMNUM];
long snums[NUMNUM];
pthread_barrier_t b;
#ifdef SOLARIS
#define heapsort qsort
#else
extern int heapsort(void *, size_t, size_t,
int (*)(const void *, const void *));
#endif
/*
* Compare two long integers (helper function for heapsort)
*/
int
complong(const void *arg1, const void *arg2)
{
long l1 = *(long *)arg1;
long l2 = *(long *)arg2;
if (l1 == l2)
return 0;
else if (l1 < l2)
return -1;
else
return 1;
}
/*
* Worker thread to sort a portion of the set of numbers.
*/
void *
thr_fn(void *arg)
{
long idx = (long)arg;
heapsort(&nums[idx], TNUM, sizeof(long), complong);
pthread_barrier_wait(&b);
/*
* Go off and perform more work ...
*/
return((void *)0);
}
/*
* Merge the results of the individual sorted ranges.
*/
void
merge()
{
long idx[NTHR];
long i, minidx, sidx, num;
for (i = 0; i < NTHR; i++)
idx[i] = i * TNUM;
for (sidx = 0; sidx < NUMNUM; sidx++) {
num = LONG_MAX;
for (i = 0; i < NTHR; i++) {
if ((idx[i] < (i+1)*TNUM) && (nums[idx[i]] < num)) {
num = nums[idx[i]];
minidx = i;
}
}
snums[sidx] = nums[idx[minidx]];
idx[minidx]++;
}
}
int
main()
{
unsigned long i;
struct timeval start, end;
long long startusec, endusec;
double elapsed;
int err;
pthread_t tid;
/*
* Create the initial set of numbers to sort.
*/
srandom(1);
for (i = 0; i < NUMNUM; i++)
nums[i] = random();
/*
* Create 8 threads to sort the numbers.
*/
gettimeofday(&start, NULL);
pthread_barrier_init(&b, NULL, NTHR+1);
for (i = 0; i < NTHR; i++) {
err = pthread_create(&tid, NULL, thr_fn, (void *)(i * TNUM));
if (err != 0)
err_exit(err, "can't create thread");
}
pthread_barrier_wait(&b);
merge();
gettimeofday(&end, NULL);
/*
* Print the sorted list.
*/
startusec = start.tv_sec * 1000000 + start.tv_usec;
endusec = end.tv_sec * 1000000 + end.tv_usec;
elapsed = (double)(endusec - startusec) / 1000000.0;
printf("sort took %.4f seconds\n", elapsed);
for (i = 0; i < NUMNUM; i++)
printf("%ld\n", snums[i]);
exit(0);
}
图11-16 使用屏障
这个例子给出了多个线程只执行一个任务时,使用屏障的简单情况。在更加实际的情况下,工作线程在调用pthread_barrier_wait
函数返回后会接着执行其他的活动。
在这个实例中,使用8个线程分解了800万个数的排序工作。每个线程用堆排序算法对100万个数进行排序(详细算法请参阅Knuth[1998])。然后主线程调用一个函数对这些结果进行合并。
并不需要使用 pthread_barrier_wait
函数中的返回值PTHREAD_BARRIER_SERIAL_THREAD
来决定哪个线程执行结果合并操作,因为我们使用了主线程来完成这个任务。这也是把屏障计数值设为工作线程数加1的原因,主线程也作为其中的一个候选线程。
如果只用一个线程去完成800万个数的堆排序,那么与图11-16中的程序相比,我们将能看到图11-16中的程序在性能上有显著提升。在8核处理器系统上,单线程程序对800万个数进行排序需要12.14秒。同样的系统,使用8个并行线程和1个合并结果的线程,相同的800万个数的排序仅需要1.91秒,速度提升了6倍。