多线程斐波那契对程序

coderboy99

我正在尝试编写一个创建两个线程的程序:“前端”和“后端”线程。我想创建一个“后端”线程来迭代和计算斐波那契序列中的术语对,并将它们放入数组中,并创建一个“前端”线程,该线程将在每次迭代时打印出数组对。

“前端”线程-用于在每次迭代中显示“后端”线程操作的结果

“后端”线程-用于计算和设置数组

即。[5,8],并且在迭代后它将包含[13,21]

我正在努力在线程中实现Fibonacci序列部分,并且取得了以下进展:

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <errno.h>

int fib;
void *front_end(void *ptr);
void *back_end(void *ptr);

int main() {
  pthread_t thread1, thread2;

  int arr[2] = {5,8};
  const int *ptrtoarr;
  ptrtoarr=arr;

  int create1, create2;

  int *s=(int *)(ptrtoarr);
  printf("%d \n", *s);
  ptrtoarr++;
  s = (int *)(ptrtoarr);
  printf("%d \n", *s);
  ptrtoarr--;

  create1 = pthread_create(&thread1, NULL, back_end, &arr);

  if(create1) {
    fprintf(stderr,"Error - pthread_create() return code: %d\n",create1);
    exit(EXIT_FAILURE);
  }

  pthread_join(thread1, NULL);

  //pthread_join(thread2, NULL);
}

// front-end thread to be callback for each back-end iteration
void *front_end(void *ptr) {
  int *sum = ptr;
  int i, upper = atoi(ptr);

  if (upper > 0) {
    for (i=0; i<upper; i++){

//Print the fib pairs

    }
  }
  pthread_exit(0);
}

void *back_end(void *ptr) {
  int i, upper = atoi(ptr);
  fib=1;

  if(upper > 0) {
    int pre1 = 0;
    int current;

    //calc fib numbers.....
    if(fib == 1){
      printf("")

    }
  }

}

有人可以指导我逐步解决这个问题吗?

池上

您的骨骼需要工作。

假设以下内容:

unsigned n = ...;      // How many to generate.
unsigned n_ready = 2;  // How many are ready to print.
unsigned *fibs = malloc(sizeof(unsigned)*n);
fibs[0] = 0;
fibs[1] = 1;

作为后端工作人员的核心,您将拥有

for (unsigned i=2; i<n; ++i) {
   fibs[i] = fibs[i-2] + fibs[i-1];
   n_ready = i+1;
}

在前端工作人员的核心,您将拥有

for (unsigned i=0; i<n; ++i) {
   while (i >= n_ready)
      /* Nothing */;

   printf("%u\n", fibs[i]);
}

问题1

如果线程试图在另一个变量写入变量时尝试读取该变量,则会遇到问题。两个或多个线程同时读取同一变量是可以的。

通过两个线程所使用的变量是n,的元素fib[]n_ready

  • n
    两个线程均未更改,因此我们无需控制对其的访问。

  • fib[i]for i >= n_ready
    仅由后端工作人员访问,因此我们无需控制对这些对象的访问。

  • fib[i]for i < n_ready
    仅由前端工作人员访问,因此我们无需控制对这些访问。

  • n_ready
    后端工作人员可以随时设置n_ready,而前端工作可以随时尝试读取n_ready,因此我们确实需要控制对的访问n_ready

互斥锁通常用于确保一次只有一个线程正在访问资源(例如,变量,变量组,文件句柄等)。

我们的后端工人变成

for (unsigned i=2; i<n; ++i) {
   // The mutex only protects n_ready
   // --nothing else is going to touch fib[i-2] or fib[i-1] or fib[i]--
   // so we don't need to obtain a lock yet.
   fibs[i] = fibs[i-2] + fibs[i-1];

   // We need to access n_ready.
   pthread_mutex_lock(&mutex);
   n_ready = i+1;
   pthread_mutex_unlock(&mutex);
}

我们的前端工作者成为

for (unsigned i=0; i<n; ++i) {
   // We need to access n_ready.
   pthread_mutex_lock(&mutex);
   while (i >= n_ready) {
      // Allow other thread to gain the lock.
      pthread_mutex_unlock(&mutex);

      // We need to access n_ready.
      pthread_mutex_lock(&mutex);
   }

   // The mutex only protects n_ready
   // --nothing is going to change fib[i]--
   // so we can release it now rather than later.
   pthread_mutex_unlock(&mutex);

   printf("%u\n", fibs[i]);
}

问题二

您有一个忙碌的循环。通常,这很糟糕,因为这意味着您的线程正在使用100%的等待时间。(在这种情况下,由于i >= n_ready可能已经成立,因此这实际上是一个不错的策略。但是让我们忽略它。)一个线程可以休眠,直到另一个线程使用条件vars发出信号为止。

我们的后端工人变成

