37 #include "pcu_order.h"
38 #include "noto_malloc.h"
40 #include <sys/types.h>
44 enum state { uninit, init };
45 static enum state global_state = uninit;
46 static pcu_msg global_pmsg;
48 static pcu_msg* get_msg()
60 if (global_state != uninit)
61 reel_fail(
"nested calls to Comm_Init");
62 pcu_pmpi_init(MPI_COMM_WORLD);
63 pcu_set_mpi(&pcu_pmpi);
64 pcu_make_msg(&global_pmsg);
79 if (global_state == uninit)
80 reel_fail(
"Comm_Free called before Comm_Init");
81 if (global_pmsg.order)
82 pcu_order_free(global_pmsg.order);
83 pcu_free_msg(&global_pmsg);
85 global_state = uninit;
100 if (global_state == uninit)
101 reel_fail(
"Comm_Self called before Comm_Init");
102 return pcu_mpi_rank();
111 if (global_state == uninit)
112 reel_fail(
"Comm_Peers called before Comm_Init");
113 return pcu_mpi_size();
124 if (global_state == uninit)
125 reel_fail(
"Comm_Begin called before Comm_Init");
126 pcu_msg_start(get_msg());
137 if (global_state == uninit)
138 reel_fail(
"Comm_Pack called before Comm_Init");
139 if ((to_rank < 0)||(to_rank >= pcu_mpi_size()))
140 reel_fail(
"Invalid rank in Comm_Pack");
141 memcpy(pcu_msg_pack(get_msg(),to_rank,size),data,size);
154 if (global_state == uninit)
155 reel_fail(
"Comm_Send called before Comm_Init");
156 pcu_msg_send(get_msg());
173 if (global_state == uninit)
174 reel_fail(
"Comm_Listen called before Comm_Init");
175 pcu_msg* m = get_msg();
177 return pcu_order_receive(m->order, m);
178 return pcu_msg_receive(m);
186 if (global_state == uninit)
187 reel_fail(
"Comm_Sender called before Comm_Init");
188 pcu_msg* m = get_msg();
190 return pcu_order_received_from(m->order);
191 return pcu_msg_received_from(m);
199 if (global_state == uninit)
200 reel_fail(
"Comm_Unpacked called before Comm_Init");
201 pcu_msg* m = get_msg();
203 return pcu_order_unpacked(m->order);
204 return pcu_msg_unpacked(m);
219 if (global_state == uninit)
220 reel_fail(
"Comm_Unpack called before Comm_Init");
221 pcu_msg* m = get_msg();
223 memcpy(data,pcu_order_unpack(m->order,size),size);
225 memcpy(data,pcu_msg_unpack(m,size),size);
229 void PCU_Comm_Order(
bool on)
231 if (global_state == uninit)
232 reel_fail(
"Comm_Order called before Comm_Init");
233 pcu_msg* m = get_msg();
234 if (on && (!m->order))
235 m->order = pcu_order_new();
236 if ((!on) && m->order) {
237 pcu_order_free(m->order);
245 if (global_state == uninit)
246 reel_fail(
"Barrier called before Comm_Init");
247 pcu_barrier(&(get_msg()->coll));
258 if (global_state == uninit)
259 reel_fail(
"Add_Doubles called before Comm_Init");
260 pcu_allreduce(&(get_msg()->coll),pcu_add_doubles,p,n*
sizeof(
double));
263 double PCU_Add_Double(
double x)
275 if (global_state == uninit)
276 reel_fail(
"Min_Doubles called before Comm_Init");
277 pcu_allreduce(&(get_msg()->coll),pcu_min_doubles,p,n*
sizeof(
double));
280 double PCU_Min_Double(
double x)
292 if (global_state == uninit)
293 reel_fail(
"Max_Doubles called before Comm_Init");
294 pcu_allreduce(&(get_msg()->coll),pcu_max_doubles,p,n*
sizeof(
double));
297 double PCU_Max_Double(
double x)
309 if (global_state == uninit)
310 reel_fail(
"Add_Ints called before Comm_Init");
311 pcu_allreduce(&(get_msg()->coll),pcu_add_ints,p,n*
sizeof(
int));
314 int PCU_Add_Int(
int x)
326 if (global_state == uninit)
327 reel_fail(
"Add_Longs called before Comm_Init");
328 pcu_allreduce(&(get_msg()->coll),pcu_add_longs,p,n*
sizeof(
long));
331 long PCU_Add_Long(
long x)
343 if (global_state == uninit)
344 reel_fail(
"Add_SizeTs called before Comm_Init");
345 pcu_allreduce(&(get_msg()->coll),pcu_add_sizets,p,n*
sizeof(
size_t));
348 size_t PCU_Add_SizeT(
size_t x)
359 if (global_state == uninit)
360 reel_fail(
"Min_SizeTs called before Comm_Init");
361 pcu_allreduce(&(get_msg()->coll),pcu_min_sizets,p,n*
sizeof(
size_t));
364 size_t PCU_Min_SizeT(
size_t x) {
374 if (global_state == uninit)
375 reel_fail(
"Max_SizeTs called before Comm_Init");
376 pcu_allreduce(&(get_msg()->coll),pcu_max_sizets,p,n*
sizeof(
size_t));
379 size_t PCU_Max_SizeT(
size_t x) {
394 if (global_state == uninit)
395 reel_fail(
"Exscan_Ints called before Comm_Init");
397 NOTO_MALLOC(originals,n);
398 for (
size_t i=0; i < n; ++i)
400 pcu_scan(&(get_msg()->coll),pcu_add_ints,p,n*
sizeof(
int));
402 for (
size_t i=0; i < n; ++i)
403 p[i] -= originals[i];
404 noto_free(originals);
407 int PCU_Exscan_Int(
int x)
418 if (global_state == uninit)
419 reel_fail(
"Exscan_Longs called before Comm_Init");
421 NOTO_MALLOC(originals,n);
422 for (
size_t i=0; i < n; ++i)
424 pcu_scan(&(get_msg()->coll),pcu_add_longs,p,n*
sizeof(
long));
426 for (
size_t i=0; i < n; ++i)
427 p[i] -= originals[i];
428 noto_free(originals);
431 long PCU_Exscan_Long(
long x)
443 if (global_state == uninit)
444 reel_fail(
"Min_Ints called before Comm_Init");
445 pcu_allreduce(&(get_msg()->coll),pcu_min_ints,p,n*
sizeof(
int));
448 int PCU_Min_Int(
int x)
460 if (global_state == uninit)
461 reel_fail(
"Max_Ints called before Comm_Init");
462 pcu_allreduce(&(get_msg()->coll),pcu_max_ints,p,n*
sizeof(
int));
465 int PCU_Max_Int(
int x)
477 return PCU_Max_Int(c);
484 return PCU_Min_Int(c);
491 if (global_state == uninit)
492 reel_fail(
"Proc_Self called before Comm_Init");
493 return pcu_pmpi_rank();
500 if (global_state == uninit)
501 reel_fail(
"Proc_Peers called before Comm_Init");
502 return pcu_pmpi_size();
509 if (global_state == uninit)
510 reel_fail(
"Comm_Rank called before Comm_Init");
511 *rank = pcu_mpi_rank();
518 if (global_state == uninit)
519 reel_fail(
"Comm_Size called before Comm_Init");
520 *size = pcu_mpi_size();
527 return global_state == init;
535 if (global_state == uninit)
536 reel_fail(
"Comm_Start called before Comm_Init");
537 pcu_msg_start(get_msg());
548 if (global_state == uninit)
549 reel_fail(
"Comm_Packed called before Comm_Init");
550 if ((to_rank < 0)||(to_rank >= pcu_mpi_size()))
551 reel_fail(
"Invalid rank in Comm_Packed");
552 *size = pcu_msg_packed(get_msg(),to_rank);
567 if (global_state == uninit)
568 reel_fail(
"Comm_Write called before Comm_Init");
569 if ((to_rank < 0)||(to_rank >= pcu_mpi_size()))
570 reel_fail(
"Invalid rank in Comm_Write");
571 pcu_msg* msg = get_msg();
572 PCU_MSG_PACK(msg,to_rank,size);
573 memcpy(pcu_msg_pack(msg,to_rank,size),data,size);
600 if (global_state == uninit)
601 reel_fail(
"Comm_Read called before Comm_Init");
605 PCU_COMM_UNPACK(*size);
610 static void safe_mkdir(
const char* path, mode_t mode)
614 err = mkdir(path, mode);
615 if (err != 0 && errno != EEXIST)
616 reel_fail(
"PCU: could not create directory \"%s\"\n", path);
619 static void append(
char* s,
size_t size,
const char* format, ...)
623 va_start(ap, format);
624 vsnprintf(s + len, size - len, format, ap);
629 void PCU_Debug_Open(
void)
631 if (global_state == uninit)
632 reel_fail(
"Debug_Open called before Comm_Init");
634 const int fanout = 2048;
635 const int bufsize = 1024;
636 char* path = noto_malloc(bufsize);
639 mode_t
const dir_perm = S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH;
640 strcpy(path,
"debug/");
641 safe_mkdir(path, dir_perm);
643 append(path, bufsize,
"%d/",
self / fanout);
644 if (
self % fanout == 0)
645 safe_mkdir(path, dir_perm);
649 append(path,bufsize,
"%s",
"debug");
650 pcu_msg* msg = get_msg();
652 msg->file = pcu_open_parallel(path,
"txt");
659 if (global_state == uninit)
660 reel_fail(
"Debug_Print called before Comm_Init");
661 pcu_msg* msg = get_msg();
666 vfprintf(msg->file,format,ap);
674 if (global_state == uninit)
675 reel_fail(
"Comm_From called before Comm_Init");
676 pcu_msg* m = get_msg();
678 *from_rank = pcu_order_received_from(m->order);
680 *from_rank = pcu_msg_received_from(m);
691 if (global_state == uninit)
692 reel_fail(
"Comm_Received called before Comm_Init");
693 pcu_msg* m = get_msg();
695 *size = pcu_order_received_size(m->order);
697 *size = pcu_msg_received_size(m);
709 if (global_state == uninit)
710 reel_fail(
"Comm_Extract called before Comm_Init");
711 pcu_msg* m = get_msg();
713 return pcu_order_unpack(m->order,size);
714 return pcu_msg_unpack(m,size);
726 if (global_state == uninit)
727 reel_fail(
"Switch_Comm called before Comm_Init");
728 pcu_pmpi_switch(new_comm);
738 if (global_state == uninit)
739 reel_fail(
"Get_Comm called before Comm_Init");
740 return pcu_pmpi_comm();
750 void PCU_Protect(
void)
bool PCU_Comm_Receive(void)
Convenience wrapper over Listen and Unpacked.
void PCU_Add_SizeTs(size_t *p, size_t n)
Performs an Allreduce sum of size_t unsigned integers.
int PCU_Comm_Write(int to_rank, const void *data, size_t size)
Packs a message to be sent to to_rank.
int PCU_Comm_Rank(int *rank)
Similar to PCU_Comm_Self, returns the rank as an argument.
bool PCU_Comm_Initialized(void)
Returns true iff PCU has been initialized.
void PCU_Exscan_Longs(long *p, size_t n)
See PCU_Exscan_Ints.
void PCU_Max_SizeTs(size_t *p, size_t n)
Performs an Allreduce maximum of size_t unsigned integers.
void PCU_Max_Ints(int *p, size_t n)
Performs an Allreduce maximum of int arrays.
void * PCU_Comm_Extract(size_t size)
Extracts a block of data from the current received buffer.
int PCU_Comm_Peers(void)
Returns the number of threads in the program.
void PCU_Debug_Print(const char *format,...)
Like fprintf; contents go to debugN.txt.
void PCU_Min_SizeTs(size_t *p, size_t n)
Performs an Allreduce minimum of size_t unsigned integers.
void PCU_Min_Doubles(double *p, size_t n)
Performs an Allreduce minimum of double arrays.
double PCU_Time(void)
Returns the time in seconds since some unspecified point in the past.
void PCU_Switch_Comm(MPI_Comm new_comm)
Reinitializes PCU with a new MPI communicator.
int PCU_Comm_Unpack(void *data, size_t size)
Unpacks a block of data from the current received buffer.
int PCU_Comm_Start(PCU_Method method)
Deprecated, see PCU_Comm_Begin.
int PCU_Comm_Pack(int to_rank, const void *data, size_t size)
Packs data to be sent to to_rank.
int PCU_And(int c)
Performs a parallel logical AND reduction.
int PCU_Comm_From(int *from_rank)
Similar to PCU_Comm_Sender, returns the rank as an argument.
int PCU_Proc_Peers(void)
Returns the number of processes.
void PCU_Min_Ints(int *p, size_t n)
Performs an Allreduce minimum of int arrays.
int PCU_Comm_Free(void)
Frees all PCU library structures.
bool PCU_Comm_Unpacked(void)
Returns true if the current received buffer has been unpacked.
bool PCU_Comm_Listen(void)
Tries to receive a buffer for this communication phase.
int PCU_Comm_Sender(void)
Returns the rank of the sender of the current received buffer.
int PCU_Comm_Init(void)
Initializes the PCU library.
int PCU_Or(int c)
Performs a parallel logical OR reduction.
void PCU_Max_Doubles(double *p, size_t n)
Performs an Allreduce maximum of double arrays.
void PCU_Comm_Begin(void)
Begins a PCU communication phase.
int PCU_Comm_Packed(int to_rank, size_t *size)
Returns in *size the number of bytes being sent to to_rank.
int PCU_Comm_Send(void)
Sends all buffers for this communication phase.
int PCU_Comm_Self(void)
Returns the communication rank of the calling thread.
MPI_Comm PCU_Get_Comm(void)
Returns the current MPI communicator.
int PCU_Comm_Size(int *size)
Similar to PCU_Comm_Peers, returns the size as an argument.
void PCU_Barrier(void)
Blocking barrier over all threads.
int PCU_Comm_Received(size_t *size)
Returns in *size the number of bytes in the current received buffer.
void PCU_Add_Longs(long *p, size_t n)
Performs an Allreduce sum of long integers.
int PCU_Proc_Self(void)
Returns the unique rank of the calling process.
bool PCU_Comm_Read(int *from_rank, void **data, size_t *size)
Receives a message for this communication phase.
void PCU_Add_Doubles(double *p, size_t n)
Performs an Allreduce sum of double arrays.
void PCU_Exscan_Ints(int *p, size_t n)
Performs an exclusive prefix sum of integer arrays.
void PCU_Add_Ints(int *p, size_t n)
Performs an Allreduce sum of integers.