SCOREC core
Parallel unstructured mesh tools
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
pcu.c
Go to the documentation of this file.
1 /******************************************************************************
2 
3  Copyright 2011 Scientific Computation Research Center,
4  Rensselaer Polytechnic Institute. All rights reserved.
5 
6  This work is open source software, licensed under the terms of the
7  BSD license as described in the LICENSE file in the top-level directory.
8 
9 *******************************************************************************/
32 #include <string.h>
33 #include <stdarg.h>
34 #include "PCU.h"
35 #include "pcu_msg.h"
36 #include "pcu_pmpi.h"
37 #include "pcu_order.h"
38 #include "noto_malloc.h"
39 #include "reel.h"
40 #include <sys/types.h> /*required for mode_t for mkdir on some systems*/
41 #include <sys/stat.h> /*using POSIX mkdir call for SMB "foo/" path*/
42 #include <errno.h> /* for checking the error from mkdir */
43 
44 enum state { uninit, init };
45 static enum state global_state = uninit;
46 static pcu_msg global_pmsg;
47 
48 static pcu_msg* get_msg()
49 {
50  return &global_pmsg;
51 }
52 
58 int PCU_Comm_Init(void)
59 {
60  if (global_state != uninit)
61  reel_fail("nested calls to Comm_Init");
62  pcu_pmpi_init(MPI_COMM_WORLD);
63  pcu_set_mpi(&pcu_pmpi);
64  pcu_make_msg(&global_pmsg);
65  global_state = init;
66  /* turn ordering on by default, call
67  PCU_Comm_Order(false) after PCU_Comm_Init
68  to disable this */
69  PCU_Comm_Order(true);
70  return PCU_SUCCESS;
71 }
72 
77 int PCU_Comm_Free(void)
78 {
79  if (global_state == uninit)
80  reel_fail("Comm_Free called before Comm_Init");
81  if (global_pmsg.order)
82  pcu_order_free(global_pmsg.order);
83  pcu_free_msg(&global_pmsg);
84  pcu_pmpi_finalize();
85  global_state = uninit;
86  return PCU_SUCCESS;
87 }
88 
98 int PCU_Comm_Self(void)
99 {
100  if (global_state == uninit)
101  reel_fail("Comm_Self called before Comm_Init");
102  return pcu_mpi_rank();
103 }
104 
109 int PCU_Comm_Peers(void)
110 {
111  if (global_state == uninit)
112  reel_fail("Comm_Peers called before Comm_Init");
113  return pcu_mpi_size();
114 }
115 
122 void PCU_Comm_Begin(void)
123 {
124  if (global_state == uninit)
125  reel_fail("Comm_Begin called before Comm_Init");
126  pcu_msg_start(get_msg());
127 }
128 
135 int PCU_Comm_Pack(int to_rank, const void* data, size_t size)
136 {
137  if (global_state == uninit)
138  reel_fail("Comm_Pack called before Comm_Init");
139  if ((to_rank < 0)||(to_rank >= pcu_mpi_size()))
140  reel_fail("Invalid rank in Comm_Pack");
141  memcpy(pcu_msg_pack(get_msg(),to_rank,size),data,size);
142  return PCU_SUCCESS;
143 }
144 
152 int PCU_Comm_Send(void)
153 {
154  if (global_state == uninit)
155  reel_fail("Comm_Send called before Comm_Init");
156  pcu_msg_send(get_msg());
157  return PCU_SUCCESS;
158 }
159 
171 bool PCU_Comm_Listen(void)
172 {
173  if (global_state == uninit)
174  reel_fail("Comm_Listen called before Comm_Init");
175  pcu_msg* m = get_msg();
176  if (m->order)
177  return pcu_order_receive(m->order, m);
178  return pcu_msg_receive(m);
179 }
180 
185 {
186  if (global_state == uninit)
187  reel_fail("Comm_Sender called before Comm_Init");
188  pcu_msg* m = get_msg();
189  if (m->order)
190  return pcu_order_received_from(m->order);
191  return pcu_msg_received_from(m);
192 }
193 
198 {
199  if (global_state == uninit)
200  reel_fail("Comm_Unpacked called before Comm_Init");
201  pcu_msg* m = get_msg();
202  if (m->order)
203  return pcu_order_unpacked(m->order);
204  return pcu_msg_unpacked(m);
205 }
206 
217 int PCU_Comm_Unpack(void* data, size_t size)
218 {
219  if (global_state == uninit)
220  reel_fail("Comm_Unpack called before Comm_Init");
221  pcu_msg* m = get_msg();
222  if (m->order)
223  memcpy(data,pcu_order_unpack(m->order,size),size);
224  else
225  memcpy(data,pcu_msg_unpack(m,size),size);
226  return PCU_SUCCESS;
227 }
228 
229 void PCU_Comm_Order(bool on)
230 {
231  if (global_state == uninit)
232  reel_fail("Comm_Order called before Comm_Init");
233  pcu_msg* m = get_msg();
234  if (on && (!m->order))
235  m->order = pcu_order_new();
236  if ((!on) && m->order) {
237  pcu_order_free(m->order);
238  m->order = NULL;
239  }
240 }
241 
243 void PCU_Barrier(void)
244 {
245  if (global_state == uninit)
246  reel_fail("Barrier called before Comm_Init");
247  pcu_barrier(&(get_msg()->coll));
248 }
249 
256 void PCU_Add_Doubles(double* p, size_t n)
257 {
258  if (global_state == uninit)
259  reel_fail("Add_Doubles called before Comm_Init");
260  pcu_allreduce(&(get_msg()->coll),pcu_add_doubles,p,n*sizeof(double));
261 }
262 
/** Single-value form of PCU_Add_Doubles: reduces x and returns the result. */
double PCU_Add_Double(double x)
{
  double value = x;
  PCU_Add_Doubles(&value, 1);
  return value;
}
270 
273 void PCU_Min_Doubles(double* p, size_t n)
274 {
275  if (global_state == uninit)
276  reel_fail("Min_Doubles called before Comm_Init");
277  pcu_allreduce(&(get_msg()->coll),pcu_min_doubles,p,n*sizeof(double));
278 }
279 
/** Single-value form of PCU_Min_Doubles: reduces x and returns the result. */
double PCU_Min_Double(double x)
{
  double value = x;
  PCU_Min_Doubles(&value, 1);
  return value;
}
287 
290 void PCU_Max_Doubles(double* p, size_t n)
291 {
292  if (global_state == uninit)
293  reel_fail("Max_Doubles called before Comm_Init");
294  pcu_allreduce(&(get_msg()->coll),pcu_max_doubles,p,n*sizeof(double));
295 }
296 
/** Single-value form of PCU_Max_Doubles: reduces x and returns the result. */
double PCU_Max_Double(double x)
{
  double value = x;
  PCU_Max_Doubles(&value, 1);
  return value;
}
304 
307 void PCU_Add_Ints(int* p, size_t n)
308 {
309  if (global_state == uninit)
310  reel_fail("Add_Ints called before Comm_Init");
311  pcu_allreduce(&(get_msg()->coll),pcu_add_ints,p,n*sizeof(int));
312 }
313 
/** Single-value form of PCU_Add_Ints: reduces x and returns the result. */
int PCU_Add_Int(int x)
{
  int value = x;
  PCU_Add_Ints(&value, 1);
  return value;
}
321 
324 void PCU_Add_Longs(long* p, size_t n)
325 {
326  if (global_state == uninit)
327  reel_fail("Add_Longs called before Comm_Init");
328  pcu_allreduce(&(get_msg()->coll),pcu_add_longs,p,n*sizeof(long));
329 }
330 
/** Single-value form of PCU_Add_Longs: reduces x and returns the result. */
long PCU_Add_Long(long x)
{
  long value = x;
  PCU_Add_Longs(&value, 1);
  return value;
}
338 
341 void PCU_Add_SizeTs(size_t* p, size_t n)
342 {
343  if (global_state == uninit)
344  reel_fail("Add_SizeTs called before Comm_Init");
345  pcu_allreduce(&(get_msg()->coll),pcu_add_sizets,p,n*sizeof(size_t));
346 }
347 
/** Single-value form of PCU_Add_SizeTs: reduces x and returns the result. */
size_t PCU_Add_SizeT(size_t x)
{
  size_t value = x;
  PCU_Add_SizeTs(&value, 1);
  return value;
}
355 
358 void PCU_Min_SizeTs(size_t* p, size_t n) {
359  if (global_state == uninit)
360  reel_fail("Min_SizeTs called before Comm_Init");
361  pcu_allreduce(&(get_msg()->coll),pcu_min_sizets,p,n*sizeof(size_t));
362 }
363 
/** Single-value form of PCU_Min_SizeTs: reduces x and returns the result. */
size_t PCU_Min_SizeT(size_t x)
{
  size_t value = x;
  PCU_Min_SizeTs(&value, 1);
  return value;
}
370 
373 void PCU_Max_SizeTs(size_t* p, size_t n) {
374  if (global_state == uninit)
375  reel_fail("Max_SizeTs called before Comm_Init");
376  pcu_allreduce(&(get_msg()->coll),pcu_max_sizets,p,n*sizeof(size_t));
377 }
378 
/** Single-value form of PCU_Max_SizeTs: reduces x and returns the result. */
size_t PCU_Max_SizeT(size_t x)
{
  size_t value = x;
  PCU_Max_SizeTs(&value, 1);
  return value;
}
385 
392 void PCU_Exscan_Ints(int* p, size_t n)
393 {
394  if (global_state == uninit)
395  reel_fail("Exscan_Ints called before Comm_Init");
396  int* originals;
397  NOTO_MALLOC(originals,n);
398  for (size_t i=0; i < n; ++i)
399  originals[i] = p[i];
400  pcu_scan(&(get_msg()->coll),pcu_add_ints,p,n*sizeof(int));
401  //convert inclusive scan to exclusive
402  for (size_t i=0; i < n; ++i)
403  p[i] -= originals[i];
404  noto_free(originals);
405 }
406 
/** Single-value form of PCU_Exscan_Ints: scans x and returns the result. */
int PCU_Exscan_Int(int x)
{
  int value = x;
  PCU_Exscan_Ints(&value, 1);
  return value;
}
414 
416 void PCU_Exscan_Longs(long* p, size_t n)
417 {
418  if (global_state == uninit)
419  reel_fail("Exscan_Longs called before Comm_Init");
420  long* originals;
421  NOTO_MALLOC(originals,n);
422  for (size_t i=0; i < n; ++i)
423  originals[i] = p[i];
424  pcu_scan(&(get_msg()->coll),pcu_add_longs,p,n*sizeof(long));
425  //convert inclusive scan to exclusive
426  for (size_t i=0; i < n; ++i)
427  p[i] -= originals[i];
428  noto_free(originals);
429 }
430 
/** Single-value form of PCU_Exscan_Longs: scans x and returns the result. */
long PCU_Exscan_Long(long x)
{
  long value = x;
  PCU_Exscan_Longs(&value, 1);
  return value;
}
438 
441 void PCU_Min_Ints(int* p, size_t n)
442 {
443  if (global_state == uninit)
444  reel_fail("Min_Ints called before Comm_Init");
445  pcu_allreduce(&(get_msg()->coll),pcu_min_ints,p,n*sizeof(int));
446 }
447 
/** Single-value form of PCU_Min_Ints: reduces x and returns the result. */
int PCU_Min_Int(int x)
{
  int value = x;
  PCU_Min_Ints(&value, 1);
  return value;
}
455 
458 void PCU_Max_Ints(int* p, size_t n)
459 {
460  if (global_state == uninit)
461  reel_fail("Max_Ints called before Comm_Init");
462  pcu_allreduce(&(get_msg()->coll),pcu_max_ints,p,n*sizeof(int));
463 }
464 
/** Single-value form of PCU_Max_Ints: reduces x and returns the result. */
int PCU_Max_Int(int x)
{
  int value = x;
  PCU_Max_Ints(&value, 1);
  return value;
}
472 
/**
 * Performs a parallel logical OR reduction.
 *
 * c is treated as a truth value (non-zero means true). It is reduced
 * to 0 or 1 before the maximum is taken, so a negative non-zero value
 * still counts as true when every other participant passes 0.
 *
 * Returns 1 if any participant passed a non-zero c, otherwise 0.
 */
