Pull to refresh

Параллельные вычисления по сети на Си

Programming *
Sandbox
Добрый день, хабражители! Недавно мне пришлось написать программу для параллельного вычисления определённого интеграла. Естественно, после окончания работы был приобретен некий опыт, и я хотел бы поделиться этим опытом с вами. Сразу скажу, что в рамках статьи я не буду рассматривать парсинг интегрируемой функции, а заострю внимание на взаимодействии тредов и компьютеров между собой.

Постановка задачи


Конечно, прежде чем начать писать код нужно правильно поставить себе задачу, требования к программе были следующие:
  • Вычисления должны производиться на нескольких машинах параллельно (в пределах локальной сети)
  • Вычисления должны быть многопоточными
  • Доступ к компьютерам, на которых производятся вычисления, непостоянный


То есть, необходимо написать программу которая бы запускалась на нескольких компьютерах (видимо на рабочих станциях) и ждала указания посчитать интеграл, затем считала его и отправляла результат обратно, тому кто просил посчитать.

Так как предстоит иметь дело с сетью, выберем вероятностный метод вычисления интеграла, будем использовать метод Монте-Карло. Он легко распараллеливается, очевидны два способа:
  • По отрезкам, то есть каждый вычислитель генерирует n точек на определённом отрезке
  • По количеству генерируемых точек, то есть каждый вычислитель генерирует n/(кол-во вычислителей) точек на всём отрезке интегрирования

Я выберу второй способ, ничем особо не мотивируя, просто не придётся передавать вычислителям начало и конец их отрезков.
Итак нам нужен сервер (вычислитель) который будет ждать задания, считать и возвращать результат. А так же нам нужен клиент, с которым будет взаимодействовать пользователь.
Как находить сервера для вычислений? Знать все IP наизусть или записывать их куда-либо не самый лучший вариант, если учесть, что IP адреса у серверов динамические, плюс они могут считать другую задачу в данный момент или просто быть не в сети. Решение простое, будем использовать широковещательный запрос для нахождения доступных на данный момент серверов.

Вторая проблема, какой транспорт использовать для обмена информацией? Для широковещательного запроса, очевидно будем использовать UDP. А вот для взаимодействия клиента и сервера можно использовать как UDP, так и TCP. Но с TCP будет меньше проблем, так как у нас не будет необходимости проверять состояние соединения. Если сокет закроется с другой стороны, то ОС сама определит это и даст нам знать.

Конечный вариант взаимодействия таков: сервер ждёт соединения на TCP сокете и одновременно с этим отвечает на широковещательные запросы, обозначая своё присутствие клиентам. Как только клиент подключился и дал задание — приостанавливаем ответы на широковещательные запросы, вычисляем, отвечаем клиенту, начинаем цикл заново. Клиент же: посылает запрос, формирует список серверов, разделяет между ними задачу, получает результаты и выводит их пользователю.

Сервер


Давайте сначала напишем сервер. Первым делом договоримся на каком порту мы будем ждать запросов от клиентов, пускай это будет порт — номер 38199. Затем объявим структуру для отправки клиентом задания серверу.
#define RCVPORT 38199
#define FUNC(x) x*x // В примере я буду вычислять интеграл от x^2
// Структура для передачи задания серверу
typedef struct {
  int limits;
  int numoftry;
} task_data_t;


* This source code was highlighted with Source Code Highlighter.


Как видно из кода выше, клиент будет посылать серверу верхний предел интегрирования и количество попыток которое он должен сделать.

Ясно, что вычисления мы будем делать в несколько потоков, поэтому создадим структуру аргумент и функцию для тредов вычислителей:
// Аргумент функции треда вычислителя
typedef struct {
 int limits; // Предел интегрирования
 long numoftry; // Количество попыток которые должен выполнить тред
 long double *results; // Куда записать результат
} thread_args_t;

