|   
 UID1 威望1240 点 金钱24019 金币 点卡317 点 
 | 
1#
 发表于 2010-1-26 09:21 
 | 只看该作者 
 带锁的容器
| 复制代码#include <pthread.h>
#include <sys/types.h>
#include <unistd.h>
#include <fcntl.h>
#include <queue>
static int fcntl_flag(int fd, int get_op, int set_op, int val, int flag)
{
        const int flags = fcntl(fd, get_op, 0);
        if (flags < 0)
                return -1;
        if(flag==1)
        {
                return fcntl(fd, set_op, flags|val);
        }
        else
        {
                return fcntl(fd, set_op, flags&(~val));
        }
}
using namespace std;
template <typename T> class ThreadQueue
{
public:
        ThreadQueue(unsigned int size)
        {
                this->size = size;
                pthread_mutex_init(&mutex, NULL);
                pthread_cond_init(&cond, NULL);
                pipefd[0] = -1;
                pipefd[1] = -1;
                if (pipe(pipefd) || fcntl_flag(pipefd[0], F_GETFD, F_SETFD, FD_CLOEXEC, 1)
                                || fcntl_flag(pipefd[1], F_GETFD, F_SETFD, FD_CLOEXEC, 1)
                                || fcntl_flag(pipefd[0], F_GETFL, F_SETFL, O_NONBLOCK , 0)
                                || fcntl_flag(pipefd[1], F_GETFL, F_SETFL, O_NONBLOCK , 1))
                {
                        if (pipefd[0] >= 0)
                        {
                                close(pipefd[0]);
                        }
                        if (pipefd[1] >= 0)
                        {
                                close(pipefd[1]);
                        }
                        printf("pipe error\n");
                }
        }
        
        bool push(T a)
        {
                bool would_block;
                char c;
                int ret;
RETRY_PUSH:
                would_block=true;
                pthread_mutex_lock(&mutex);
                if (datas.size() < size)
                {
                        would_block = false;
                        datas.push(a);
                        pthread_cond_broadcast(&cond);
                }
                pthread_mutex_unlock(&mutex);
                if(would_block==true)
                {
                        printf("task list full wait for workers!\n");
                        ret=read(pipefd[0],&c,1);
                        printf("read n byte : %d\n",ret);
                        goto RETRY_PUSH;
                }
                return would_block;
        }
        
        T* pop()
        {
                T *ret;
                pthread_mutex_lock(&mutex);
                while (datas.size() == 0)
                {
                        printf("task list empty wait for requeset!\n");
                        pthread_cond_wait(&cond, &mutex);
                }
                ret = &datas.front();
                datas.pop();
                if (datas.size() == size - 1)
                {
                        printf("can get new request!\n");
                        while (write(pipefd[1], "A", 1) == 0)
                        {
                                continue;
                        }
                }
                pthread_mutex_unlock(&mutex);
                return ret;
        }
        
private:
        pthread_mutex_t mutex;
        pthread_cond_t cond;
        int pipefd[2];
        queue<T> datas;
        unsigned int size;
};
 | 
 
| 我是一个呼吸着现在的空气而生活在过去的人
 这样的注定孤独,孤独的身处闹市却犹如置身于荒漠
 我已习惯了孤独,爱上孤独
 他让我看清了自我,还原了自我
 让我再静静的沉思中得到快乐和满足
 再孤独的世界里我一遍又一遍
 不厌其烦的改写着自己的过去
 延伸到现在与未来
 然而那只是泡沫般的美梦
 产生的时刻又伴随着破灭的到来
 在灰飞烟灭的瞬间我看到的是过程的美丽
 而不是结果的悲哀。。。
 
 |  |