for (unsigned i=2; i<n; ++i) {
   // The mutex only protects n_ready
   // --nothing else is going to touch fib[i-2] or fib[i-1] or fib[i]--
   // so we don't need to obtain a lock yet.
   fibs[i] = fibs[i-2] + fibs[i-1];

   // We need to access n_ready.
   pthread_mutex_lock(&mutex);
   n_ready = i+1;
   // Wake up the other thread if it's blocked.
   pthread_cond_signal(&cond);
   pthread_mutex_unlock(&mutex);
}

我们的前端工作者成为

for (unsigned i=0; i<n; ++i) {
   // We need to access n_ready.
   pthread_mutex_lock(&mutex);
   while (i >= n_ready)
      pthread_cond_wait(&cond, &mutex);

   // The mutex only protects n_ready
   // --nothing is going to change fib[i]--
   // so we can release it now rather than later.
   pthread_mutex_unlock(&mutex);

   printf("%u\n", fibs[i]);
}

始终调用pthread_cond_wait锁定的互斥锁。互斥量在调用时将解锁,并在返回之前将其锁定。这允许其他线程获取互斥对象以进行更改n_ready


完整的代码:

#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

#define UNUSED(x) (void)(x)


// To control access to n_ready.
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
static unsigned n_ready = 0;  // How many are ready to print.

static unsigned n;  // How many to generate.
static unsigned *fibs = NULL;


static void *back_worker(void *unused) {
   UNUSED(unused);

   fibs[0] = 0;
   fibs[1] = 1;

   // We need to access n_ready.
   pthread_mutex_lock(&mutex);
   n_ready = 2;
   // Wake up the other thread if it's blocked.
   pthread_cond_signal(&cond);
   pthread_mutex_unlock(&mutex);

   for (unsigned i=2; i<n; ++i) {
      // The mutex only protects n_ready
      // --nothing is going to touch fib[i]--
      // so we don't need to obtain a lock yet.
      fibs[i] = fibs[i-2] + fibs[i-1];

      // We need to access n_ready.
      pthread_mutex_lock(&mutex);
      n_ready = i+1;
      // Wake up the other thread if it's blocked.
      pthread_cond_signal(&cond);
      pthread_mutex_unlock(&mutex);
   }

   return NULL;
}


static void *front_worker(void *unused) {
   UNUSED(unused);

   for (unsigned i=0; i<n; ++i) {
      // We need to access n_ready.
      pthread_mutex_lock(&mutex);
      while (i >= n_ready)
         pthread_cond_wait(&cond, &mutex);

      // The mutex only protects n_ready
      // --nothing is going to change fib[i]--
      // so we can release it now rather than later.
      pthread_mutex_unlock(&mutex);

      printf("%u\n", fibs[i]);
   }

   return NULL;
}


int main(void) {
   n = 20;  // How many to generate.

   fibs = malloc(sizeof(unsigned) * n);

   pthread_t back_thread;
   if (errno = pthread_create(&back_thread, NULL, back_worker, NULL)) {
      perror(NULL);
      exit(1);
   }

   pthread_t front_thread;
   if (errno = pthread_create(&front_thread, NULL, front_worker, NULL)) {
      perror(NULL);
      exit(1);
   }

   pthread_join(back_thread, NULL);
   pthread_join(front_thread, NULL);

   pthread_cond_destroy(&cond);
   pthread_mutex_destroy(&mutex);
   free(fibs);

   return 0;
}

输出:

$ gcc -Wall -Wextra -pedantic a.c -o a -lpthread && a
0
1
1
2
3
5
8
13
21
34
55
89
144
233
377
610
987
1597
2584
4181

建议进行上述练习

创建一个工作池,以打印出放入队列的数字。输出不需要是有序的。

worker函数已经为您编写。您不能更改mainworker功能。我什至为您创建了队列。您只需让它通过修改线程安全的Queue_enqueueQueue_dequeueQueue_done功能。这些是您可能会更改的唯一功能。

#include <errno.h>
#include <inttypes.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>

#define NUM_WORKERS 4
#define QUEUE_SIZE 10
#define NUM_ITEMS 40

typedef struct {
   pthread_mutex_t mutex;
   pthread_cond_t cond;
   int done;
   int empty;
   int full;
   size_t max;
   size_t next_insert;
   size_t next_read;
   unsigned *buf;
} Queue;

static void Queue_init(Queue* q, size_t max) {
   pthread_mutex_init(&(q->mutex), NULL);
   pthread_cond_init(&(q->cond), NULL);
   q->done = 0;
   q->empty = 1;
   q->full = 0;
   q->max = max;
   q->next_insert = 0;
   q->next_read = 0;
   q->buf = malloc(sizeof(unsigned)*max);
}

static void Queue_destroy(Queue *q) {
   free(q->buf);
   pthread_cond_destroy(&(q->cond));
   pthread_mutex_destroy(&(q->mutex));
}

static void Queue_done(Queue *q) {
   q->done = 1;
}

// Returns the oldest item from the queue (via a parameter) and returns 1.
// If the queue is empty and done, returns 0.
// If the queue is empty and not done, waits until that changes.
static int Queue_dequeue(Queue *q, unsigned *i) {
   while (q->empty && !q->done) {
   }

   if (q->empty) {
      // We are completely done.
      return 0;
   } else {
      *i = q->buf[ q->next_read ];
      q->next_read = ( q->next_read + 1 ) % q->max;
      q->empty = q->next_read == q->next_insert;
      q->full = 0;
      return 1;
   }
}