// Функция треда вычислителя
void *calculate(void *arg) {
 // Ожидаем в качестве аргумента указатель на структуру thread_args_t
 thread_args_t *tinfo = (thread_args_t*) arg;
 long double result = 0;
 int xlim = tinfo->limits;
 int trys = tinfo->numoftry;
 unsigned a = xlim;
 for(int i = 0; i < trys; ++i) {
   int div = rand_r(&a);
    int div2 = rand_r(&a);
   double x = div % xlim + (div2/(div2*1.0 + div*1.0)) ;
   result += FUNC(x);
 }
 *(tinfo->results) = result;
  return NULL;
}


* This source code was highlighted with Source Code Highlighter.


Обращаю внимание на то, что используется функция rand_r(unsigned int *seedp). Поскольку эта функция использует локальную переменную для хранения промежуточного значения между вызовами. Мы не можем допустить использования глобальной для всех тредов переменной, как это будет в случае использования функции rand(), так как это вызовет их взаимную блокировку.

После запуска всех вычислительных тредов, главный тред повиснет на функции pthread_join() и ничего не сможет сделать, в случае если клиент во время вычислений «умрет», чтобы избежать пустых вычислений, запустим ещё один тред, который будет проверять состояние клиента.
// Аргумент для проверяющего клиента треда
typedef struct {
  int sock; // Сокет с клиентом
  pthread_t *calcthreads; // Треды которые в случае чего надо убить
  int threadnum; // Количество этих тредов
} checker_args_t;

// Функция которая будет выполнена тредом получившим сигнал SIGUSR1
void thread_cancel(int signo) {
  pthread_exit(PTHREAD_CANCELED);
}

// Тред проверяющий состояние клиента
void *client_check(void *arg) {
  // Нам должен быть передан аргумент типа checker_args_t
  checker_args_t *args = (checker_args_t*) arg;
  char a[10];
  recv(args->sock, &a, 10, 0); // Так как мы используем TCP, если клиент умрет или что либо
                         // скажет, то recv тут же разблокирует тред и вернёт -1
  int st;
  for(int i = 0; i < args->threadnum; ++i)
    st = pthread_kill(args->calcthreads[i], SIGUSR1); // Шлем всем вычислителям SIGUSR1
  return NULL;
}


* This source code was highlighted with Source Code Highlighter.


Для принудительного завершения вычисляющих тредов я решил, использовать сигналы, так как никакой памяти в результате их работы не выделяется, бояться нечего. Хотя использование функции pthread_cancel() и макросов pthread_cleanup_push() и pthread_cleanup_pop() было бы более правильным. Конечно, написана функция thread_cancel() которая будет выполняться при получении сигнала. И запомним, что в начале работы программы, перед запуском тредов, нужно установить правильную маску для обработки сигналов, иначе мы рискуем просто выйти из программы.

Теперь давайте напишем тред который будет отвечать на broadcast запросы. Чтобы главный тред мог спокойно повиснуть, ожидая клиента, а наш дополнительный тред, отвечал в этот момент на запросы.
void *listen_broadcast(void *arg) {
  int *isbusy = arg;
  // Создаем сокет для работы с broadcast
  int sockbrcast = socket(PF_INET, SOCK_DGRAM, 0);
  if(sockbrcast == -1) {
    perror("Create broadcast socket failed");
    exit(EXIT_FAILURE);
  }
  
  // Создаем структуру для приема ответов на broadcast
  int port_rcv = RCVPORT;
  struct sockaddr_in addrbrcast_rcv;
  bzero(&addrbrcast_rcv, sizeof(addrbrcast_rcv));
  addrbrcast_rcv.sin_family = AF_INET;
  addrbrcast_rcv.sin_addr.s_addr = htonl(INADDR_ANY);
  addrbrcast_rcv.sin_port = htons(port_rcv);
  // Биндим её
  if(bind(sockbrcast, (struct sockaddr *) &addrbrcast_rcv,
      sizeof(addrbrcast_rcv)) < 0) {
    perror("Bind broadcast socket failed");
    close(sockbrcast);
    exit(EXIT_FAILURE);
  }
  
  int msgsize = sizeof(char) * 18;
  char hellomesg[18];
  bzero(hellomesg, msgsize);
  // Делаем прослушивание сокета broadcast'ов неблокирующим
  fcntl(sockbrcast, F_SETFL, O_NONBLOCK);
  
  // Создаем множество прослушивания
  fd_set readset;
  FD_ZERO(&readset);
  FD_SET(sockbrcast, &readset);
  
  // Таймаут
  struct timeval timeout;
  timeout.tv_sec = 3;
  timeout.tv_usec = 0;
  
  struct sockaddr_in client;;
  bzero(&client, sizeof(client));
  socklen_t servaddrlen = sizeof(struct sockaddr_in);
  char helloanswer[18];
  bzero(helloanswer, msgsize);
  strcpy(helloanswer, "Hello Client");
  int sockst = 1;
  while(sockst > 0) {
    sockst = select(sockbrcast + 1, &readset, NULL, &readset, NULL);
    if(sockst == -1) {
      perror("Broblems on broadcast socket");
      exit(EXIT_FAILURE);
    }
    int rdbyte = recvfrom(sockbrcast, (void*) hellomesg, msgsize,MSG_TRUNC,
        (struct sockaddr*) &client,
        &servaddrlen);
    if(rdbyte == msgsize && strcmp(hellomesg, "Hello Integral") == 0 &&
        *isbusy == 0) {
      if(sendto(sockbrcast, helloanswer, msgsize, 0,
        (struct sockaddr*) &client, sizeof(struct sockaddr_in)) < 0) {
        perror("Sending answer");
        close(sockbrcast);
        exit(EXIT_FAILURE);
      }
    }
    FD_ZERO(&readset);
    FD_SET(sockbrcast, &readset);
  }
  return NULL;
}