int PCU_Or(int c)
{
  return PCU_Max_Int(c != 0);
}
479 
/**
 * Performs a parallel logical AND reduction.
 *
 * c is treated as a truth value (non-zero means true). It is reduced
 * to 0 or 1 before the minimum is taken, so a negative non-zero value
 * cannot hide another participant's 0.
 *
 * Returns 1 if every participant passed a non-zero c, otherwise 0.
 */
int PCU_And(int c)
{
  return PCU_Min_Int(c != 0);
}
486 
489 int PCU_Proc_Self(void)
490 {
491  if (global_state == uninit)
492  reel_fail("Proc_Self called before Comm_Init");
493  return pcu_pmpi_rank();
494 }
495 
498 int PCU_Proc_Peers(void)
499 {
500  if (global_state == uninit)
501  reel_fail("Proc_Peers called before Comm_Init");
502  return pcu_pmpi_size();
503 }
504 
507 int PCU_Comm_Rank(int* rank)
508 {
509  if (global_state == uninit)
510  reel_fail("Comm_Rank called before Comm_Init");
511  *rank = pcu_mpi_rank();
512  return PCU_SUCCESS;
513 }
514 
516 int PCU_Comm_Size(int* size)
517 {
518  if (global_state == uninit)
519  reel_fail("Comm_Size called before Comm_Init");
520  *size = pcu_mpi_size();
521  return PCU_SUCCESS;
522 }
523 
526 {
527  return global_state == init;
528 }
529 
532 int PCU_Comm_Start(PCU_Method method)
533 {
534  (void)method; //warning silencer
535  if (global_state == uninit)
536  reel_fail("Comm_Start called before Comm_Init");
537  pcu_msg_start(get_msg());
538  return PCU_SUCCESS;
539 }
540 
546 int PCU_Comm_Packed(int to_rank, size_t* size)
547 {
548  if (global_state == uninit)
549  reel_fail("Comm_Packed called before Comm_Init");
550  if ((to_rank < 0)||(to_rank >= pcu_mpi_size()))
551  reel_fail("Invalid rank in Comm_Packed");
552  *size = pcu_msg_packed(get_msg(),to_rank);
553  return PCU_SUCCESS;
554 }
555 
565 int PCU_Comm_Write(int to_rank, const void* data, size_t size)
566 {
567  if (global_state == uninit)
568  reel_fail("Comm_Write called before Comm_Init");
569  if ((to_rank < 0)||(to_rank >= pcu_mpi_size()))
570  reel_fail("Invalid rank in Comm_Write");
571  pcu_msg* msg = get_msg();
572  PCU_MSG_PACK(msg,to_rank,size);
573  memcpy(pcu_msg_pack(msg,to_rank,size),data,size);
574  return PCU_SUCCESS;
575 }
576 
/**
 * Convenience wrapper over Listen and Unpacked.
 *
 * While the current received buffer is fully unpacked, listens for the
 * next one. Returns false as soon as PCU_Comm_Listen returns false,
 * and true once a buffer that is not yet fully unpacked is available.
 */
