返回列表 回复 发帖

带锁的容器

  1. #include <pthread.h>

  2. #include <sys/types.h>

  3. #include <unistd.h>

  4. #include <fcntl.h>

  5. #include <queue>



  6. static int fcntl_flag(int fd, int get_op, int set_op, int val, int flag)

  7. {

  8.         const int flags = fcntl(fd, get_op, 0);

  9.         if (flags < 0)

  10.                 return -1;

  11.         if(flag==1)

  12.         {

  13.                 return fcntl(fd, set_op, flags|val);

  14.         }

  15.         else

  16.         {

  17.                 return fcntl(fd, set_op, flags&(~val));

  18.         }

  19. }



  20. using namespace std;

  21. template <typename T> class ThreadQueue

  22. {

  23. public:

  24.         ThreadQueue(unsigned int size)

  25.         {

  26.                 this->size = size;

  27.                 pthread_mutex_init(&mutex, NULL);

  28.                 pthread_cond_init(&cond, NULL);

  29.                 pipefd[0] = -1;

  30.                 pipefd[1] = -1;

  31.                 if (pipe(pipefd) || fcntl_flag(pipefd[0], F_GETFD, F_SETFD, FD_CLOEXEC, 1)

  32.                                 || fcntl_flag(pipefd[1], F_GETFD, F_SETFD, FD_CLOEXEC, 1)

  33.                                 || fcntl_flag(pipefd[0], F_GETFL, F_SETFL, O_NONBLOCK , 0)

  34.                                 || fcntl_flag(pipefd[1], F_GETFL, F_SETFL, O_NONBLOCK , 1))

  35.                 {

  36.                         if (pipefd[0] >= 0)

  37.                         {

  38.                                 close(pipefd[0]);

  39.                         }

  40.                         if (pipefd[1] >= 0)

  41.                         {

  42.                                 close(pipefd[1]);

  43.                         }

  44.                         printf("pipe error\n");

  45.                 }

  46.         }

  47.        

  48.         bool push(T a)

  49.         {

  50.                 bool would_block;

  51.                 char c;

  52.                 int ret;

  53. RETRY_PUSH:

  54.                 would_block=true;

  55.                 pthread_mutex_lock(&mutex);

  56.                 if (datas.size() < size)

  57.                 {

  58.                         would_block = false;

  59.                         datas.push(a);

  60.                         pthread_cond_broadcast(&cond);

  61.                 }

  62.                 pthread_mutex_unlock(&mutex);

  63.                 if(would_block==true)

  64.                 {

  65.                         printf("task list full wait for workers!\n");

  66.                         ret=read(pipefd[0],&c,1);

  67.                         printf("read n byte : %d\n",ret);

  68.                         goto RETRY_PUSH;

  69.                 }

  70.                 return would_block;

  71.         }

  72.        

  73.         T* pop()

  74.         {

  75.                 T *ret;

  76.                 pthread_mutex_lock(&mutex);

  77.                 while (datas.size() == 0)

  78.                 {

  79.                         printf("task list empty wait for requeset!\n");

  80.                         pthread_cond_wait(&cond, &mutex);

  81.                 }

  82.                 ret = &datas.front();

  83.                 datas.pop();

  84.                 if (datas.size() == size - 1)

  85.                 {

  86.                         printf("can get new request!\n");

  87.                         while (write(pipefd[1], "A", 1) == 0)

  88.                         {

  89.                                 continue;

  90.                         }

  91.                 }

  92.                 pthread_mutex_unlock(&mutex);

  93.                 return ret;

  94.         }

  95.        

  96. private:

  97.         pthread_mutex_t mutex;

  98.         pthread_cond_t cond;

  99.         int pipefd[2];

  100.         queue<T> datas;

  101.         unsigned int size;

  102. };
复制代码

                     我是一个呼吸着现在的空气而生活在过去的人
               这样的注定孤独,孤独的身处闹市却犹如置身于荒漠
                                     我已习惯了孤独,爱上孤独
                                 他让我看清了自我,还原了自我
                             让我再静静的沉思中得到快乐和满足
                                   再孤独的世界里我一遍又一遍
                                   不厌其烦的改写着自己的过去
                                             延伸到现在与未来
                                       然而那只是泡沫般的美梦
                                 产生的时刻又伴随着破灭的到来
                         在灰飞烟灭的瞬间我看到的是过程的美丽
                                      而不是结果的悲哀。。。
返回列表