* This source code was highlighted with Source Code Highlighter.


Тут всё просто, создаем сокет и ждем запросов. Как запрос получили — отвечаем на него. И одно усложнение, отвечать на запрос или нет, решаем по значению переменной isbusy.

Наконец добрались до main'а:
int main(int argc, char** argv) {
  // Аргумент может быть только один - это кол-во тредов
  if(argc > 2) {
    fprintf(stderr, "Usage: %s [numofcpus]\n", argv[0]);
    exit(EXIT_FAILURE);
  }
  
  int numofthread;
  
  if(argc == 2) {
    numofthread = atoi(argv[1]);
    if(numofthread < 1) {
      fprintf(stderr, "Incorrect num of threads!\n");
      exit(EXIT_FAILURE);
    }
    fprintf(stdout, "Num of threads forced to %d\n", numofthread);
  } else {
    // Если аргументов нет, то определяем кол-во процессоров автоматически
    numofthread = sysconf(_SC_NPROCESSORS_ONLN);
    if(numofthread < 1) {
      fprintf(stderr, "Can't detect num of processors\n"
          "Continue in two threads\n");
      numofthread = 2;
    }
    fprintf(stdout, "Num of threads detected automatically it's %d\n\n",
        numofthread);
  }


* This source code was highlighted with Source Code Highlighter.


Думаю проверку аргументов можно не объяснять…

Установим маску для сигналов, и запустим тред для прослушивания запросов:
  struct sigaction cancel_act;
  memset(&cancel_act, 0, sizeof(cancel_act));
  cancel_act.sa_handler = thread_cancel;
  sigfillset(&cancel_act.sa_mask);
  sigaction(SIGUSR1, &cancel_act, NULL);
  
  // Создаем тред слушающий broadcast'ы
  pthread_t broadcast_thread;
  int isbusy = 1;//(int*) malloc(sizeof(int));
  // Переменная которая сообщает треду следует ли отвечать на broadcast
  // 0 - отвечать, 1- нет
  isbusy = 1;
  if(pthread_create(&broadcast_thread, NULL, listen_broadcast, &isbusy)) {
    fprintf(stderr, "Can't create broadcast listen thread");
    perror("Detail:");
    exit(EXIT_FAILURE);
  }


* This source code was highlighted with Source Code Highlighter.


Теперь создаем сокет, с которым будут устанавливать соединение клиенты:
  int listener;
  struct sockaddr_in addr;
  listener = socket(PF_INET, SOCK_STREAM, 0);
  if(listener < 0) {
    perror("Can't create listen socket");
    exit(EXIT_FAILURE);
  }

  addr.sin_family = AF_INET;
  addr.sin_port = htons(RCVPORT);
  addr.sin_addr.s_addr = INADDR_ANY;
  int a = 1;
  // Добавляем опцию SO_REUSEADDR для случаев когда мы перезапускам сервер
  if(setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &a, sizeof(a)) < 0) {
    perror("Set listener socket options");
    exit(EXIT_FAILURE);
  }
  
  // Биндим сокет
  if(bind(listener, (struct sockaddr*) &addr, sizeof(addr)) < 0) {
    perror("Can't bind listen socket");
    exit(EXIT_FAILURE);
  }
  
  // Начинаем ждать соединения от клиентов
  if(listen(listener, 1) < 0) {
    perror("Eror listen socket");
    exit(EXIT_FAILURE);
  }