bool PCU_Comm_Receive(void)
{
  while (PCU_Comm_Unpacked())
    if (!PCU_Comm_Listen())
      return false;
  return true;
}
585 
598 bool PCU_Comm_Read(int* from_rank, void** data, size_t* size)
599 {
600  if (global_state == uninit)
601  reel_fail("Comm_Read called before Comm_Init");
602  if (!PCU_Comm_Receive())
603  return false;
604  *from_rank = PCU_Comm_Sender();
605  PCU_COMM_UNPACK(*size);
606  *data = PCU_Comm_Extract(*size);
607  return true;
608 }
609 
/* Creates the directory path with permissions mode.
   A directory that already exists (EEXIST) is accepted silently; any
   other mkdir failure is reported through reel_fail. */
static void safe_mkdir(const char* path, mode_t mode)
{
  errno = 0;
  if (mkdir(path, mode) == 0) {
    return;
  }
  if (errno == EEXIST) {
    return;
  }
  reel_fail("PCU: could not create directory \"%s\"\n", path);
}
618 
/* Appends printf-style formatted text to the NUL-terminated string s,
   which lives in a buffer of size bytes in total.
   Output is truncated so that at most size bytes, terminator included,
   are ever used. If s already occupies the whole buffer (or size is
   smaller than the current string), nothing is written. */
