星星博客's Archiver

cnangel 发表于 2010-1-26 09:21

带锁的容器

[code]#include <pthread.h>

#include <sys/types.h>

#include <unistd.h>

#include <fcntl.h>

#include <queue>



static int fcntl_flag(int fd, int get_op, int set_op, int val, int flag)

{

        const int flags = fcntl(fd, get_op, 0);

        if (flags < 0)

                return -1;

        if(flag==1)

        {

                return fcntl(fd, set_op, flags|val);

        }

        else

        {

                return fcntl(fd, set_op, flags&(~val));

        }

}



using namespace std;

template <typename T> class ThreadQueue

{

public:

        ThreadQueue(unsigned int size)

        {

                this->size = size;

                pthread_mutex_init(&mutex, NULL);

                pthread_cond_init(&cond, NULL);

                pipefd[0] = -1;

                pipefd[1] = -1;

                if (pipe(pipefd) || fcntl_flag(pipefd[0], F_GETFD, F_SETFD, FD_CLOEXEC, 1)

                                || fcntl_flag(pipefd[1], F_GETFD, F_SETFD, FD_CLOEXEC, 1)

                                || fcntl_flag(pipefd[0], F_GETFL, F_SETFL, O_NONBLOCK , 0)

                                || fcntl_flag(pipefd[1], F_GETFL, F_SETFL, O_NONBLOCK , 1))

                {

                        if (pipefd[0] >= 0)

                        {

                                close(pipefd[0]);

                        }

                        if (pipefd[1] >= 0)

                        {

                                close(pipefd[1]);

                        }

                        printf("pipe error\n");

                }

        }

       

        bool push(T a)

        {

                bool would_block;

                char c;

                int ret;

RETRY_PUSH:

                would_block=true;

                pthread_mutex_lock(&mutex);

                if (datas.size() < size)

                {

                        would_block = false;

                        datas.push(a);

                        pthread_cond_broadcast(&cond);

                }

                pthread_mutex_unlock(&mutex);

                if(would_block==true)

                {

                        printf("task list full wait for workers!\n");

                        ret=read(pipefd[0],&c,1);

                        printf("read n byte : %d\n",ret);

                        goto RETRY_PUSH;

                }

                return would_block;

        }

       

        T* pop()

        {

                T *ret;

                pthread_mutex_lock(&mutex);

                while (datas.size() == 0)

                {

                        printf("task list empty wait for requeset!\n");

                        pthread_cond_wait(&cond, &mutex);

                }

                ret = &datas.front();

                datas.pop();

                if (datas.size() == size - 1)

                {

                        printf("can get new request!\n");

                        while (write(pipefd[1], "A", 1) == 0)

                        {

                                continue;

                        }

                }

                pthread_mutex_unlock(&mutex);

                return ret;

        }

       

private:

        pthread_mutex_t mutex;

        pthread_cond_t cond;

        int pipefd[2];

        queue<T> datas;

        unsigned int size;

};
[/code]

页: [1]

Powered by Discuz! Archiver 7.0.0  © 2001-2009 Comsenz Inc.