* This source code was highlighted with Source Code Highlighter.


Начинаем вычислительный цикл:
  // Ожидаем соединений
  int needexit = 0;
  while(needexit == 0) {
    fprintf(stdout, "\nWait new connection...\n\n");
    int client;
    isbusy = 0; // Разрешаем отвечать клиентам на запросы
    struct sockaddr_in addrclient;
    socklen_t addrclientsize = sizeof(addrclient);
    client = accept(listener, (struct sockaddr*)&addrclient,
        &addrclientsize);
     if(client < 0) {
      perror("Client accepting");
    }


* This source code was highlighted with Source Code Highlighter.


Мы спокойно повисаем на accept'е, так как ответами на широковещательные запросы занимается отдельный тред.
После того как клиент подключился к нам, проверяем данные от него, и начинаем вычислять:
    isbusy = 1; // Запрещаем отвечать на запросы
    task_data_t data;
    int read_bytes = recv(client, &data, sizeof(data), 0);
    if(read_bytes != sizeof(data) || data.limits < 1 || data.numoftry < 1) {
      fprintf(stderr, "Invalid data from %s on port %d, reset peer\n",
          inet_ntoa(addrclient.sin_addr), ntohs(addrclient.sin_port));
      close(client);
      isbusy = 0;
    } else {
      fprintf(stdout, "New task from %s on port %d\nlimits: %d\n"
          "numoftrys: %d\n", inet_ntoa(addrclient.sin_addr),
          ntohs(addrclient.sin_port), data.limits, data.numoftry);
      thread_args_t *tinfo;
      pthread_t *calc_threads =
          (pthread_t*) malloc(sizeof(pthread_t) * numofthread);
      int threads_trys = data.numoftry % numofthread;
      long double *results =
        (long double *) malloc(sizeof(long double) * numofthread);
      tinfo = (thread_args_t*) malloc(sizeof(thread_args_t) *
          numofthread);
      // Создаем вычислительные треды
      int numofthreadtry = data.numoftry / numofthread + 1;
      for(int i = 0; i < numofthread; ++i) {
        tinfo[i].limits = data.limits;
        tinfo[i].numoftry = numofthreadtry;
        tinfo[i].results = &results[i];
        if(pthread_create(&calc_threads[i], NULL, calculate, &tinfo[i])
            != 0) {
          fprintf(stderr, "Can't create thread by num %d", i);
          perror("Detail:");
          exit(EXIT_FAILURE);
        }
      }
      
      // Создаем тред проверяющий соединение с клиентом
      checker_args_t checker_arg;
      checker_arg.calcthreads = calc_threads;
      checker_arg.threadnum = numofthread;
      checker_arg.sock = client;
      pthread_t checker_thread;
      if(pthread_create(&checker_thread, NULL, client_check,
        &checker_arg) != 0) {
        fprintf(stderr, "Can't create checker thread");
        perror("Detail:");
        exit(EXIT_FAILURE);
      }
      int iscanceled = 0; // Почему завершились треды?
      int *exitstat;
      for(int i = 0; i < numofthread; ++i) {
        pthread_join(calc_threads[i], (void*) &exitstat);
        if(exitstat == PTHREAD_CANCELED)
          iscanceled = 1; // Отменили их
      }
      if(iscanceled != 1) {
        long double *res = (long double*) malloc(sizeof(long double));
        bzero(res, sizeof(long double));
        *res = 0.0;
        for(int i = 0; i < numofthread; ++i)
          *res += results[i];
        pthread_kill(checker_thread, SIGUSR1);
        if(send(client, res, sizeof(long double), 0) < 0) {
          perror("Sending error");
        }
        close(client);
        free(res);
        //free(checker_arg);
        free(results);
        free(calc_threads);
        free(tinfo);
        isbusy = 0;
        fprintf(stdout, "Calculate and send finish!\n");
      } else {
        fprintf(stderr, "Client die!\n");
        close(client);
        //free(checker_arg);
        free(results);
        free(calc_threads);
        free(tinfo);
      }

      
    }
    
  }
  
  return (EXIT_SUCCESS);
}