static void append(char* s, size_t size, const char* format, ...)
{
  size_t len = strlen(s);
  /* size - len would wrap around to a huge bound here; bail out */
  if (len >= size)
    return;
  va_list ap;
  va_start(ap, format);
  vsnprintf(s + len, size - len, format, ap);
  va_end(ap);
}
627 
628 
629 void PCU_Debug_Open(void)
630 {
631  if (global_state == uninit)
632  reel_fail("Debug_Open called before Comm_Init");
633 
634  const int fanout = 2048;
635  const int bufsize = 1024;
636  char* path = noto_malloc(bufsize);
637  path[0] = '\0';
638  if (PCU_Comm_Peers() > fanout) {
639  mode_t const dir_perm = S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH;
640  strcpy(path, "debug/");
641  safe_mkdir(path, dir_perm);
642  int self = PCU_Comm_Self();
643  append(path, bufsize, "%d/", self / fanout);
644  if (self % fanout == 0)
645  safe_mkdir(path, dir_perm);
646  PCU_Barrier();
647  }
648 
649  append(path,bufsize, "%s", "debug");
650  pcu_msg* msg = get_msg();
651  if ( ! msg->file)
652  msg->file = pcu_open_parallel(path,"txt");
653  noto_free(path);
654 }
655 
657 void PCU_Debug_Print(const char* format, ...)
658 {
659  if (global_state == uninit)
660  reel_fail("Debug_Print called before Comm_Init");
661  pcu_msg* msg = get_msg();
662  if ( ! msg->file)
663  return; //Print is a no-op if no file is open
664  va_list ap;
665  va_start(ap,format);
666  vfprintf(msg->file,format,ap);
667  va_end(ap);
668  fflush(msg->file);
669 }
670 
672 int PCU_Comm_From(int* from_rank)
673 {
674  if (global_state == uninit)
675  reel_fail("Comm_From called before Comm_Init");
676  pcu_msg* m = get_msg();
677  if (m->order)
678  *from_rank = pcu_order_received_from(m->order);
679  else
680  *from_rank = pcu_msg_received_from(m);
681  return PCU_SUCCESS;
682 }
683 
689 int PCU_Comm_Received(size_t* size)
690 {
691  if (global_state == uninit)
692  reel_fail("Comm_Received called before Comm_Init");
693  pcu_msg* m = get_msg();
694  if (m->order)
695  *size = pcu_order_received_size(m->order);
696  else
697  *size = pcu_msg_received_size(m);
698  return PCU_SUCCESS;
699 }
700 
707 void* PCU_Comm_Extract(size_t size)
708 {
709  if (global_state == uninit)
710  reel_fail("Comm_Extract called before Comm_Init");
711  pcu_msg* m = get_msg();
712  if (m->order)
713  return pcu_order_unpack(m->order,size);
714  return pcu_msg_unpack(m,size);
715 }
716 
724 void PCU_Switch_Comm(MPI_Comm new_comm)
725 {
726  if (global_state == uninit)
727  reel_fail("Switch_Comm called before Comm_Init");
728  pcu_pmpi_switch(new_comm);
729 }
730 
736 MPI_Comm PCU_Get_Comm(void)
737 {
738  if (global_state == uninit)
739  reel_fail("Get_Comm called before Comm_Init");
740  return pcu_pmpi_comm();
741 }
742 
/**
 * Returns the time in seconds since some time in the past, taken
 * directly from MPI_Wtime(). No initialization check is made here.
 */