// Adds the argument to the queue.
// If the queue is full, waits until that changes.
static void Queue_enqueue(Queue *q, unsigned i) {
   while (q->full && !q->done) {
   }

   if (q->done) {
      fprintf(stderr, "Error: Attempted to add item to \"done\" queue.\n");
      return;
   }

   q->buf[q->next_insert] = i;
   q->next_insert = ( q->next_insert + 1 ) % q->max;
   q->empty = 0;
   q->full = q->next_insert == q->next_read;
}

static int msleep(long msec) {
   struct timespec ts;
   int res;

   if (msec < 0) {
       errno = EINVAL;
       return -1;
   }

   ts.tv_sec = msec / 1000;
   ts.tv_nsec = (msec % 1000) * 1000000;

    do {
       res = nanosleep(&ts, &ts);
    } while (res && errno == EINTR);

    return res;
}

// Protects access to stdout.
static pthread_mutex_t stdout_mutex;

static Queue q;

static void *worker(void *worker_id_) {
   uintptr_t worker_id = (uintptr_t)worker_id_;

   unsigned int seed = worker_id;  // Whatever.

   unsigned i;
   while (Queue_dequeue(&q, &i)) {
      pthread_mutex_lock(&stdout_mutex);
      printf("[%" PRIuPTR "] Dequeued %u\n", worker_id, i);
      pthread_mutex_unlock(&stdout_mutex);

      // msleep( rand_r(&seed) % 1000 + 1000 );  // Simulate a 1 to 2s load.

      pthread_mutex_lock(&stdout_mutex);
      printf("[%" PRIuPTR "]    Finished processing %u\n", worker_id, i);
      pthread_mutex_unlock(&stdout_mutex);
   }

   return NULL;
}

int main(void) {
   Queue_init(&q, QUEUE_SIZE);

   pthread_t workers[NUM_WORKERS];
   for (uintptr_t i=0; i<NUM_WORKERS; ++i) {
      if (errno = pthread_create(&(workers[i]), NULL, worker, (void*)i)) {
         perror(NULL);
         exit(1);
      }
   }

   for (unsigned i=0; i<NUM_ITEMS; ++i) {
      pthread_mutex_lock(&stdout_mutex);
      printf("[x] Enqueuing %u...\n", i);
      pthread_mutex_unlock(&stdout_mutex);

      Queue_enqueue(&q, i);

      pthread_mutex_lock(&stdout_mutex);
      printf("[x]    Enqueued %u.\n", i);
      pthread_mutex_unlock(&stdout_mutex);
   }

   Queue_done(&q);
   pthread_mutex_lock(&stdout_mutex);
   printf("[x] Called done.\n");
   pthread_mutex_unlock(&stdout_mutex);

   for (unsigned i=0; i<NUM_WORKERS; ++i)
      pthread_join(workers[i], NULL);

   Queue_destroy(&q);
   pthread_mutex_destroy(&stdout_mutex);
   return 0;
}

如果您对此有疑问,请随时发布该问题的链接,作为对此答案的评论。


建议运动的解决方案:

static void Queue_done(Queue *q) {
   pthread_mutex_lock(&(q->mutex));
   q->done = 1;
   pthread_cond_signal(&(q->cond));
   pthread_mutex_unlock(&(q->mutex));
}

// Returns the oldest item from the queue (via a parameter) and returns 1.
// If the queue is empty and done, returns 0.
// If the queue is empty and not done, waits until that changes.
static int Queue_dequeue(Queue *q, unsigned *i) {
   pthread_mutex_lock(&(q->mutex));
   while (q->empty && !q->done)
      pthread_cond_wait(&(q->cond), &(q->mutex));

   int dequeued;
   if (q->empty) {
      // We are completely done.
      dequeued = 0;
   } else {
      *i = q->buf[ q->next_read ];
      q->next_read = ( q->next_read + 1 ) % q->max;
      q->empty = q->next_read == q->next_insert;
      q->full = 0;
      dequeued = 1;
   }

   pthread_cond_signal(&(q->cond));
   pthread_mutex_unlock(&(q->mutex));
   return dequeued;
}

// Adds the argument to the queue.
// If the queue is full, waits until that changes.
static void Queue_enqueue(Queue *q, unsigned i) {
   pthread_mutex_lock(&(q->mutex));
   while (q->full && !q->done)
      pthread_cond_wait(&(q->cond), &(q->mutex));

   if (q->done) {
      fprintf(stderr, "Error: Attempted to add item to \"done\" queue.\n");
   } else {
      q->buf[q->next_insert] = i;
      q->next_insert = ( q->next_insert + 1 ) % q->max;
      q->empty = 0;
      q->full = q->next_insert == q->next_read;
   }

   pthread_cond_signal(&(q->cond));
   pthread_mutex_unlock(&(q->mutex));
}

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章