* This source code was highlighted with Source Code Highlighter.


Оставшаяся часть кода проста:
  1. Запускаем вычислительные треды
  2. Запускаем тред проверяющий состояние
  3. Ждем пока всё посчитается
  4. Отвечаем клиенту
  5. Начинаем вычислительный цикл заново

Обращаю ваше внимание на то, что завершать проверяющий тред не нужно, ведь он сам завершается, когда закрывается сокет с клиентом.

Все сервер готов.

Клиент


Клиент будет устроен следующим образом:
  1. Формируем список серверов
  2. Для работы с каждым сервером создаем тред
  3. Ждем результатов и выходим

Для работы с каждым сервером удобно создать отдельный тред, это позволит работать с ними асинхронно.

Так же как и в сервере объявим структуру для обмена данными. А также структуру аргумент для тредов работающих с серверами.
// Структура для отправки задания серверу
typedef struct {
  int limits;
  int numoftry;
} task_data_t;

// Аргумент для треда работающего с сервером
typedef struct {
  int limits; // Пределы
  int numoftry; // Количество попыток для сервера
  struct sockaddr_in *server; // Структура с информацией для подключения к серверу
  long double *results; // Куда записать результат
} thread_args_t;


* This source code was highlighted with Source Code Highlighter.


В клиенте нем не нужно задавать порт для прослушивания, потому что сервер отвечает туда откуда пришел пакет, поэтому возьмем тот, который нам назначит ОС. Итак, функция (тред) работающая с сервером:
// Функция треда работающего с сервером
void *send_thread(void *arg) {
  thread_args_t *task_data = (thread_args_t*) arg;
  int servsock = socket(PF_INET, SOCK_STREAM, 0);
  if(servsock < 0) {
    perror("Create new socket to server");
    exit(EXIT_FAILURE);
  }
  struct sockaddr_in listenaddr;
  listenaddr.sin_family = AF_INET;
  listenaddr.sin_addr.s_addr = INADDR_ANY;
  listenaddr.sin_port = 0;
  
  if(bind(servsock, (struct sockaddr*) &listenaddr, sizeof(listenaddr)) < 0) {
    perror("Can't create listen socket");
    exit(EXIT_FAILURE);
  }
  socklen_t servaddrlen = sizeof(struct sockaddr_in);
  if(connect(servsock, (struct sockaddr*)task_data->server,
    servaddrlen) < 0) {
    perror("Connect to server failed!");
    exit(EXIT_FAILURE);
  }
  task_data_t senddata;
  senddata.limits = task_data->limits;
  senddata.numoftry = task_data->numoftry;
  
  if(send(servsock, &senddata, sizeof(senddata), 0) < 0) {
    perror("Sending data to server failed");
    exit(EXIT_FAILURE);
  }
  
  int recv_byte = recv(servsock, task_data->results, sizeof(long double), 0);
  if(recv_byte == 0) {
    fprintf(stderr, "Server %s on port %d die!\nCancel calculate, on all",
        inet_ntoa(task_data->server->sin_addr),
        ntohs(task_data->server->sin_port));
    exit(EXIT_FAILURE);
  }
  fprintf(stdout, "Server %s on port %d finish!\n",
        inet_ntoa(task_data->server->sin_addr),
        ntohs(task_data->server->sin_port));
  return NULL;
}


* This source code was highlighted with Source Code Highlighter.