double PCU_Time(void)
{
  const double now = MPI_Wtime();
  return now;
}
749 
/**
 * Forwards to reel_protect(); the protection it installs is defined by
 * the reel library, not in this file. No initialization check is made.
 */
void PCU_Protect(void)
{
  reel_protect();
}
bool PCU_Comm_Receive(void)
Convenience wrapper over Listen and Unpacked.
Definition: pcu.c:578
void PCU_Add_SizeTs(size_t *p, size_t n)
Performs an Allreduce sum of size_t unsigned integers.
Definition: pcu.c:341
int PCU_Comm_Write(int to_rank, const void *data, size_t size)
Packs a message to be sent to to_rank.
Definition: pcu.c:565
int PCU_Comm_Rank(int *rank)
Similar to PCU_Comm_Self, returns the rank as an argument.
Definition: pcu.c:507
bool PCU_Comm_Initialized(void)
Returns true iff PCU has been initialized.
Definition: pcu.c:525
void PCU_Exscan_Longs(long *p, size_t n)
See PCU_Exscan_Ints.
Definition: pcu.c:416
void PCU_Max_SizeTs(size_t *p, size_t n)
Performs an Allreduce maximum of size_t unsigned integers.
Definition: pcu.c:373
void PCU_Max_Ints(int *p, size_t n)
Performs an Allreduce maximum of int arrays.
Definition: pcu.c:458
void * PCU_Comm_Extract(size_t size)
Extracts a block of data from the current received buffer.
Definition: pcu.c:707
int PCU_Comm_Peers(void)
Returns the number of threads in the program.
Definition: pcu.c:109
void PCU_Debug_Print(const char *format,...)
Like fprintf, but the contents go to the debugN.txt file.
Definition: pcu.c:657
void PCU_Min_SizeTs(size_t *p, size_t n)
Performs an Allreduce minimum of size_t unsigned integers.
Definition: pcu.c:358
void PCU_Min_Doubles(double *p, size_t n)
Performs an Allreduce minimum of double arrays.
Definition: pcu.c:273
double PCU_Time(void)
Return the time in seconds since some time in the past.
Definition: pcu.c:745
void PCU_Switch_Comm(MPI_Comm new_comm)
Reinitializes PCU with a new MPI communicator.
Definition: pcu.c:724
int PCU_Comm_Unpack(void *data, size_t size)
Unpacks a block of data from the current received buffer.
Definition: pcu.c:217
int PCU_Comm_Start(PCU_Method method)
Deprecated, see PCU_Comm_Begin.
Definition: pcu.c:532
int PCU_Comm_Pack(int to_rank, const void *data, size_t size)
Packs data to be sent to to_rank.
Definition: pcu.c:135
int PCU_And(int c)
Performs a parallel logical AND reduction.
Definition: pcu.c:482
int PCU_Comm_From(int *from_rank)
Similar to PCU_Comm_Sender, returns the rank as an argument.
Definition: pcu.c:672
int PCU_Proc_Peers(void)
Returns the number of processes.
Definition: pcu.c:498
void PCU_Min_Ints(int *p, size_t n)
Performs an Allreduce minimum of int arrays.
Definition: pcu.c:441
int PCU_Comm_Free(void)
Frees all PCU library structures.
Definition: pcu.c:77
bool PCU_Comm_Unpacked(void)
Returns true if the current received buffer has been unpacked.
Definition: pcu.c:197
bool PCU_Comm_Listen(void)
Tries to receive a buffer for this communication phase.
Definition: pcu.c:171
int PCU_Comm_Sender(void)
Returns the rank of the sender of the current received buffer.
Definition: pcu.c:184
int PCU_Comm_Init(void)
Initializes the PCU library.
Definition: pcu.c:58
int PCU_Or(int c)
Performs a parallel logical OR reduction.
Definition: pcu.c:475
void PCU_Max_Doubles(double *p, size_t n)
Performs an Allreduce maximum of double arrays.
Definition: pcu.c:290
void PCU_Comm_Begin(void)
Begins a PCU communication phase.
Definition: pcu.c:122
int PCU_Comm_Packed(int to_rank, size_t *size)
Returns in * size the number of bytes being sent to to_rank.
Definition: pcu.c:546
int PCU_Comm_Send(void)
Sends all buffers for this communication phase.
Definition: pcu.c:152
int PCU_Comm_Self(void)
Returns the communication rank of the calling thread.
Definition: pcu.c:98
MPI_Comm PCU_Get_Comm(void)
Return the current MPI communicator.
Definition: pcu.c:736
int PCU_Comm_Size(int *size)
Similar to PCU_Comm_Peers, returns the size as an argument.
Definition: pcu.c:516
void PCU_Barrier(void)
Blocking barrier over all threads.
Definition: pcu.c:243
int PCU_Comm_Received(size_t *size)
Returns in * size the bytes in the current received buffer.
Definition: pcu.c:689
void PCU_Add_Longs(long *p, size_t n)
Performs an Allreduce sum of long integers.
Definition: pcu.c:324
int PCU_Proc_Self(void)
Returns the unique rank of the calling process.
Definition: pcu.c:489
bool PCU_Comm_Read(int *from_rank, void **data, size_t *size)
Receives a message for this communication phase.
Definition: pcu.c:598
void PCU_Add_Doubles(double *p, size_t n)
Performs an Allreduce sum of double arrays.
Definition: pcu.c:256
void PCU_Exscan_Ints(int *p, size_t n)
Performs an exclusive prefix sum of integer arrays.
Definition: pcu.c:392
void PCU_Add_Ints(int *p, size_t n)
Performs an Allreduce sum of integers.
Definition: pcu.c:307