main очень похож на main сервера, плюс достаточно подробно откомментирован, я не буду особо его обсуждать.
int main(int argc, char** argv) {
  if(argc < 3) {
    fprintf(stderr, "Usage: %s limits numoftry [maxserv]\n", argv[0]);
    exit(EXIT_FAILURE);
  }
  
  int numoftry = atoi(argv[2]);
  if(numoftry == 0) {
    fprintf(stderr, "Num of try is invalid\n");
    exit(EXIT_FAILURE);
  }
  int maxservu = 1000000;
  if(argc == 4) {
   maxservu = atoi(argv[3]);
   if (maxservu < 1) {
    fprintf(stderr, "Error number of max servers\n");
    exit(EXIT_FAILURE);
   }
  }
  int limits = atoi(argv[1]);
  if(limits == 0) {
    fprintf(stderr, "Limits is invalid\n");
    exit(EXIT_FAILURE);
  }
  
  // Создаем сокет для работы с broadcast
  int sockbrcast = socket(PF_INET, SOCK_DGRAM, 0);
  if(sockbrcast == -1) {
    perror("Create broadcast socket failed");
    exit(EXIT_FAILURE);
  }
  
  
  // Создаем структуру для приема ответов на broadcast
  int port_rcv = 0;
  struct sockaddr_in addrbrcast_rcv;
  bzero(&addrbrcast_rcv, sizeof(addrbrcast_rcv));
  addrbrcast_rcv.sin_family = AF_INET;
  addrbrcast_rcv.sin_addr.s_addr = htonl(INADDR_ANY);
  addrbrcast_rcv.sin_port = 0;//htons(port_rcv);
  // Биндим её
  if(bind(sockbrcast, (struct sockaddr *) &addrbrcast_rcv,
      sizeof(addrbrcast_rcv)) < 0) {
    perror("Bind broadcast socket failed");
    close(sockbrcast);
    exit(EXIT_FAILURE);
  }
  
  // Структура для отправки broadcast
  int port_snd = 38199;
  struct sockaddr_in addrbrcast_snd;
  bzero(&addrbrcast_snd, sizeof(addrbrcast_snd));
  addrbrcast_snd.sin_family = AF_INET;
  addrbrcast_snd.sin_port = htons(port_snd);
  addrbrcast_snd.sin_addr.s_addr = htonl(0xffffffff);
  
  // Разрешаем broadcast на сокете
  int access = 1;
  if(setsockopt(sockbrcast, SOL_SOCKET, SO_BROADCAST,
       (const void*) &access, sizeof(access)) < 0) {
    perror("Can't accept broadcast option at socket to send");
    close(sockbrcast);
    exit(EXIT_FAILURE);
  }
  int msgsize = sizeof(char) * 18;
  void *hellomesg = malloc(msgsize);
  bzero(hellomesg, msgsize);
  strcpy(hellomesg, "Hello Integral");
  // Посылаем broadcast
  if(sendto(sockbrcast, hellomesg, msgsize, 0,
      (struct sockaddr*) &addrbrcast_snd, sizeof(addrbrcast_snd)) < 0) {
    perror("Sending broadcast");
    close(sockbrcast);
    exit(EXIT_FAILURE);
  }
  
  // Делаем прослушивание сокета broadcast'ов неблокирующим
  fcntl(sockbrcast, F_SETFL, O_NONBLOCK);
  
  // Создаем множество прослушивания
  fd_set readset;
  FD_ZERO(&readset);
  FD_SET(sockbrcast, &readset);
  
  // Таймаут
  struct timeval timeout;
  timeout.tv_sec = 3;
  timeout.tv_usec = 0;
  
  struct sockaddr_in *servers =
    (struct sockaddr_in*) malloc(sizeof(struct sockaddr_in));
  bzero(servers, sizeof(struct sockaddr_in));
  int servcount = 0;
  int maxserv = 1;
  socklen_t servaddrlen = sizeof(struct sockaddr_in);
  // Создаем список серверов servers
  while(select(sockbrcast + 1, &readset, NULL, &readset, &timeout) > 0) {
    int rdbyte = recvfrom(sockbrcast, (void*) hellomesg, msgsize,MSG_TRUNC,
        (struct sockaddr*) &servers[servcount],
        &servaddrlen);
    if(rdbyte == msgsize && strcmp(hellomesg, "Hello Client") == 0) {
      servcount++;
      
      if(servcount >= maxserv) {
        servers = realloc(servers,
            sizeof(struct sockaddr_in) * (maxserv + 1));
        if(servers == NULL) {
          perror("Realloc failed");
          close(sockbrcast);
          exit(EXIT_FAILURE);
        }
        bzero(&servers[servcount], servaddrlen);
        maxserv++;
      }
      FD_ZERO(&readset);
      FD_SET(sockbrcast, &readset);
    }
  }
  int i;
  if(servcount < 1) {
    fprintf(stderr, "No servers found!\n");
    exit(EXIT_FAILURE);
  }
  if(argc > 3 && maxservu <= servcount)
   servcount = maxservu;
  for(i = 0; i < servcount; ++i) {
    printf("Server answer from %s on port %d\n",
        inet_ntoa(servers[i].sin_addr), ntohs(servers[i].sin_port));
  }
  printf("\n");
  free(hellomesg);
  
  long double *results =
    (long double*) malloc(sizeof(long double) * servcount);
  // Создаем треды для работы с серверами
  pthread_t *tid = (pthread_t*) malloc(sizeof(pthread_t) * servcount);
  for(i = 0; i < servcount; ++i) {
    thread_args_t *args = (thread_args_t*) malloc (sizeof(thread_args_t));
    args->limits = limits;
    args->numoftry = numoftry / servcount + 1;
    args->results = &results[i];
    args->server = &servers[i];
    
    if(pthread_create(&tid[i], NULL, send_thread, args) != 0) {
      perror("Create send thread failed");
      exit(EXIT_FAILURE);
    }
  }
  long double res = 0;
  // Ждем все сервера
  for(i = 0; i < servcount; ++i)
    pthread_join(tid[i], NULL);
  
  // Вычисляем результат
  for(i = 0; i < servcount; ++i)
    res += results[i];
  res /= numoftry;
  res *= limits;

  
  free(servers);
  printf("\nResult: %Lf\n", res);
  return (EXIT_SUCCESS);
}


* This source code was highlighted with Source Code Highlighter.


Тестирование производительности


Производительность в зависимости от количества тредов

Я запускал программу на одном компьютере, с одними и теми же входными данными, с разным количеством тредов.
Клиент запускаем с параметрами 2 1000000000; Сервер с параметрами 1, 2, 4, 8.
Результаты соответственно: 0m37.063s, 0m20.576s, 0m20.329s, 0m21.029s. Следует учитывать, что от результатов которые показал time нужно отнимать 4 секунды которые мы дожидаемся серверов.
Процессор на машине Core(TM)2 Duo CPU T5470. Что и ожидалось, у процессора два ядра а соответственно запускать больше чем в 2 потока бессмысленно, и так: удвоение числа потоков даёт
ускорение в два раза.

Так же проводилось тестирование на Asus 1215p, результат имеет ту же зависимость от количества тредов, что и выше

Производительность в зависимости от количества машин

Теперь я запускал сервер на 1, 2, 4, 8, 16 компьютерах, количество тредов равно количеству ядер, то есть два треда.
Вот результаты: 0m10.268s, 0m5.122s, 0m2.487s, 0m1.265, 0m0.766s.
Опять получаем ожидаемый результат, с каждым новым компьютером вычисления ускорятются в 2 раза.

Итого


В итоге мы получаем программу, которая параллельно вычисляет интеграл и устойчива к разного рода неприятностям, которые могут произойти во время работы. К сожалению из-за большого размера статьи не получилось рассмотреть всё несколько подробнее. Но я надеюсь что мой пример поможет новичкам избежать некоторых ошибок, которые возникали у меня. Исходники программ с make файлами можно взять тут.

Спасибо за внимание!
Tags: cthreadslinuxпараллельное программирование
Hubs: Programming
Total votes 63: ↑58 and ↓5 +53
Comments 17
Comments Comments 17

Popular right now